Files
firefrost-services/services/whitelist-manager/app.py
Claude (The Golden Chronicler #50) 8d989d74af feat: Add current Whitelist Manager to monorepo (will be replaced by Arbiter 2.x)
WHAT WAS DONE:
- Copied current Whitelist Manager Python code to services/whitelist-manager/
- Extracted from Billing VPS (38.68.14.188) via Cockpit terminal
- 1 file: app.py (Flask application, ~600 lines)

WHY:
Preserve current broken Whitelist Manager code before replacement.
This code is BROKEN (see Task #86, #90) and will be retired once
Arbiter 2.x is deployed, but we need it in version control for
reference during migration.

WHAT'S BROKEN:
- Hardcoded server grouping by name keywords (not node allocation)
- WebSocket console commands for status (unreliable, returns UNKNOWN)
- No subscription integration
- No master whitelist concept
- Manual-only operation

CURRENT STATE:
- Running on Billing VPS at whitelist.firefrostgaming.com
- Broken since Panel v1.12.1 upgrade (March 13, 2026)
- All servers show 'UNKNOWN' status
- Grouping logic fails (11 servers in 'Unknown' group)
- Bulk operations not working

REPLACEMENT:
Task #90 - Arbiter 2.x will replace this with:
- Subscription-driven access control
- Master whitelist in PostgreSQL
- Pterodactyl File Management API (not WebSocket)
- Auto-discovery by node allocation
- Discord /link command integration

FILES:
- services/whitelist-manager/app.py (new, 600 lines)

NOTE:
This is REFERENCE CODE ONLY. Do not deploy or build upon.
Arbiter 2.x is the correct path forward.

Signed-off-by: The Golden Chronicler <claude@firefrostgaming.com>
2026-03-31 22:44:54 +00:00

621 lines
20 KiB
Python

from flask import Flask, render_template, request, jsonify
from flask_httpauth import HTTPBasicAuth
import os
import requests
import re
import json
import asyncio
import websockets
from datetime import datetime
app = Flask(__name__)
auth = HTTPBasicAuth()
PTERODACTYL_URL = os.getenv('PTERODACTYL_URL', 'https://panel.firefrostgaming.com')
API_KEY = os.getenv('PTERODACTYL_API_KEY')
USERNAME = os.getenv('DASHBOARD_USERNAME', 'admin')
PASSWORD = os.getenv('DASHBOARD_PASSWORD', 'changeme')
EXCLUDED_SERVERS = ['FoundryVTT', 'Hytale']
# Activity log (in-memory, last 50 actions)
activity_log = []
def log_activity(action, player=None, server=None, success=True):
"""Add an entry to the activity log"""
entry = {
'timestamp': datetime.now().isoformat(),
'action': action,
'player': player,
'server': server,
'success': success
}
activity_log.insert(0, entry)
if len(activity_log) > 50:
activity_log.pop()
def is_uuid(player_input):
"""Check if input is a UUID format (with or without dashes)"""
uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$'
return bool(re.match(uuid_pattern, player_input.lower()))
def validate_minecraft_username(username):
"""Validate Minecraft username format"""
if is_uuid(username):
return True # UUIDs are always valid
# Minecraft username rules: 3-16 chars, alphanumeric + underscore
if not re.match(r'^[a-zA-Z0-9_]{3,16}$', username):
return False
return True
async def get_whitelist_players(server_uuid):
"""Get list of whitelisted players from a server"""
headers = {
'Authorization': f'Bearer {API_KEY}',
'Accept': 'application/json'
}
try:
response = requests.get(
f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/websocket',
headers=headers,
timeout=5
)
if response.status_code != 200:
return []
ws_data = response.json()
socket_url = ws_data['data']['socket']
token = ws_data['data']['token']
async with websockets.connect(
socket_url,
ping_interval=None,
additional_headers={'Origin': PTERODACTYL_URL}
) as websocket:
auth_message = json.dumps({'event': 'auth', 'args': [token]})
await websocket.send(auth_message)
await asyncio.wait_for(websocket.recv(), timeout=5)
list_command = json.dumps({'event': 'send command', 'args': ['whitelist list']})
await websocket.send(list_command)
players = []
for i in range(10):
try:
message = await asyncio.wait_for(websocket.recv(), timeout=2)
data = json.loads(message)
if data.get('event') == 'console output':
output = data.get('args', [''])[0]
# Parse player names from output
# Format: "There are 3 whitelisted players: player1, player2, player3"
if 'whitelisted players:' in output.lower():
player_text = output.split(':', 1)[1].strip()
players = [p.strip() for p in player_text.split(',')]
break
except asyncio.TimeoutError:
break
return players
except Exception as e:
print(f"Error getting whitelist: {e}")
return []
def get_server_running_status(server_uuid):
"""Check if server is actually running via resources endpoint"""
headers = {
'Authorization': f'Bearer {API_KEY}',
'Accept': 'application/json'
}
try:
response = requests.get(
f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/resources',
headers=headers,
timeout=5
)
if response.status_code != 200:
return False
data = response.json()
current_state = data.get('attributes', {}).get('current_state', 'offline')
return current_state == 'running'
except Exception as e:
print(f"Error checking server status for {server_uuid}: {e}")
return False
async def check_whitelist_via_websocket(server_uuid):
"""Connect to server websocket and check whitelist status"""
headers = {
'Authorization': f'Bearer {API_KEY}',
'Accept': 'application/json'
}
try:
response = requests.get(
f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/websocket',
headers=headers,
timeout=5
)
if response.status_code != 200:
return 'unknown'
ws_data = response.json()
socket_url = ws_data['data']['socket']
token = ws_data['data']['token']
async with websockets.connect(
socket_url,
ping_interval=None,
additional_headers={'Origin': PTERODACTYL_URL}
) as websocket:
auth_message = json.dumps({'event': 'auth', 'args': [token]})
await websocket.send(auth_message)
await asyncio.wait_for(websocket.recv(), timeout=5)
test_command = json.dumps({'event': 'send command', 'args': ['whitelist on']})
await websocket.send(test_command)
for i in range(10):
try:
message = await asyncio.wait_for(websocket.recv(), timeout=2)
data = json.loads(message)
if data.get('event') == 'console output':
output = data.get('args', [''])[0]
output_lower = output.lower()
if 'already on' in output_lower or 'already enabled' in output_lower:
return 'whitelisted'
elif 'now turned on' in output_lower or 'whitelist is now on' in output_lower:
off_command = json.dumps({'event': 'send command', 'args': ['whitelist off']})
await websocket.send(off_command)
return 'public'
except asyncio.TimeoutError:
break
return 'unknown'
except Exception as e:
print(f"Error: {e}")
return 'unknown'
async def toggle_whitelist_via_websocket(server_uuid, enable):
"""Toggle whitelist on or off"""
headers = {
'Authorization': f'Bearer {API_KEY}',
'Accept': 'application/json'
}
try:
response = requests.get(
f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/websocket',
headers=headers,
timeout=5
)
if response.status_code != 200:
return False
ws_data = response.json()
socket_url = ws_data['data']['socket']
token = ws_data['data']['token']
async with websockets.connect(
socket_url,
ping_interval=None,
additional_headers={'Origin': PTERODACTYL_URL}
) as websocket:
auth_message = json.dumps({'event': 'auth', 'args': [token]})
await websocket.send(auth_message)
await asyncio.wait_for(websocket.recv(), timeout=5)
command = 'whitelist on' if enable else 'whitelist off'
cmd_message = json.dumps({'event': 'send command', 'args': [command]})
await websocket.send(cmd_message)
for i in range(5):
try:
message = await asyncio.wait_for(websocket.recv(), timeout=2)
data = json.loads(message)
if data.get('event') == 'console output':
output = data.get('args', [''])[0].lower()
if 'turned on' in output or 'turned off' in output:
return True
except asyncio.TimeoutError:
break
return True
except Exception as e:
print(f"Error toggling whitelist: {e}")
return False
@auth.verify_password
def verify_password(username, password):
if username == USERNAME and password == PASSWORD:
return username
return None
def get_servers():
headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
try:
response = requests.get(f'{PTERODACTYL_URL}/api/client', headers=headers)
if response.status_code != 200:
return {'TX1': [], 'NC1': [], 'Unknown': []}
data = response.json()
servers = {'TX1': [], 'NC1': [], 'Unknown': []}
for server in data.get('data', []):
attributes = server.get('attributes', {})
name = attributes.get('name', 'Unknown')
short_uuid = attributes.get('identifier', '')
full_uuid = attributes.get('uuid', '')
if any(excluded in name for excluded in EXCLUDED_SERVERS):
continue
is_running = get_server_running_status(short_uuid)
server_info = {
'name': name,
'uuid': short_uuid,
'full_uuid': full_uuid,
'running': is_running
}
if any(keyword in name.lower() for keyword in ['reclamation', 'stoneblock', 'society', 'vanilla', 'mons']):
servers['TX1'].append(server_info)
elif any(keyword in name.lower() for keyword in ['ember', 'minecolonies', 'mods', 'homestead', 'subterra']):
servers['NC1'].append(server_info)
else:
servers['Unknown'].append(server_info)
return servers
except Exception as e:
print(f"Error fetching servers: {e}")
return {'TX1': [], 'NC1': [], 'Unknown': []}
@app.route('/')
@auth.login_required
def index():
servers = get_servers()
return render_template('index.html', servers=servers)
@app.route('/health')
def health():
return jsonify({'service': 'whitelist-manager', 'status': 'healthy'})
@app.route('/api/servers')
@auth.login_required
def api_servers():
return jsonify(get_servers())
@app.route('/api/activity')
@auth.login_required
def get_activity():
"""Get recent activity log"""
return jsonify({'activity': activity_log[:10]})
@app.route('/api/check-whitelist/<server_uuid>', methods=['GET'])
@auth.login_required
def check_whitelist_status(server_uuid):
"""Check whitelist status"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
status = loop.run_until_complete(check_whitelist_via_websocket(server_uuid))
loop.close()
return jsonify({'success': True, 'status': status})
@app.route('/api/get-whitelist/<server_uuid>', methods=['GET'])
@auth.login_required
def get_whitelist(server_uuid):
"""Get list of whitelisted players"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
players = loop.run_until_complete(get_whitelist_players(server_uuid))
loop.close()
return jsonify({'success': True, 'players': players})
@app.route('/api/toggle-whitelist/<server_uuid>', methods=['POST'])
@auth.login_required
def toggle_whitelist(server_uuid):
"""Toggle whitelist on/off"""
data = request.json
enable = data.get('enable', True)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
success = loop.run_until_complete(toggle_whitelist_via_websocket(server_uuid, enable))
loop.close()
new_status = 'whitelisted' if enable else 'public'
action = 'Enable whitelist' if enable else 'Disable whitelist'
# Get server name
servers = get_servers()
server_name = None
for node_servers in servers.values():
for server in node_servers:
if server['uuid'] == server_uuid:
server_name = server['name']
break
log_activity(action, server=server_name, success=success)
return jsonify({'success': success, 'status': new_status})
@app.route('/api/toggle-all-whitelist', methods=['POST'])
@auth.login_required
def toggle_all_whitelist():
"""Toggle whitelist on/off for ALL servers"""
data = request.json
enable = data.get('enable', True)
servers = get_servers()
all_servers = []
for node_servers in servers.values():
all_servers.extend([s for s in node_servers if s['running']])
successes = 0
failures = 0
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for server in all_servers:
success = loop.run_until_complete(toggle_whitelist_via_websocket(server['uuid'], enable))
if success:
successes += 1
else:
failures += 1
loop.close()
action = f"{'Enable' if enable else 'Disable'} whitelist on ALL servers"
log_activity(action, success=(failures == 0))
return jsonify({
'success': True,
'message': f"{'Enabled' if enable else 'Disabled'} whitelist on {successes} servers ({failures} failed)"
})
@app.route('/api/validate-player', methods=['POST'])
@auth.login_required
def validate_player():
"""Validate Minecraft username"""
data = request.json
player = data.get('player', '')
if not validate_minecraft_username(player):
return jsonify({'valid': False, 'message': 'Invalid username format (3-16 chars, alphanumeric + underscore)'})
return jsonify({'valid': True})
@app.route('/api/whitelist/add', methods=['POST'])
@auth.login_required
def add_to_whitelist():
data = request.json
player = data.get('player')
server_uuid = data.get('server_uuid')
if not player or not server_uuid:
return jsonify({'success': False, 'message': 'Missing player or server'}), 400
if not validate_minecraft_username(player):
return jsonify({'success': False, 'message': 'Invalid username format'}), 400
headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
if is_uuid(player):
command = f'whitelist add {player}'
display_name = f'UUID {player[:8]}...'
else:
command = f'whitelist add {player}'
display_name = player
try:
response = requests.post(
f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/command',
headers=headers,
json={'command': command}
)
success = response.status_code == 204
# Get server name
servers = get_servers()
server_name = None
for node_servers in servers.values():
for server in node_servers:
if server['uuid'] == server_uuid:
server_name = server['name']
break
log_activity('Add player', player=display_name, server=server_name, success=success)
if success:
return jsonify({'success': True, 'message': f'Added {display_name} to whitelist'})
else:
return jsonify({'success': False, 'message': f'API Error: {response.status_code}'}), 500
except Exception as e:
log_activity('Add player', player=display_name, server=server_name, success=False)
return jsonify({'success': False, 'message': f'Error: {str(e)}'}), 500
@app.route('/api/whitelist/remove', methods=['POST'])
@auth.login_required
def remove_from_whitelist():
data = request.json
player = data.get('player')
server_uuid = data.get('server_uuid')
if not player or not server_uuid:
return jsonify({'success': False, 'message': 'Missing player or server'}), 400
headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
if is_uuid(player):
command = f'whitelist remove {player}'
display_name = f'UUID {player[:8]}...'
else:
command = f'whitelist remove {player}'
display_name = player
try:
response = requests.post(
f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/command',
headers=headers,
json={'command': command}
)
success = response.status_code == 204
# Get server name
servers = get_servers()
server_name = None
for node_servers in servers.values():
for server in node_servers:
if server['uuid'] == server_uuid:
server_name = server['name']
break
log_activity('Remove player', player=display_name, server=server_name, success=success)
if success:
return jsonify({'success': True, 'message': f'Removed {display_name} from whitelist'})
else:
return jsonify({'success': False, 'message': f'API Error: {response.status_code}'}), 500
except Exception as e:
log_activity('Remove player', player=display_name, server=server_name, success=False)
return jsonify({'success': False, 'message': f'Error: {str(e)}'}), 500
@app.route('/api/whitelist/add-all', methods=['POST'])
@auth.login_required
def add_to_all():
data = request.json
player = data.get('player')
if not player:
return jsonify({'success': False, 'message': 'Missing player name'}), 400
if not validate_minecraft_username(player):
return jsonify({'success': False, 'message': 'Invalid username format'}), 400
servers = get_servers()
all_servers = []
for node_servers in servers.values():
all_servers.extend(node_servers)
headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
command = f'whitelist add {player}'
successes = 0
failures = 0
if is_uuid(player):
display_name = f'UUID {player[:8]}...'
else:
display_name = player
for server in all_servers:
try:
response = requests.post(
f'{PTERODACTYL_URL}/api/client/servers/{server["uuid"]}/command',
headers=headers,
json={'command': command}
)
if response.status_code == 204:
successes += 1
else:
failures += 1
except Exception as e:
failures += 1
log_activity('Bulk add player', player=display_name, server='ALL', success=(failures == 0))
return jsonify({
'success': True,
'message': f'Added {display_name} to {successes} servers ({failures} failed)'
})
@app.route('/api/whitelist/remove-all', methods=['POST'])
@auth.login_required
def remove_from_all():
data = request.json
player = data.get('player')
if not player:
return jsonify({'success': False, 'message': 'Missing player name'}), 400
servers = get_servers()
all_servers = []
for node_servers in servers.values():
all_servers.extend(node_servers)
headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
command = f'whitelist remove {player}'
successes = 0
failures = 0
if is_uuid(player):
display_name = f'UUID {player[:8]}...'
else:
display_name = player
for server in all_servers:
try:
response = requests.post(
f'{PTERODACTYL_URL}/api/client/servers/{server["uuid"]}/command',
headers=headers,
json={'command': command}
)
if response.status_code == 204:
successes += 1
else:
failures += 1
except Exception as e:
failures += 1
log_activity('Bulk remove player', player=display_name, server='ALL', success=(failures == 0))
return jsonify({
'success': True,
'message': f'Removed {display_name} from {successes} servers ({failures} failed)'
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)