WHAT WAS DONE: - Copied current Whitelist Manager Python code to services/whitelist-manager/ - Extracted from Billing VPS (38.68.14.188) via Cockpit terminal - 1 file: app.py (Flask application, ~600 lines) WHY: Preserve current broken Whitelist Manager code before replacement. This code is BROKEN (see Task #86, #90) and will be retired once Arbiter 2.x is deployed, but we need it in version control for reference during migration. WHAT'S BROKEN: - Hardcoded server grouping by name keywords (not node allocation) - WebSocket console commands for status (unreliable, returns UNKNOWN) - No subscription integration - No master whitelist concept - Manual-only operation CURRENT STATE: - Running on Billing VPS at whitelist.firefrostgaming.com - Broken since Panel v1.12.1 upgrade (March 13, 2026) - All servers show 'UNKNOWN' status - Grouping logic fails (11 servers in 'Unknown' group) - Bulk operations not working REPLACEMENT: Task #90 - Arbiter 2.x will replace this with: - Subscription-driven access control - Master whitelist in PostgreSQL - Pterodactyl File Management API (not WebSocket) - Auto-discovery by node allocation - Discord /link command integration FILES: - services/whitelist-manager/app.py (new, 600 lines) NOTE: This is REFERENCE CODE ONLY. Do not deploy or build upon. Arbiter 2.x is the correct path forward. Signed-off-by: The Golden Chronicler <claude@firefrostgaming.com>
621 lines
20 KiB
Python
621 lines
20 KiB
Python
from flask import Flask, render_template, request, jsonify
|
|
from flask_httpauth import HTTPBasicAuth
|
|
import os
|
|
import requests
|
|
import re
|
|
import json
|
|
import asyncio
|
|
import websockets
|
|
from datetime import datetime
|
|
|
|
# Flask application object and the HTTP Basic Auth helper used by the routes.
app = Flask(__name__)
auth = HTTPBasicAuth()

# Runtime configuration, read once from the environment at import time.
PTERODACTYL_URL = os.environ.get('PTERODACTYL_URL', 'https://panel.firefrostgaming.com')
API_KEY = os.environ.get('PTERODACTYL_API_KEY')  # None when the variable is unset
USERNAME = os.environ.get('DASHBOARD_USERNAME', 'admin')
PASSWORD = os.environ.get('DASHBOARD_PASSWORD', 'changeme')

# Servers whose name contains any of these substrings are skipped by get_servers().
EXCLUDED_SERVERS = ['FoundryVTT', 'Hytale']

# Activity log (in-memory, newest entry first, capped at 50 by log_activity)
activity_log = []
|
|
|
|
def log_activity(action, player=None, server=None, success=True):
    """Record an action at the front of the in-memory activity log.

    Args:
        action: Short description of what was done.
        player: Player the action concerned, if any.
        server: Server the action concerned, if any.
        success: Whether the action succeeded.

    The log holds at most 50 entries; when a new entry pushes it past that
    size, the entry at the end of the list is dropped.
    """
    activity_log.insert(0, dict(
        timestamp=datetime.now().isoformat(),
        action=action,
        player=player,
        server=server,
        success=success,
    ))

    # New entries go in at index 0, so the tail is always the oldest one.
    if len(activity_log) > 50:
        activity_log.pop()
|
|
|
|
def is_uuid(player_input):
    """Return True if *player_input* is formatted like a UUID.

    The check is case-insensitive and must cover the whole string. Each of
    the four dashes is individually optional, so dashed, undashed and
    partially dashed forms are all accepted.

    Args:
        player_input: Value to test. Anything that is not a ``str`` is
            reported as not-a-UUID instead of raising.

    Returns:
        bool: True for 32 hex digits in 8-4-4-4-12 grouping, else False.
    """
    if not isinstance(player_input, str):
        return False

    uuid_pattern = r'[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}'
    # fullmatch rather than match + '$': '$' also matches just before a
    # trailing newline, which would let "<uuid>\n" through.
    return re.fullmatch(uuid_pattern, player_input.lower()) is not None
|
|
|
|
def validate_minecraft_username(username):
    """Return True if *username* is an acceptable player identifier.

    A value is accepted when ``is_uuid`` recognises it as a UUID, or when it
    is a Minecraft username: 3-16 characters drawn from ASCII letters,
    digits and underscore.

    Args:
        username: Value to validate. Non-string values are rejected.

    Returns:
        bool: True when the value is a UUID or a well-formed username.
    """
    if not isinstance(username, str):
        return False

    if is_uuid(username):
        return True  # UUIDs are always valid

    # Minecraft username rules: 3-16 chars, alphanumeric + underscore.
    # fullmatch rather than match + '$', because '$' would also accept a
    # name followed by a trailing newline.
    return re.fullmatch(r'[a-zA-Z0-9_]{3,16}', username) is not None
|
|
|
|
async def get_whitelist_players(server_uuid):
    """Get the list of whitelisted players from a server's console.

    Requests websocket credentials from the panel, authenticates on the
    socket, sends ``whitelist list`` and scans up to 10 incoming messages
    (2 s timeout each) for the console line that lists the players.

    Args:
        server_uuid: Server identifier used in the panel API path.

    Returns:
        list[str]: Player names parsed from the console output. Empty when
        the panel request does not return 200, when no listing line is seen,
        or when any error occurs (the error is printed).
    """
    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Accept': 'application/json'
    }
    # Anchor on the marker text instead of the first ':' in the line, so a
    # prefix that itself contains colons (such as a timestamp) cannot shift
    # the split point.
    # NOTE(review): the "player(s):" wording is accepted in addition to
    # "players:" -- confirm which wording the servers in use actually print.
    listing_pattern = r'whitelisted player(?:s|\(s\)):(.*)'

    try:
        response = requests.get(
            f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/websocket',
            headers=headers,
            timeout=5
        )

        if response.status_code != 200:
            return []

        ws_data = response.json()
        socket_url = ws_data['data']['socket']
        token = ws_data['data']['token']

        async with websockets.connect(
            socket_url,
            ping_interval=None,
            additional_headers={'Origin': PTERODACTYL_URL}
        ) as websocket:
            await websocket.send(json.dumps({'event': 'auth', 'args': [token]}))
            # The first reply after auth is read and discarded.
            await asyncio.wait_for(websocket.recv(), timeout=5)

            await websocket.send(
                json.dumps({'event': 'send command', 'args': ['whitelist list']})
            )

            players = []
            for _ in range(10):
                try:
                    message = await asyncio.wait_for(websocket.recv(), timeout=2)
                except asyncio.TimeoutError:
                    break

                data = json.loads(message)
                if data.get('event') != 'console output':
                    continue

                output = data.get('args', [''])[0]
                found = re.search(listing_pattern, output, re.IGNORECASE)
                if found:
                    # Drop empty fragments so an empty listing yields [] and
                    # not [''].
                    players = [
                        name.strip()
                        for name in found.group(1).split(',')
                        if name.strip()
                    ]
                    break

            return players

    except Exception as e:
        print(f"Error getting whitelist: {e}")
        return []
