from flask import Flask, render_template, request, jsonify from flask_httpauth import HTTPBasicAuth import os import requests import re import json import asyncio import websockets from datetime import datetime app = Flask(__name__) auth = HTTPBasicAuth() PTERODACTYL_URL = os.getenv('PTERODACTYL_URL', 'https://panel.firefrostgaming.com') API_KEY = os.getenv('PTERODACTYL_API_KEY') USERNAME = os.getenv('DASHBOARD_USERNAME', 'admin') PASSWORD = os.getenv('DASHBOARD_PASSWORD', 'changeme') EXCLUDED_SERVERS = ['FoundryVTT', 'Hytale'] # Activity log (in-memory, last 50 actions) activity_log = [] def log_activity(action, player=None, server=None, success=True): """Add an entry to the activity log""" entry = { 'timestamp': datetime.now().isoformat(), 'action': action, 'player': player, 'server': server, 'success': success } activity_log.insert(0, entry) if len(activity_log) > 50: activity_log.pop() def is_uuid(player_input): """Check if input is a UUID format (with or without dashes)""" uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$' return bool(re.match(uuid_pattern, player_input.lower())) def validate_minecraft_username(username): """Validate Minecraft username format""" if is_uuid(username): return True # UUIDs are always valid # Minecraft username rules: 3-16 chars, alphanumeric + underscore if not re.match(r'^[a-zA-Z0-9_]{3,16}$', username): return False return True async def get_whitelist_players(server_uuid): """Get list of whitelisted players from a server""" headers = { 'Authorization': f'Bearer {API_KEY}', 'Accept': 'application/json' } try: response = requests.get( f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/websocket', headers=headers, timeout=5 ) if response.status_code != 200: return [] ws_data = response.json() socket_url = ws_data['data']['socket'] token = ws_data['data']['token'] async with websockets.connect( socket_url, ping_interval=None, additional_headers={'Origin': PTERODACTYL_URL} ) as websocket: auth_message = json.dumps({'event': 'auth', 'args': [token]}) await websocket.send(auth_message) await asyncio.wait_for(websocket.recv(), timeout=5) list_command = json.dumps({'event': 'send command', 'args': ['whitelist list']}) await websocket.send(list_command) players = [] for i in range(10): try: message = await asyncio.wait_for(websocket.recv(), timeout=2) data = json.loads(message) if data.get('event') == 'console output': output = data.get('args', [''])[0] # Parse player names from output # Format: "There are 3 whitelisted players: player1, player2, player3" if 'whitelisted players:' in output.lower(): player_text = output.split(':', 1)[1].strip() players = [p.strip() for p in player_text.split(',')] break except asyncio.TimeoutError: break return players except Exception as e: print(f"Error getting whitelist: {e}") return [] def get_server_running_status(server_uuid): """Check if server is actually running via resources endpoint""" headers = { 'Authorization': f'Bearer {API_KEY}', 'Accept': 'application/json' } try: response = requests.get( f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/resources', headers=headers, timeout=5 ) if response.status_code != 200: return False data = response.json() current_state = data.get('attributes', {}).get('current_state', 'offline') return current_state == 'running' except Exception as e: print(f"Error checking server status for {server_uuid}: {e}") return False async def check_whitelist_via_websocket(server_uuid): """Connect to server websocket and check whitelist status""" headers = { 'Authorization': f'Bearer {API_KEY}', 'Accept': 'application/json' } try: response = requests.get( f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/websocket', headers=headers, timeout=5 ) if response.status_code != 200: return 'unknown' ws_data = response.json() socket_url = ws_data['data']['socket'] token = ws_data['data']['token'] async with websockets.connect( socket_url, ping_interval=None, additional_headers={'Origin': PTERODACTYL_URL} ) as websocket: auth_message = json.dumps({'event': 'auth', 'args': [token]}) await websocket.send(auth_message) await asyncio.wait_for(websocket.recv(), timeout=5) test_command = json.dumps({'event': 'send command', 'args': ['whitelist on']}) await websocket.send(test_command) for i in range(10): try: message = await asyncio.wait_for(websocket.recv(), timeout=2) data = json.loads(message) if data.get('event') == 'console output': output = data.get('args', [''])[0] output_lower = output.lower() if 'already on' in output_lower or 'already enabled' in output_lower: return 'whitelisted' elif 'now turned on' in output_lower or 'whitelist is now on' in output_lower: off_command = json.dumps({'event': 'send command', 'args': ['whitelist off']}) await websocket.send(off_command) return 'public' except asyncio.TimeoutError: break return 'unknown' except Exception as e: print(f"Error: {e}") return 'unknown' async def toggle_whitelist_via_websocket(server_uuid, enable): """Toggle whitelist on or off""" headers = { 'Authorization': f'Bearer {API_KEY}', 'Accept': 'application/json' } try: response = requests.get( f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/websocket', headers=headers, timeout=5 ) if response.status_code != 200: return False ws_data = response.json() socket_url = ws_data['data']['socket'] token = ws_data['data']['token'] async with websockets.connect( socket_url, ping_interval=None, additional_headers={'Origin': PTERODACTYL_URL} ) as websocket: auth_message = json.dumps({'event': 'auth', 'args': [token]}) await websocket.send(auth_message) await asyncio.wait_for(websocket.recv(), timeout=5) command = 'whitelist on' if enable else 'whitelist off' cmd_message = json.dumps({'event': 'send command', 'args': [command]}) await websocket.send(cmd_message) for i in range(5): try: message = await asyncio.wait_for(websocket.recv(), timeout=2) data = json.loads(message) if data.get('event') == 'console output': output = data.get('args', [''])[0].lower() if 'turned on' in output or 'turned off' in output: return True except asyncio.TimeoutError: break return True except Exception as e: print(f"Error toggling whitelist: {e}") return False @auth.verify_password def verify_password(username, password): if username == USERNAME and password == PASSWORD: return username return None def get_servers(): headers = { 'Authorization': f'Bearer {API_KEY}', 'Content-Type': 'application/json', 'Accept': 'application/json' } try: response = requests.get(f'{PTERODACTYL_URL}/api/client', headers=headers) if response.status_code != 200: return {'TX1': [], 'NC1': [], 'Unknown': []} data = response.json() servers = {'TX1': [], 'NC1': [], 'Unknown': []} for server in data.get('data', []): attributes = server.get('attributes', {}) name = attributes.get('name', 'Unknown') short_uuid = attributes.get('identifier', '') full_uuid = attributes.get('uuid', '') if any(excluded in name for excluded in EXCLUDED_SERVERS): continue is_running = get_server_running_status(short_uuid) server_info = { 'name': name, 'uuid': short_uuid, 'full_uuid': full_uuid, 'running': is_running } if any(keyword in name.lower() for keyword in ['reclamation', 'stoneblock', 'society', 'vanilla', 'mons']): servers['TX1'].append(server_info) elif any(keyword in name.lower() for keyword in ['ember', 'minecolonies', 'mods', 'homestead', 'subterra']): servers['NC1'].append(server_info) else: servers['Unknown'].append(server_info) return servers except Exception as e: print(f"Error fetching servers: {e}") return {'TX1': [], 'NC1': [], 'Unknown': []} @app.route('/') @auth.login_required def index(): servers = get_servers() return render_template('index.html', servers=servers) @app.route('/health') def health(): return jsonify({'service': 'whitelist-manager', 'status': 'healthy'}) @app.route('/api/servers') @auth.login_required def api_servers(): return jsonify(get_servers()) @app.route('/api/activity') @auth.login_required def get_activity(): """Get recent activity log""" return jsonify({'activity': activity_log[:10]}) @app.route('/api/check-whitelist/', methods=['GET']) @auth.login_required def check_whitelist_status(server_uuid): """Check whitelist status""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) status = loop.run_until_complete(check_whitelist_via_websocket(server_uuid)) loop.close() return jsonify({'success': True, 'status': status}) @app.route('/api/get-whitelist/', methods=['GET']) @auth.login_required def get_whitelist(server_uuid): """Get list of whitelisted players""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) players = loop.run_until_complete(get_whitelist_players(server_uuid)) loop.close() return jsonify({'success': True, 'players': players}) @app.route('/api/toggle-whitelist/', methods=['POST']) @auth.login_required def toggle_whitelist(server_uuid): """Toggle whitelist on/off""" data = request.json enable = data.get('enable', True) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) success = loop.run_until_complete(toggle_whitelist_via_websocket(server_uuid, enable)) loop.close() new_status = 'whitelisted' if enable else 'public' action = 'Enable whitelist' if enable else 'Disable whitelist' # Get server name servers = get_servers() server_name = None for node_servers in servers.values(): for server in node_servers: if server['uuid'] == server_uuid: server_name = server['name'] break log_activity(action, server=server_name, success=success) return jsonify({'success': success, 'status': new_status}) @app.route('/api/toggle-all-whitelist', methods=['POST']) @auth.login_required def toggle_all_whitelist(): """Toggle whitelist on/off for ALL servers""" data = request.json enable = data.get('enable', True) servers = get_servers() all_servers = [] for node_servers in servers.values(): all_servers.extend([s for s in node_servers if s['running']]) successes = 0 failures = 0 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) for server in all_servers: success = loop.run_until_complete(toggle_whitelist_via_websocket(server['uuid'], enable)) if success: successes += 1 else: failures += 1 loop.close() action = f"{'Enable' if enable else 'Disable'} whitelist on ALL servers" log_activity(action, success=(failures == 0)) return jsonify({ 'success': True, 'message': f"{'Enabled' if enable else 'Disabled'} whitelist on {successes} servers ({failures} failed)" }) @app.route('/api/validate-player', methods=['POST']) @auth.login_required def validate_player(): """Validate Minecraft username""" data = request.json player = data.get('player', '') if not validate_minecraft_username(player): return jsonify({'valid': False, 'message': 'Invalid username format (3-16 chars, alphanumeric + underscore)'}) return jsonify({'valid': True}) @app.route('/api/whitelist/add', methods=['POST']) @auth.login_required def add_to_whitelist(): data = request.json player = data.get('player') server_uuid = data.get('server_uuid') if not player or not server_uuid: return jsonify({'success': False, 'message': 'Missing player or server'}), 400 if not validate_minecraft_username(player): return jsonify({'success': False, 'message': 'Invalid username format'}), 400 headers = { 'Authorization': f'Bearer {API_KEY}', 'Content-Type': 'application/json', 'Accept': 'application/json' } if is_uuid(player): command = f'whitelist add {player}' display_name = f'UUID {player[:8]}...' else: command = f'whitelist add {player}' display_name = player try: response = requests.post( f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/command', headers=headers, json={'command': command} ) success = response.status_code == 204 # Get server name servers = get_servers() server_name = None for node_servers in servers.values(): for server in node_servers: if server['uuid'] == server_uuid: server_name = server['name'] break log_activity('Add player', player=display_name, server=server_name, success=success) if success: return jsonify({'success': True, 'message': f'Added {display_name} to whitelist'}) else: return jsonify({'success': False, 'message': f'API Error: {response.status_code}'}), 500 except Exception as e: log_activity('Add player', player=display_name, server=server_name, success=False) return jsonify({'success': False, 'message': f'Error: {str(e)}'}), 500 @app.route('/api/whitelist/remove', methods=['POST']) @auth.login_required def remove_from_whitelist(): data = request.json player = data.get('player') server_uuid = data.get('server_uuid') if not player or not server_uuid: return jsonify({'success': False, 'message': 'Missing player or server'}), 400 headers = { 'Authorization': f'Bearer {API_KEY}', 'Content-Type': 'application/json', 'Accept': 'application/json' } if is_uuid(player): command = f'whitelist remove {player}' display_name = f'UUID {player[:8]}...' else: command = f'whitelist remove {player}' display_name = player try: response = requests.post( f'{PTERODACTYL_URL}/api/client/servers/{server_uuid}/command', headers=headers, json={'command': command} ) success = response.status_code == 204 # Get server name servers = get_servers() server_name = None for node_servers in servers.values(): for server in node_servers: if server['uuid'] == server_uuid: server_name = server['name'] break log_activity('Remove player', player=display_name, server=server_name, success=success) if success: return jsonify({'success': True, 'message': f'Removed {display_name} from whitelist'}) else: return jsonify({'success': False, 'message': f'API Error: {response.status_code}'}), 500 except Exception as e: log_activity('Remove player', player=display_name, server=server_name, success=False) return jsonify({'success': False, 'message': f'Error: {str(e)}'}), 500 @app.route('/api/whitelist/add-all', methods=['POST']) @auth.login_required def add_to_all(): data = request.json player = data.get('player') if not player: return jsonify({'success': False, 'message': 'Missing player name'}), 400 if not validate_minecraft_username(player): return jsonify({'success': False, 'message': 'Invalid username format'}), 400 servers = get_servers() all_servers = [] for node_servers in servers.values(): all_servers.extend(node_servers) headers = { 'Authorization': f'Bearer {API_KEY}', 'Content-Type': 'application/json', 'Accept': 'application/json' } command = f'whitelist add {player}' successes = 0 failures = 0 if is_uuid(player): display_name = f'UUID {player[:8]}...' else: display_name = player for server in all_servers: try: response = requests.post( f'{PTERODACTYL_URL}/api/client/servers/{server["uuid"]}/command', headers=headers, json={'command': command} ) if response.status_code == 204: successes += 1 else: failures += 1 except Exception as e: failures += 1 log_activity('Bulk add player', player=display_name, server='ALL', success=(failures == 0)) return jsonify({ 'success': True, 'message': f'Added {display_name} to {successes} servers ({failures} failed)' }) @app.route('/api/whitelist/remove-all', methods=['POST']) @auth.login_required def remove_from_all(): data = request.json player = data.get('player') if not player: return jsonify({'success': False, 'message': 'Missing player name'}), 400 servers = get_servers() all_servers = [] for node_servers in servers.values(): all_servers.extend(node_servers) headers = { 'Authorization': f'Bearer {API_KEY}', 'Content-Type': 'application/json', 'Accept': 'application/json' } command = f'whitelist remove {player}' successes = 0 failures = 0 if is_uuid(player): display_name = f'UUID {player[:8]}...' else: display_name = player for server in all_servers: try: response = requests.post( f'{PTERODACTYL_URL}/api/client/servers/{server["uuid"]}/command', headers=headers, json={'command': command} ) if response.status_code == 204: successes += 1 else: failures += 1 except Exception as e: failures += 1 log_activity('Bulk remove player', player=display_name, server='ALL', success=(failures == 0)) return jsonify({ 'success': True, 'message': f'Removed {display_name} from {successes} servers ({failures} failed)' }) if __name__ == '__main__': app.run(host='0.0.0.0', port=5001)