#!/usr/bin/env python3 """ Unified Multi-Source Scraper Orchestrates scraping from multiple sources (documentation, GitHub, PDF), detects conflicts, merges intelligently, and builds unified skills. This is the main entry point for unified config workflow. Usage: python3 cli/unified_scraper.py --config configs/godot_unified.json python3 cli/unified_scraper.py --config configs/react_unified.json --merge-mode claude-enhanced """ import os import sys import json import logging import argparse from pathlib import Path from typing import Dict, List, Any, Optional # Import validators and scrapers try: from config_validator import ConfigValidator, validate_config from conflict_detector import ConflictDetector from merge_sources import RuleBasedMerger, ClaudeEnhancedMerger except ImportError as e: print(f"Error importing modules: {e}") print("Make sure you're running from the project root directory") sys.exit(1) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class UnifiedScraper: """ Orchestrates multi-source scraping and merging. Main workflow: 1. Load and validate unified config 2. Scrape all sources (docs, GitHub, PDF) 3. Detect conflicts between sources 4. Merge intelligently (rule-based or Claude-enhanced) 5. Build unified skill """ def __init__(self, config_path: str, merge_mode: Optional[str] = None): """ Initialize unified scraper. Args: config_path: Path to unified config JSON merge_mode: Override config merge_mode ('rule-based' or 'claude-enhanced') """ self.config_path = config_path # Validate and load config logger.info(f"Loading config: {config_path}") self.validator = validate_config(config_path) self.config = self.validator.config # Determine merge mode self.merge_mode = merge_mode or self.config.get('merge_mode', 'rule-based') logger.info(f"Merge mode: {self.merge_mode}") # Storage for scraped data self.scraped_data = {} # Output paths self.name = self.config['name'] self.output_dir = f"output/{self.name}" self.data_dir = f"output/{self.name}_unified_data" os.makedirs(self.output_dir, exist_ok=True) os.makedirs(self.data_dir, exist_ok=True) def scrape_all_sources(self): """ Scrape all configured sources. Routes to appropriate scraper based on source type. """ logger.info("=" * 60) logger.info("PHASE 1: Scraping all sources") logger.info("=" * 60) if not self.validator.is_unified: logger.warning("Config is not unified format, converting...") self.config = self.validator.convert_legacy_to_unified() sources = self.config.get('sources', []) for i, source in enumerate(sources): source_type = source['type'] logger.info(f"\n[{i+1}/{len(sources)}] Scraping {source_type} source...") try: if source_type == 'documentation': self._scrape_documentation(source) elif source_type == 'github': self._scrape_github(source) elif source_type == 'pdf': self._scrape_pdf(source) else: logger.warning(f"Unknown source type: {source_type}") except Exception as e: logger.error(f"Error scraping {source_type}: {e}") logger.info("Continuing with other sources...") logger.info(f"\nāœ… Scraped {len(self.scraped_data)} sources successfully") def _scrape_documentation(self, source: Dict[str, Any]): """Scrape documentation website.""" # Import doc scraper sys.path.insert(0, str(Path(__file__).parent)) try: from doc_scraper import scrape_all, save_data except ImportError: logger.error("doc_scraper.py not found") return # Create temporary config for doc scraper doc_config = { 'name': f"{self.name}_docs", 'base_url': source['base_url'], 'selectors': source.get('selectors', {}), 'url_patterns': source.get('url_patterns', {}), 'categories': source.get('categories', {}), 'rate_limit': source.get('rate_limit', 0.5), 'max_pages': source.get('max_pages', 100) } # Scrape logger.info(f"Scraping documentation from {source['base_url']}") pages = scrape_all(doc_config) # Save data docs_data_file = os.path.join(self.data_dir, 'documentation_data.json') save_data(pages, docs_data_file, doc_config) self.scraped_data['documentation'] = { 'pages': pages, 'data_file': docs_data_file } logger.info(f"āœ… Documentation: {len(pages)} pages scraped") def _scrape_github(self, source: Dict[str, Any]): """Scrape GitHub repository.""" sys.path.insert(0, str(Path(__file__).parent)) try: from github_scraper import GitHubScraper except ImportError: logger.error("github_scraper.py not found") return # Create config for GitHub scraper github_config = { 'repo': source['repo'], 'name': f"{self.name}_github", 'github_token': source.get('github_token'), 'include_issues': source.get('include_issues', True), 'max_issues': source.get('max_issues', 100), 'include_changelog': source.get('include_changelog', True), 'include_releases': source.get('include_releases', True), 'include_code': source.get('include_code', True), 'code_analysis_depth': source.get('code_analysis_depth', 'surface'), 'file_patterns': source.get('file_patterns', []) } # Scrape logger.info(f"Scraping GitHub repository: {source['repo']}") scraper = GitHubScraper(github_config) github_data = scraper.scrape() # Save data github_data_file = os.path.join(self.data_dir, 'github_data.json') with open(github_data_file, 'w') as f: json.dump(github_data, f, indent=2, ensure_ascii=False) self.scraped_data['github'] = { 'data': github_data, 'data_file': github_data_file } logger.info(f"āœ… GitHub: Repository scraped successfully") def _scrape_pdf(self, source: Dict[str, Any]): """Scrape PDF document.""" sys.path.insert(0, str(Path(__file__).parent)) try: from pdf_scraper import PDFToSkillConverter except ImportError: logger.error("pdf_scraper.py not found") return # Create config for PDF scraper pdf_config = { 'name': f"{self.name}_pdf", 'pdf': source['path'], 'extract_tables': source.get('extract_tables', False), 'ocr': source.get('ocr', False), 'password': source.get('password') } # Scrape logger.info(f"Scraping PDF: {source['path']}") converter = PDFToSkillConverter(pdf_config) pdf_data = converter.extract_all() # Save data pdf_data_file = os.path.join(self.data_dir, 'pdf_data.json') with open(pdf_data_file, 'w') as f: json.dump(pdf_data, f, indent=2, ensure_ascii=False) self.scraped_data['pdf'] = { 'data': pdf_data, 'data_file': pdf_data_file } logger.info(f"āœ… PDF: {len(pdf_data.get('pages', []))} pages extracted") def detect_conflicts(self) -> List: """ Detect conflicts between documentation and code. Only applicable if both documentation and GitHub sources exist. Returns: List of conflicts """ logger.info("\n" + "=" * 60) logger.info("PHASE 2: Detecting conflicts") logger.info("=" * 60) if not self.validator.needs_api_merge(): logger.info("No API merge needed (only one API source)") return [] # Get documentation and GitHub data docs_data = self.scraped_data.get('documentation', {}) github_data = self.scraped_data.get('github', {}) if not docs_data or not github_data: logger.warning("Missing documentation or GitHub data for conflict detection") return [] # Load data files with open(docs_data['data_file'], 'r') as f: docs_json = json.load(f) with open(github_data['data_file'], 'r') as f: github_json = json.load(f) # Detect conflicts detector = ConflictDetector(docs_json, github_json) conflicts = detector.detect_all_conflicts() # Save conflicts conflicts_file = os.path.join(self.data_dir, 'conflicts.json') detector.save_conflicts(conflicts, conflicts_file) # Print summary summary = detector.generate_summary(conflicts) logger.info(f"\nšŸ“Š Conflict Summary:") logger.info(f" Total: {summary['total']}") logger.info(f" By Type:") for ctype, count in summary['by_type'].items(): if count > 0: logger.info(f" - {ctype}: {count}") logger.info(f" By Severity:") for severity, count in summary['by_severity'].items(): if count > 0: emoji = 'šŸ”“' if severity == 'high' else '🟔' if severity == 'medium' else '🟢' logger.info(f" {emoji} {severity}: {count}") return conflicts def merge_sources(self, conflicts: List): """ Merge data from multiple sources. Args: conflicts: List of detected conflicts """ logger.info("\n" + "=" * 60) logger.info(f"PHASE 3: Merging sources ({self.merge_mode})") logger.info("=" * 60) if not conflicts: logger.info("No conflicts to merge") return None # Get data files docs_data = self.scraped_data.get('documentation', {}) github_data = self.scraped_data.get('github', {}) # Load data with open(docs_data['data_file'], 'r') as f: docs_json = json.load(f) with open(github_data['data_file'], 'r') as f: github_json = json.load(f) # Choose merger if self.merge_mode == 'claude-enhanced': merger = ClaudeEnhancedMerger(docs_json, github_json, conflicts) else: merger = RuleBasedMerger(docs_json, github_json, conflicts) # Merge merged_data = merger.merge_all() # Save merged data merged_file = os.path.join(self.data_dir, 'merged_data.json') with open(merged_file, 'w') as f: json.dump(merged_data, f, indent=2, ensure_ascii=False) logger.info(f"āœ… Merged data saved: {merged_file}") return merged_data def build_skill(self, merged_data: Optional[Dict] = None): """ Build final unified skill. Args: merged_data: Merged API data (if conflicts were resolved) """ logger.info("\n" + "=" * 60) logger.info("PHASE 4: Building unified skill") logger.info("=" * 60) # This will be implemented in Phase 7 logger.info("Skill building to be implemented in Phase 7") logger.info(f"Output directory: {self.output_dir}") logger.info(f"Data directory: {self.data_dir}") # For now, just create a placeholder skill_file = os.path.join(self.output_dir, 'SKILL.md') with open(skill_file, 'w') as f: f.write(f"# {self.config['name'].title()}\n\n") f.write(f"{self.config['description']}\n\n") f.write("## Sources\n\n") for source in self.config.get('sources', []): f.write(f"- {source['type']}\n") f.write("\n*Skill building in progress...*\n") logger.info(f"āœ… Placeholder skill created: {skill_file}") def run(self): """ Execute complete unified scraping workflow. """ logger.info("\n" + "šŸš€ " * 20) logger.info(f"Unified Scraper: {self.config['name']}") logger.info("šŸš€ " * 20 + "\n") try: # Phase 1: Scrape all sources self.scrape_all_sources() # Phase 2: Detect conflicts (if applicable) conflicts = self.detect_conflicts() # Phase 3: Merge sources (if conflicts exist) merged_data = None if conflicts: merged_data = self.merge_sources(conflicts) # Phase 4: Build skill self.build_skill(merged_data) logger.info("\n" + "āœ… " * 20) logger.info("Unified scraping complete!") logger.info("āœ… " * 20 + "\n") logger.info(f"šŸ“ Output: {self.output_dir}/") logger.info(f"šŸ“ Data: {self.data_dir}/") except KeyboardInterrupt: logger.info("\n\nāš ļø Scraping interrupted by user") sys.exit(1) except Exception as e: logger.error(f"\n\nāŒ Error during scraping: {e}") import traceback traceback.print_exc() sys.exit(1) def main(): """Main entry point.""" parser = argparse.ArgumentParser( description='Unified multi-source scraper', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Basic usage with unified config python3 cli/unified_scraper.py --config configs/godot_unified.json # Override merge mode python3 cli/unified_scraper.py --config configs/react_unified.json --merge-mode claude-enhanced # Backward compatible with legacy configs python3 cli/unified_scraper.py --config configs/react.json """ ) parser.add_argument('--config', '-c', required=True, help='Path to unified config JSON file') parser.add_argument('--merge-mode', '-m', choices=['rule-based', 'claude-enhanced'], help='Override config merge mode') args = parser.parse_args() # Create and run scraper scraper = UnifiedScraper(args.config, args.merge_mode) scraper.run() if __name__ == '__main__': main()