|
|
|
|
def get_server_running_status(server_uuid):
    """Report whether the panel's resources endpoint says the server is running.

    Args:
        server_uuid: Server identifier used in the panel API path.

    Returns:
        bool: True only when the response is 200 and its
        ``attributes.current_state`` equals ``'running'``. A non-200
        response or any exception (which is printed) yields False.
    """
    auth_headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Accept': 'application/json'
    }

    try:
        reply = requests.get(
            f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/resources',
            headers=auth_headers,
            timeout=5
        )
        if reply.status_code == 200:
            attributes = reply.json().get('attributes', {})
            # A missing state is treated the same as 'offline'.
            return attributes.get('current_state', 'offline') == 'running'
        return False
    except Exception as e:
        print(f"Error checking server status for {server_uuid}: {e}")
        return False
|
|
|
|
async def check_whitelist_via_websocket(server_uuid):
    """Probe a server's whitelist state through its console websocket.

    The probe sends ``whitelist on`` and interprets the console reply:

    * "already on" / "already enabled" -> returns ``'whitelisted'``.
    * "now turned on" / "whitelist is now on" -> the probe itself switched
      the whitelist on, so ``whitelist off`` is sent to undo that and
      ``'public'`` is returned.

    Up to 10 messages are read, each with a 2 s timeout.

    Args:
        server_uuid: Server identifier used in the panel API path.

    Returns:
        str: ``'whitelisted'``, ``'public'`` or ``'unknown'``. ``'unknown'``
        covers a non-200 credentials response, no recognised reply, and any
        exception (which is printed).
    """
    request_headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Accept': 'application/json'
    }

    try:
        credentials = requests.get(
            f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/websocket',
            headers=request_headers,
            timeout=5
        )
        if credentials.status_code != 200:
            return 'unknown'

        ws_info = credentials.json()
        socket_url = ws_info['data']['socket']
        token = ws_info['data']['token']

        async with websockets.connect(
            socket_url,
            ping_interval=None,
            additional_headers={'Origin': PTERODACTYL_URL}
        ) as console:
            await console.send(json.dumps({'event': 'auth', 'args': [token]}))
            # The first reply after auth is read and discarded.
            await asyncio.wait_for(console.recv(), timeout=5)

            await console.send(
                json.dumps({'event': 'send command', 'args': ['whitelist on']})
            )

            attempts_left = 10
            while attempts_left > 0:
                attempts_left -= 1
                try:
                    raw = await asyncio.wait_for(console.recv(), timeout=2)
                    frame = json.loads(raw)
                    if frame.get('event') != 'console output':
                        continue

                    text = frame.get('args', [''])[0]
                    text = text.lower()

                    if any(marker in text for marker in ('already on', 'already enabled')):
                        return 'whitelisted'
                    if any(marker in text for marker in ('now turned on', 'whitelist is now on')):
                        # The probe changed the state; put it back.
                        await console.send(
                            json.dumps({'event': 'send command', 'args': ['whitelist off']})
                        )
                        return 'public'
                except asyncio.TimeoutError:
                    break

            return 'unknown'

    except Exception as e:
        print(f"Error: {e}")
        return 'unknown'
