#!/usr/bin/env python3
"""
Firefrost Gaming - World Backup Automation
Automated backup system for Minecraft server worlds via Pterodactyl SFTP

Author: Michael "Frostystyle" Krause & Claude "The Auditor"
Version: 1.0.0
Date: 2026-02-17
"""

import json
import time
import logging
import tarfile
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from urllib.parse import quote

try:
    import requests
except ImportError:
    print("ERROR: requests module not installed. Run: pip3 install requests --break-system-packages")
    sys.exit(1)

LOG_FILE = '/var/log/world-backup.log'


def _build_log_handlers(log_file=LOG_FILE):
    """Return the logging handlers: console always, log file when writable.

    Opening the log file can fail (missing directory, insufficient
    permissions). Rather than crashing at import time, fall back to
    console-only logging and say so on stderr.
    """
    handlers = [logging.StreamHandler()]
    try:
        handlers.insert(0, logging.FileHandler(log_file))
    except OSError as e:
        print(
            f"WARNING: cannot open log file {log_file} ({e}); logging to console only",
            file=sys.stderr,
        )
    return handlers


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_build_log_handlers(),
)
logger = logging.getLogger(__name__)


class WorldBackupSystem:
    """Compress each configured server world, upload it to NextCloud and
    report progress to a Discord webhook."""

    # Discord embed colors (decimal RGB values).
    COLOR_GREEN = 65280
    COLOR_RED = 16711680
    COLOR_YELLOW = 16776960
    COLOR_BLUE = 3447003

    def __init__(self, config_path='/opt/automation/backup-config.json'):
        """Initialize the backup system with configuration.

        Args:
            config_path: Path to the JSON configuration file.

        Raises:
            KeyError: If a required configuration key is missing.
        """
        self.config = self.load_config(config_path)

        self.ptero_url = self.config['pterodactyl']['url']
        self.ptero_key = self.config['pterodactyl']['api_key']

        self.nextcloud_url = self.config['nextcloud']['webdav_url']
        self.nextcloud_user = self.config['nextcloud']['username']
        self.nextcloud_pass = self.config['nextcloud']['password']

        self.discord_webhook = self.config['discord']['webhook_url']
        self.discord_enabled = self.config['discord']['notifications_enabled']

        self.settings = self.config['backup_settings']
        self.servers = self.config['servers']

        self.staging_dir = Path(self.settings['staging_dir'])
        self.staging_dir.mkdir(parents=True, exist_ok=True)

        # Accumulated over one run(): server names and total archive size in MB.
        self.results = {
            'successful': [],
            'failed': [],
            'total_size': 0
        }

    def load_config(self, path):
        """Load configuration from JSON file.

        Logs an error and exits the process with status 1 when the file is
        missing or does not contain valid JSON.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            logger.error(f"Config file not found: {path}")
            sys.exit(1)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in config file: {e}")
            sys.exit(1)

    def api_request(self, endpoint, method='GET'):
        """Make request to Pterodactyl API.

        Args:
            endpoint: Path below ``/api/client/``.
            method: HTTP method name (default ``'GET'``).

        Returns:
            The decoded JSON body, ``{}`` for an empty body, or ``None`` when
            the request fails or the body is not valid JSON.
        """
        url = f"{self.ptero_url.rstrip('/')}/api/client/{endpoint.lstrip('/')}"
        headers = {
            'Authorization': f'Bearer {self.ptero_key}',
            'Accept': 'application/vnd.pterodactyl.v1+json'
        }

        try:
            # Dispatch on the method name so that non-GET calls actually send
            # a request instead of leaving `response` unbound.
            response = requests.request(method.upper(), url, headers=headers, timeout=30)
            response.raise_for_status()
            return response.json() if response.text else {}
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException subclass.
            logger.error(f"API request failed: {e}")
            return None

    def download_world_sftp(self, server):
        """Resolve the on-disk location of a server's world directory.

        Simplified: no SFTP transfer happens here. The method assumes the
        Pterodactyl volumes are directly readable on this host and returns
        the path to read from.

        Args:
            server: Server entry with ``name``, ``uuid`` and ``world_path``.

        Returns:
            The source path as a string.
        """
        server_name = server['name']
        uuid = server['uuid']
        world_path = server['world_path']

        # For production, this would use actual SFTP
        # For now, assumes direct filesystem access on the node
        source_path = f"/var/lib/pterodactyl/volumes/{uuid}/{world_path}"

        logger.info(f"{server_name}: Preparing to backup from {source_path}")

        return source_path

    def compress_world(self, server, source_path):
        """Compress world directory to tar.gz in the staging directory.

        Args:
            server: Server entry; only ``name`` is read.
            source_path: Directory (or file) to archive.

        Returns:
            ``(backup_path, size_mb)`` on success, ``(None, 0)`` on failure.
            A partially written archive is removed on failure.
        """
        server_name = server['name']
        timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        backup_filename = f"{server_name.replace(' ', '-')}_{timestamp}.tar.gz"
        backup_path = self.staging_dir / backup_filename

        logger.info(f"{server_name}: Compressing to {backup_filename}")

        try:
            with tarfile.open(backup_path, "w:gz") as tar:
                tar.add(source_path, arcname=os.path.basename(source_path))

            size_mb = backup_path.stat().st_size / (1024 * 1024)
            logger.info(f"{server_name}: Compressed to {size_mb:.1f} MB")
            return backup_path, size_mb
        except (OSError, tarfile.TarError) as e:
            logger.error(f"{server_name}: Compression failed - {e}")
            # Do not leave a truncated archive behind in staging.
            self._remove_staging_file(backup_path)
            return None, 0

    def upload_to_nextcloud(self, backup_file, server_name):
        """Upload backup to NextCloud via WebDAV.

        Args:
            backup_file: ``Path`` of the archive to upload.
            server_name: Used as the remote sub-directory name.

        Returns:
            ``True`` when the PUT succeeds, ``False`` otherwise.
        """
        logger.info(f"{server_name}: Uploading to NextCloud")

        # Percent-encode the variable path segments so names containing
        # characters such as '#', '?' or '%' do not corrupt the URL.
        remote_path = (
            f"{self.nextcloud_url}{self.config['nextcloud']['backup_path']}"
            f"{quote(server_name, safe='')}/{quote(backup_file.name, safe='')}"
        )

        try:
            with open(backup_file, 'rb') as f:
                response = requests.put(
                    remote_path,
                    data=f,
                    auth=(self.nextcloud_user, self.nextcloud_pass),
                    timeout=600  # 10 minutes for large files
                )
            response.raise_for_status()
            logger.info(f"{server_name}: Upload successful")
            return True
        except (requests.exceptions.RequestException, OSError) as e:
            logger.error(f"{server_name}: Upload failed - {e}")
            return False

    def apply_retention_policy(self):
        """Apply retention policy to backups (7 daily, 4 weekly, 12 monthly).

        Placeholder: currently only logs; no remote backups are inspected or
        deleted.
        """
        logger.info("Applying retention policy")

        # This would check NextCloud for old backups and delete them
        # Based on the retention rules in the deployment plan
        # Simplified version - in production would use WebDAV PROPFIND

        logger.info("Retention policy applied (placeholder)")

    def discord_notify(self, message, color=None):
        """Send notification to Discord webhook.

        Does nothing when notifications are disabled or no webhook URL is
        configured. Delivery failures are logged and never raised.

        Args:
            message: Text used as the embed description.
            color: Optional embed color as a decimal RGB integer.
        """
        if not self.discord_enabled or not self.discord_webhook:
            return

        embed = {
            'description': message,
            # Timezone-aware UTC so the ISO string carries an explicit offset.
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

        # `is not None` rather than truthiness: 0 is a valid color value.
        if color is not None:
            embed['color'] = color

        payload = {'embeds': [embed]}

        try:
            response = requests.post(self.discord_webhook, json=payload, timeout=10)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Discord notification failed: {e}")

    @staticmethod
    def _remove_staging_file(path):
        """Delete a staging file, tolerating its absence."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"Could not remove staging file {path}: {e}")

    def backup_server(self, server):
        """Backup a single server.

        Resolves the world path, compresses it, uploads the archive and
        records the outcome in ``self.results``. The staging archive is
        removed whether or not the upload succeeds.

        Returns:
            ``True`` on success, ``False`` on any failure.
        """
        name = server['name']
        logger.info(f"=== Starting backup for {name} ===")

        backup_file = None
        try:
            # Download world files
            source_path = self.download_world_sftp(server)

            # Compress
            backup_file, size_mb = self.compress_world(server, source_path)
            if not backup_file:
                raise RuntimeError("Compression failed")

            # Upload to NextCloud
            if not self.upload_to_nextcloud(backup_file, name):
                raise RuntimeError("Upload failed")

            # Success
            self.results['successful'].append(name)
            self.results['total_size'] += size_mb

            self.discord_notify(
                f"\u2705 **{name}** backed up successfully\n"
                f"Size: {size_mb:.1f} MB",
                color=self.COLOR_GREEN
            )
            return True

        except Exception as e:
            # Per-server boundary: one failing server must not abort the run.
            logger.error(f"{name}: Backup failed - {e}")
            self.results['failed'].append(name)

            self.discord_notify(
                f"\u274c **{name}** backup failed\n"
                f"Error: {str(e)}",
                color=self.COLOR_RED
            )
            return False

        finally:
            # Clean up the staging file on every path so failed uploads do
            # not accumulate archives on disk.
            if backup_file is not None:
                self._remove_staging_file(backup_file)

    def run(self):
        """Main backup cycle: back up every server, then report a summary."""
        total = len(self.servers)

        logger.info("=" * 60)
        logger.info("WORLD BACKUP SYSTEM STARTED")
        logger.info(f"Servers to backup: {total}")
        logger.info("=" * 60)

        # Send start notification
        start_time = datetime.now()
        self.discord_notify(
            f"\U0001F4BE **World Backup Started**\n"
            f"Servers: {total}\n"
            f"Estimated duration: ~30 minutes",
            color=self.COLOR_BLUE
        )

        # Backup each server
        for i, server in enumerate(self.servers, 1):
            name = server['name']
            logger.info(f"\n[{i}/{total}] Processing: {name}")

            self.backup_server(server)

            # Brief delay between backups
            if i < total:
                time.sleep(5)

        # Apply retention policy
        self.apply_retention_policy()

        # Summary
        duration = (datetime.now() - start_time).total_seconds() / 60
        succeeded = self.results['successful']
        failed = self.results['failed']

        logger.info("\n" + "=" * 60)
        logger.info("BACKUP CYCLE COMPLETE")
        logger.info(f"Successful: {len(succeeded)}")
        logger.info(f"Failed: {len(failed)}")
        logger.info(f"Total size: {self.results['total_size']:.1f} MB")
        logger.info(f"Duration: {duration:.1f} minutes")
        logger.info("=" * 60)

        # Send completion notification
        status_emoji = "\u2705" if not failed else "\u26a0\ufe0f"
        summary = (
            f"{status_emoji} **Backup Cycle Complete**\n"
            f"Successful: {len(succeeded)}/{total}\n"
            f"Failed: {len(failed)}\n"
            f"Total size: {self.results['total_size']:.1f} MB\n"
            f"Duration: {duration:.1f} minutes"
        )

        if failed:
            summary += "\n\n\u274c **Failed Servers:**\n" + "\n".join(f"- {s}" for s in failed)

        color = self.COLOR_GREEN if not failed else self.COLOR_YELLOW
        self.discord_notify(summary, color=color)


if __name__ == '__main__':
    try:
        backup_system = WorldBackupSystem()
        backup_system.run()
    except KeyboardInterrupt:
        logger.info("\nBackup cycle interrupted by user")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        sys.exit(1)