|
|
|
|
async def toggle_whitelist_via_websocket(server_uuid, enable):
    """Send ``whitelist on`` or ``whitelist off`` to a server's console.

    After sending the command, up to 5 messages are read (2 s timeout each)
    looking for "turned on" / "turned off" in the console output.

    Args:
        server_uuid: Server identifier used in the panel API path.
        enable: Truthy to send ``whitelist on``, falsy for ``whitelist off``.

    Returns:
        bool: False when the credentials request is not 200 or an exception
        occurs (which is printed). True otherwise -- including when no
        confirmation line was seen before the reads ran out.
    """
    request_headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Accept': 'application/json'
    }

    try:
        credentials = requests.get(
            f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/websocket',
            headers=request_headers,
            timeout=5
        )
        if credentials.status_code != 200:
            return False

        ws_info = credentials.json()
        socket_url = ws_info['data']['socket']
        token = ws_info['data']['token']

        async with websockets.connect(
            socket_url,
            ping_interval=None,
            additional_headers={'Origin': PTERODACTYL_URL}
        ) as console:
            await console.send(json.dumps({'event': 'auth', 'args': [token]}))
            # The first reply after auth is read and discarded.
            await asyncio.wait_for(console.recv(), timeout=5)

            if enable:
                command = 'whitelist on'
            else:
                command = 'whitelist off'
            await console.send(json.dumps({'event': 'send command', 'args': [command]}))

            reads_left = 5
            while reads_left > 0:
                reads_left -= 1
                try:
                    raw = await asyncio.wait_for(console.recv(), timeout=2)
                    frame = json.loads(raw)
                    if frame.get('event') != 'console output':
                        continue
                    text = frame.get('args', [''])[0].lower()
                    if any(marker in text for marker in ('turned on', 'turned off')):
                        return True
                except asyncio.TimeoutError:
                    break

            # No confirmation seen; the command was still sent, so this is
            # reported as success.
            return True

    except Exception as e:
        print(f"Error toggling whitelist: {e}")
        return False
|
|
|
|
@auth.verify_password
def verify_password(username, password):
    """Check HTTP Basic credentials against the configured dashboard login.

    Args:
        username: Username supplied by the client.
        password: Password supplied by the client.

    Returns:
        The username when both values match ``USERNAME`` / ``PASSWORD``,
        otherwise None.
    """
    import hmac  # local import so this block does not depend on the file's import list

    # Compare as UTF-8 bytes: compare_digest rejects non-ASCII str arguments.
    supplied_user = (username or '').encode('utf-8')
    supplied_pass = (password or '').encode('utf-8')

    # Constant-time comparison, and both checks always run, so response
    # timing does not reveal which part of the credentials was wrong.
    user_ok = hmac.compare_digest(supplied_user, USERNAME.encode('utf-8'))
    pass_ok = hmac.compare_digest(supplied_pass, PASSWORD.encode('utf-8'))

    if user_ok and pass_ok:
        return username
    return None
|
|
|
|
def get_servers():
    """Fetch the panel's server list and group it by name keywords.

    Servers whose name contains an ``EXCLUDED_SERVERS`` entry are skipped.
    For every remaining server the running state is looked up with
    ``get_server_running_status`` (one extra panel request per server).

    Returns:
        dict: ``{'TX1': [...], 'NC1': [...], 'Unknown': [...]}`` where each
        item is ``{'name', 'uuid', 'full_uuid', 'running'}``. ``'uuid'`` is
        the panel's ``identifier`` attribute and ``'full_uuid'`` its
        ``uuid`` attribute. All three lists are empty when the list request
        is not 200 or any exception occurs (which is printed).
    """
    # Grouping is by substring of the lower-cased server name. TX1 is
    # checked first, so a name matching both sets lands in TX1.
    tx1_keywords = ('reclamation', 'stoneblock', 'society', 'vanilla', 'mons')
    nc1_keywords = ('ember', 'minecolonies', 'mods', 'homestead', 'subterra')

    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    try:
        # Timeout added so a stalled panel cannot hang the request forever;
        # 5 s matches the other panel GET calls in this file.
        response = requests.get(
            f'{PTERODACTYL_URL}/api/client',
            headers=headers,
            timeout=5
        )
        if response.status_code != 200:
            return {'TX1': [], 'NC1': [], 'Unknown': []}

        data = response.json()
        servers = {'TX1': [], 'NC1': [], 'Unknown': []}

        for server in data.get('data', []):
            attributes = server.get('attributes', {})
            name = attributes.get('name', 'Unknown')
            short_uuid = attributes.get('identifier', '')
            full_uuid = attributes.get('uuid', '')

            if any(excluded in name for excluded in EXCLUDED_SERVERS):
                continue

            server_info = {
                'name': name,
                'uuid': short_uuid,
                'full_uuid': full_uuid,
                'running': get_server_running_status(short_uuid)
            }

            lowered = name.lower()
            if any(keyword in lowered for keyword in tx1_keywords):
                servers['TX1'].append(server_info)
            elif any(keyword in lowered for keyword in nc1_keywords):
                servers['NC1'].append(server_info)
            else:
                servers['Unknown'].append(server_info)

        return servers
    except Exception as e:
        print(f"Error fetching servers: {e}")
        # A failure part-way through discards partial results.
        return {'TX1': [], 'NC1': [], 'Unknown': []}
|
|
|
|
@app.route('/')
@auth.login_required
def index():
    """Render the dashboard page with the current grouped server list."""
    return render_template('index.html', servers=get_servers())
|
|
|
|
@app.route('/health')
def health():
    """Unauthenticated health probe; always returns a fixed JSON payload."""
    payload = {'service': 'whitelist-manager', 'status': 'healthy'}
    return jsonify(payload)
|
|
|
|
@app.route('/api/servers')
@auth.login_required
def api_servers():
    """Return the grouped server list produced by get_servers() as JSON."""
    grouped = get_servers()
    return jsonify(grouped)
|
|
|
|
@app.route('/api/activity')
@auth.login_required
def get_activity():
    """Return the first 10 entries of the activity log as JSON."""
    recent = activity_log[:10]
    return jsonify({'activity': recent})
|
|
|
|
@app.route('/api/check-whitelist/<server_uuid>', methods=['GET'])
@auth.login_required
def check_whitelist_status(server_uuid):
    """Check whitelist status of one server.

    Runs ``check_whitelist_via_websocket`` to completion on a fresh event
    loop and returns its result.

    Args:
        server_uuid: Server identifier taken from the URL.

    Returns:
        JSON ``{'success': True, 'status': <status>}``.
    """
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        status = loop.run_until_complete(check_whitelist_via_websocket(server_uuid))
    finally:
        # Close the loop even if the coroutine raises, so it is not leaked.
        loop.close()

    return jsonify({'success': True, 'status': status})
|
|
|
|
@app.route('/api/get-whitelist/<server_uuid>', methods=['GET'])
@auth.login_required
def get_whitelist(server_uuid):
    """Get the list of whitelisted players for one server.

    Runs ``get_whitelist_players`` to completion on a fresh event loop and
    returns its result.

    Args:
        server_uuid: Server identifier taken from the URL.

    Returns:
        JSON ``{'success': True, 'players': [...]}``.
    """
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        players = loop.run_until_complete(get_whitelist_players(server_uuid))
    finally:
        # Close the loop even if the coroutine raises, so it is not leaked.
        loop.close()

    return jsonify({'success': True, 'players': players})
|
|
|
|
@app.route('/api/toggle-whitelist/<server_uuid>', methods=['POST'])
@auth.login_required
def toggle_whitelist(server_uuid):
    """Toggle the whitelist on or off for one server.

    Expects a JSON object body; its optional ``enable`` key (default True)
    selects the direction. The outcome is recorded in the activity log.

    Args:
        server_uuid: Server identifier taken from the URL.

    Returns:
        JSON ``{'success': <bool>, 'status': 'whitelisted' | 'public'}``.
        ``status`` reflects the requested state, whether or not the toggle
        succeeded. A body that is not a JSON object gets a 400 response.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Refuse rather than fall back to enable=True on a malformed body.
        return jsonify({'success': False, 'message': 'Request body must be a JSON object'}), 400
    enable = data.get('enable', True)

    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        success = loop.run_until_complete(toggle_whitelist_via_websocket(server_uuid, enable))
    finally:
        # Close the loop even if the coroutine raises, so it is not leaked.
        loop.close()

    new_status = 'whitelisted' if enable else 'public'
    action = 'Enable whitelist' if enable else 'Disable whitelist'

    # Resolve the display name for the log; None when the server is not
    # found in any group.
    server_name = next(
        (
            server['name']
            for node_servers in get_servers().values()
            for server in node_servers
            if server['uuid'] == server_uuid
        ),
        None,
    )

    log_activity(action, server=server_name, success=success)

    return jsonify({'success': success, 'status': new_status})
|
|
|
|
@app.route('/api/toggle-all-whitelist', methods=['POST'])
@auth.login_required
def toggle_all_whitelist():
    """Toggle the whitelist on or off for every running server.

    Expects a JSON object body; its optional ``enable`` key (default True)
    selects the direction. Only servers reported as running by
    ``get_servers`` are toggled, one after another. One activity-log entry
    is written for the whole operation.

    Returns:
        JSON ``{'success': True, 'message': ...}`` where the message carries
        the success and failure counts. ``success`` is True even when some
        servers failed. A body that is not a JSON object gets a 400.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Refuse rather than fall back to enable=True on a malformed body.
        return jsonify({'success': False, 'message': 'Request body must be a JSON object'}), 400
    enable = data.get('enable', True)

    running_servers = [
        server
        for node_servers in get_servers().values()
        for server in node_servers
        if server['running']
    ]

    successes = 0
    failures = 0

    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        for server in running_servers:
            if loop.run_until_complete(toggle_whitelist_via_websocket(server['uuid'], enable)):
                successes += 1
            else:
                failures += 1
    finally:
        # Close the loop even if a toggle raises, so it is not leaked.
        loop.close()

    action = f"{'Enable' if enable else 'Disable'} whitelist on ALL servers"
    log_activity(action, success=(failures == 0))

    return jsonify({
        'success': True,
        'message': f"{'Enabled' if enable else 'Disabled'} whitelist on {successes} servers ({failures} failed)"
    })
|
|
|
|
@app.route('/api/validate-player', methods=['POST'])
@auth.login_required
def validate_player():
    """Validate the ``player`` value of a JSON request body.

    Returns:
        JSON ``{'valid': True}`` when ``validate_minecraft_username``
        accepts the value, otherwise ``{'valid': False, 'message': ...}``.
        A missing or malformed body, or a non-string player, is reported as
        invalid rather than raising.
    """
    data = request.get_json(silent=True)
    player = data.get('player', '') if isinstance(data, dict) else ''

    # Type check first: the validator is only meaningful for strings.
    if not isinstance(player, str) or not validate_minecraft_username(player):
        return jsonify({'valid': False, 'message': 'Invalid username format (3-16 chars, alphanumeric + underscore)'})

    return jsonify({'valid': True})
|
|
|
|
@app.route('/api/whitelist/add', methods=['POST'])
@auth.login_required
def add_to_whitelist():
    """Add a player to one server's whitelist via the panel command API.

    Expects a JSON body with ``player`` (username or UUID) and
    ``server_uuid``. The result is recorded in the activity log.

    Returns:
        * 200 ``{'success': True, 'message': ...}`` when the panel answers 204.
        * 400 when a field is missing or the player value is invalid.
        * 500 when the panel answers anything else or the request raises.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}  # handled below as "missing player or server"
    player = data.get('player')
    server_uuid = data.get('server_uuid')

    if not player or not server_uuid:
        return jsonify({'success': False, 'message': 'Missing player or server'}), 400

    if not isinstance(player, str) or not validate_minecraft_username(player):
        return jsonify({'success': False, 'message': 'Invalid username format'}), 400

    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    # The command is the same for names and UUIDs; only the label differs.
    command = f'whitelist add {player}'
    display_name = f'UUID {player[:8]}...' if is_uuid(player) else player

    # Bound before the try block so the except handler can always log it,
    # even when the POST itself raises.
    server_name = None

    try:
        response = requests.post(
            f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/command',
            headers=headers,
            json={'command': command},
            timeout=5  # matches the timeout of the other panel calls in this file
        )

        success = response.status_code == 204

        # Get server name
        server_name = next(
            (
                server['name']
                for node_servers in get_servers().values()
                for server in node_servers
                if server['uuid'] == server_uuid
            ),
            None,
        )

        log_activity('Add player', player=display_name, server=server_name, success=success)

        if success:
            return jsonify({'success': True, 'message': f'Added {display_name} to whitelist'})
        return jsonify({'success': False, 'message': f'API Error: {response.status_code}'}), 500
    except Exception as e:
        log_activity('Add player', player=display_name, server=server_name, success=False)
        return jsonify({'success': False, 'message': f'Error: {str(e)}'}), 500
|
|
|
|
@app.route('/api/whitelist/remove', methods=['POST'])
@auth.login_required
def remove_from_whitelist():
    """Remove a player from one server's whitelist via the panel command API.

    Expects a JSON body with ``player`` (username or UUID) and
    ``server_uuid``. The result is recorded in the activity log.

    Returns:
        * 200 ``{'success': True, 'message': ...}`` when the panel answers 204.
        * 400 when a field is missing, or the player value is not a string
          or contains a line break.
        * 500 when the panel answers anything else or the request raises.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}  # handled below as "missing player or server"
    player = data.get('player')
    server_uuid = data.get('server_uuid')

    if not player or not server_uuid:
        return jsonify({'success': False, 'message': 'Missing player or server'}), 400

    # The value is interpolated into a console command, so refuse anything
    # that is not a single-line string.
    # NOTE(review): unlike the add endpoint, the full username format is not
    # enforced here -- confirm whether that is intentional.
    if not isinstance(player, str) or '\n' in player or '\r' in player:
        return jsonify({'success': False, 'message': 'Invalid username format'}), 400

    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    # The command is the same for names and UUIDs; only the label differs.
    command = f'whitelist remove {player}'
    display_name = f'UUID {player[:8]}...' if is_uuid(player) else player

    # Bound before the try block so the except handler can always log it,
    # even when the POST itself raises.
    server_name = None

    try:
        response = requests.post(
            f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/command',
            headers=headers,
            json={'command': command},
            timeout=5  # matches the timeout of the other panel calls in this file
        )

        success = response.status_code == 204

        # Get server name
        server_name = next(
            (
                server['name']
                for node_servers in get_servers().values()
                for server in node_servers
                if server['uuid'] == server_uuid
            ),
            None,
        )

        log_activity('Remove player', player=display_name, server=server_name, success=success)

        if success:
            return jsonify({'success': True, 'message': f'Removed {display_name} from whitelist'})
        return jsonify({'success': False, 'message': f'API Error: {response.status_code}'}), 500
    except Exception as e:
        log_activity('Remove player', player=display_name, server=server_name, success=False)
        return jsonify({'success': False, 'message': f'Error: {str(e)}'}), 500
|
|
|
|
@app.route('/api/whitelist/add-all', methods=['POST'])
@auth.login_required
def add_to_all():
    """Add a player to the whitelist of every server from get_servers().

    Expects a JSON body with ``player`` (username or UUID). The command is
    posted to each server in turn; a non-204 answer or an exception counts
    as a failure for that server and the loop carries on. One activity-log
    entry is written for the whole operation.

    Returns:
        * 200 ``{'success': True, 'message': ...}`` with the success and
          failure counts. ``success`` is True even when some servers failed.
        * 400 when the player is missing or invalid.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}  # handled below as "missing player name"
    player = data.get('player')

    if not player:
        return jsonify({'success': False, 'message': 'Missing player name'}), 400

    if not isinstance(player, str) or not validate_minecraft_username(player):
        return jsonify({'success': False, 'message': 'Invalid username format'}), 400

    all_servers = [
        server
        for node_servers in get_servers().values()
        for server in node_servers
    ]

    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    command = f'whitelist add {player}'
    display_name = f'UUID {player[:8]}...' if is_uuid(player) else player
    successes = 0
    failures = 0

    for server in all_servers:
        try:
            response = requests.post(
                f'{PTERODACTYL_URL}/api/client/servers/{server["uuid"]}/command',
                headers=headers,
                json={'command': command},
                timeout=5  # one stalled server must not block the whole batch
            )
            if response.status_code == 204:
                successes += 1
            else:
                failures += 1
        except Exception:
            # Best effort: count the failure and move on to the next server.
            failures += 1

    log_activity('Bulk add player', player=display_name, server='ALL', success=(failures == 0))

    return jsonify({
        'success': True,
        'message': f'Added {display_name} to {successes} servers ({failures} failed)'
    })
|
|
|
|
@app.route('/api/whitelist/remove-all', methods=['POST'])
@auth.login_required
def remove_from_all():
    """Remove a player from the whitelist of every server from get_servers().

    Expects a JSON body with ``player`` (username or UUID). The command is
    posted to each server in turn; a non-204 answer or an exception counts
    as a failure for that server and the loop carries on. One activity-log
    entry is written for the whole operation.

    Returns:
        * 200 ``{'success': True, 'message': ...}`` with the success and
          failure counts. ``success`` is True even when some servers failed.
        * 400 when the player is missing, is not a string, or contains a
          line break.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}  # handled below as "missing player name"
    player = data.get('player')

    if not player:
        return jsonify({'success': False, 'message': 'Missing player name'}), 400

    # The value is interpolated into a console command, so refuse anything
    # that is not a single-line string.
    # NOTE(review): unlike the add-all endpoint, the full username format is
    # not enforced here -- confirm whether that is intentional.
    if not isinstance(player, str) or '\n' in player or '\r' in player:
        return jsonify({'success': False, 'message': 'Invalid username format'}), 400

    all_servers = [
        server
        for node_servers in get_servers().values()
        for server in node_servers
    ]

    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    command = f'whitelist remove {player}'
    display_name = f'UUID {player[:8]}...' if is_uuid(player) else player
    successes = 0
    failures = 0

    for server in all_servers:
        try:
            response = requests.post(
                f'{PTERODACTYL_URL}/api/client/servers/{server["uuid"]}/command',
                headers=headers,
                json={'command': command},
                timeout=5  # one stalled server must not block the whole batch
            )
            if response.status_code == 204:
                successes += 1
            else:
                failures += 1
        except Exception:
            # Best effort: count the failure and move on to the next server.
            failures += 1

    log_activity('Bulk remove player', player=display_name, server='ALL', success=(failures == 0))

    return jsonify({
        'success': True,
        'message': f'Removed {display_name} from {successes} servers ({failures} failed)'
    })
|
|
|
|
if __name__ == '__main__':
    # Direct-execution entry point: serve on all interfaces, port 5001.
    app.run(port=5001, host='0.0.0.0')
|