#!/usr/bin/env python3
"""
Unified Skill Builder

Generates final skill structure from merged multi-source data:
- SKILL.md with merged APIs and conflict warnings
- references/ with organized content by source
- Inline conflict markers (warning icon)
- Separate conflicts summary section

Supports mixed sources (documentation, GitHub, PDF) and highlights
discrepancies transparently.
"""

import os
import json
import shutil
import logging
from collections import Counter
from pathlib import Path
from typing import Dict, List, Any, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Icons embedded in the generated Markdown.  They are spelled as escape
# sequences so this source file stays pure ASCII and the icons cannot be
# corrupted when the file is opened or copied with the wrong text encoding.
_ICON_WARNING = "\u26a0\ufe0f"
_ICON_CHECK = "\u2705"
_ICON_BOOKS = "\U0001F4DA"
_ICON_WRENCH = "\U0001F527"
_ICON_BOOK = "\U0001F4D6"
_ICON_BULB = "\U0001F4A1"
_ICON_LAPTOP = "\U0001F4BB"
_ICON_RED = "\U0001F534"
_ICON_YELLOW = "\U0001F7E1"
_ICON_GREEN = "\U0001F7E2"


def _conflict_field(conflict: Any, field: str, default: Any = None) -> Any:
    """Read *field* from a conflict given either as an object or as a dict.

    Attribute access is tried first; otherwise a ``.get`` lookup is used when
    the value offers one.  *default* is returned when neither yields a value.
    """
    if hasattr(conflict, field):
        return getattr(conflict, field)
    getter = getattr(conflict, 'get', None)
    if callable(getter):
        return getter(field, default)
    return default


class UnifiedSkillBuilder:
    """
    Builds unified skill from multi-source data.
    """

    def __init__(self, config: Dict, scraped_data: Dict,
                 merged_data: Optional[Dict] = None,
                 conflicts: Optional[List] = None,
                 output_dir: str = "output"):
        """
        Initialize skill builder and create the skill directory skeleton.

        Args:
            config: Unified config dict; must contain 'name' and 'description'
            scraped_data: Dict of scraped data by source type
            merged_data: Merged API data (if conflicts were resolved)
            conflicts: List of detected conflicts (objects or dicts)
            output_dir: Directory under which the skill folder is created

        Raises:
            KeyError: If config lacks 'name' or 'description'.
        """
        self.config = config
        self.scraped_data = scraped_data
        self.merged_data = merged_data
        self.conflicts = conflicts or []

        self.name = config['name']
        self.description = config['description']
        self.skill_dir = f"{output_dir}/{self.name}"

        # Creating the sub-directories also creates skill_dir itself.
        for subdir in ('references', 'scripts', 'assets'):
            os.makedirs(os.path.join(self.skill_dir, subdir), exist_ok=True)

    def build(self):
        """Build complete skill structure."""
        logger.info("Building unified skill: %s", self.name)

        self._generate_skill_md()
        self._generate_references()

        # The conflicts report is only written when there is something in it.
        if self.conflicts:
            self._generate_conflicts_report()

        logger.info("%s Unified skill built: %s/", _ICON_CHECK, self.skill_dir)

    def _generate_skill_md(self):
        """Generate main SKILL.md file."""
        skill_path = os.path.join(self.skill_dir, 'SKILL.md')

        # Front-matter name: lowercase, '_' and ' ' become '-', max 64 chars.
        skill_name = self.name.lower().replace('_', '-').replace(' ', '-')[:64]
        # Front-matter description is capped at 1024 chars.
        desc = self.description[:1024]
        sources = self.config.get('sources', [])

        parts = [
            "---\n",
            f"name: {skill_name}\n",
            f"description: {desc}\n",
            "---\n\n",
            f"# {self.name.title()}\n\n",
            f"{self.description}\n\n",
            f"## {_ICON_BOOKS} Sources\n\n",
            "This skill combines knowledge from multiple sources:\n\n",
        ]
        parts.extend(self._format_source_lines(sources))

        if self.conflicts:
            parts.append(self._format_data_quality())

        if self.merged_data:
            parts.append(self._format_merged_apis())

        parts.append(f"\n## {_ICON_BOOK} Reference Documentation\n\n")
        parts.append("Organized by source:\n\n")
        # Several sources may share a type; link each reference folder once.
        for source_type in dict.fromkeys(src['type'] for src in sources):
            parts.append(
                f"- [{source_type.title()}](references/{source_type}/)\n"
            )

        parts.append(f"\n## {_ICON_BULB} When to Use This Skill\n\n")
        parts.append("Use this skill when you need to:\n")
        parts.append(f"- Understand how to use {self.name}\n")
        parts.append("- Look up API documentation\n")
        parts.append("- Find usage examples\n")
        if 'github' in self.scraped_data:
            parts.append("- Check for known issues or recent changes\n")
            parts.append("- Review release history\n")

        parts.append("\n---\n\n")
        parts.append(
            "*Generated by Skill Seeker's unified multi-source scraper*\n"
        )

        with open(skill_path, 'w', encoding='utf-8') as f:
            f.write("".join(parts))

        logger.info("Created SKILL.md")

    def _format_source_lines(self, sources: List[Dict]) -> List[str]:
        """Return the bullet lines describing each configured source."""
        lines: List[str] = []
        for source in sources:
            source_type = source['type']
            if source_type == 'documentation':
                lines.append(
                    f"- {_ICON_CHECK} **Documentation**: "
                    f"{source.get('base_url', 'N/A')}\n"
                )
                lines.append(
                    f"  - Pages: {source.get('max_pages', 'unlimited')}\n"
                )
            elif source_type == 'github':
                lines.append(
                    f"- {_ICON_CHECK} **GitHub Repository**: "
                    f"{source.get('repo', 'N/A')}\n"
                )
                lines.append(
                    "  - Code Analysis: "
                    f"{source.get('code_analysis_depth', 'surface')}\n"
                )
                lines.append(f"  - Issues: {source.get('max_issues', 0)}\n")
            elif source_type == 'pdf':
                lines.append(
                    f"- {_ICON_CHECK} **PDF Document**: "
                    f"{source.get('path', 'N/A')}\n"
                )
            # Other source types are not listed here.
        return lines

    def _format_data_quality(self) -> str:
        """Return the 'Data Quality' section summarising conflicts by type."""
        by_type = Counter(
            _conflict_field(conflict, 'type', 'unknown')
            for conflict in self.conflicts
        )
        parts = [
            f"\n## {_ICON_WARNING} Data Quality\n\n",
            f"**{len(self.conflicts)} conflicts detected** between sources.\n\n",
            "**Conflict Breakdown:**\n",
        ]
        # Counter keeps first-seen order, so types appear as encountered.
        parts.extend(f"- {ctype}: {count}\n" for ctype, count in by_type.items())
        parts.append(
            "\nSee `references/conflicts.md` for detailed conflict information.\n"
        )
        return "".join(parts)

    def _format_merged_apis(self) -> str:
        """Format merged APIs section with inline conflict warnings.

        Returns:
            Markdown for the API section, or "" when there is no merged data.
        """
        if not self.merged_data:
            return ""

        parts = [
            f"\n## {_ICON_WRENCH} API Reference\n\n",
            "*Merged from documentation and code analysis*\n\n",
        ]

        apis = self.merged_data.get('apis', {})
        if not apis:
            parts.append("*No APIs to display*\n")
            return "".join(parts)

        # Bucket APIs by status in one pass, keeping their original order.
        by_status: Dict[Any, List[Dict]] = {}
        for api_data in apis.values():
            by_status.setdefault(api_data.get('status'), []).append(api_data)
        matched = by_status.get('matched', [])
        conflicting = by_status.get('conflict', [])
        code_only = by_status.get('code_only', [])
        docs_only = by_status.get('docs_only', [])

        # SKILL.md only shows a sample of each group (10 or 5 entries).
        if matched:
            parts.append(f"### {_ICON_CHECK} Verified APIs\n\n")
            parts.append("*Documentation and code agree*\n\n")
            parts.extend(
                self._format_api_entry(api_data, inline_conflict=False)
                for api_data in matched[:10]
            )

        if conflicting:
            parts.append(f"\n### {_ICON_WARNING} APIs with Conflicts\n\n")
            parts.append("*Documentation and code differ*\n\n")
            parts.extend(
                self._format_api_entry(api_data, inline_conflict=True)
                for api_data in conflicting[:10]
            )

        if code_only:
            parts.append(f"\n### {_ICON_LAPTOP} Undocumented APIs\n\n")
            parts.append(
                "*Found in code but not in documentation "
                f"({len(code_only)} total)*\n\n"
            )
            parts.extend(
                self._format_api_entry(api_data, inline_conflict=False)
                for api_data in code_only[:5]
            )

        if docs_only:
            parts.append(f"\n### {_ICON_BOOK} Documentation-Only APIs\n\n")
            parts.append(
                "*Documented but not found in code "
                f"({len(docs_only)} total)*\n\n"
            )
            parts.extend(
                self._format_api_entry(api_data, inline_conflict=False)
                for api_data in docs_only[:5]
            )

        parts.append("\n*See references/api/ for complete API documentation*\n")
        return "".join(parts)

    def _format_api_entry(self, api_data: Dict,
                          inline_conflict: bool = False) -> str:
        """Format a single API entry.

        Args:
            api_data: Merged API record (reads 'name', 'merged_signature',
                'merged_description', 'warning', 'conflict', 'source')
            inline_conflict: When True and the record has a warning, include
                the warning and, if both are present, the two versions

        Returns:
            Markdown for the entry, terminated by a horizontal rule.
        """
        name = api_data.get('name', 'Unknown')
        signature = api_data.get('merged_signature', name)
        description = api_data.get('merged_description', '')
        warning = api_data.get('warning', '')

        parts = [f"#### `{signature}`\n\n"]

        if description:
            parts.append(f"{description}\n\n")

        if inline_conflict and warning:
            parts.append(f"{_ICON_WARNING} **Conflict**: {warning}\n\n")

            # Show both versions only when both sides are known.
            conflict = api_data.get('conflict', {})
            if conflict:
                docs_info = conflict.get('docs_info')
                code_info = conflict.get('code_info')
                if docs_info and code_info:
                    parts.append("**Documentation says:**\n")
                    parts.append(
                        f"```\n{docs_info.get('raw_signature', 'N/A')}\n```\n\n"
                    )
                    parts.append("**Code implementation:**\n")
                    parts.append(
                        f"```\n{self._format_code_signature(code_info)}\n```\n\n"
                    )

        source = api_data.get('source', 'unknown')
        parts.append(f"*Source: {source}*\n\n")
        parts.append("---\n\n")
        return "".join(parts)

    def _format_code_signature(self, code_info: Dict) -> str:
        """Format code signature for display.

        Args:
            code_info: Dict with 'name', 'parameters' (list of dicts with
                'name', optional 'type_hint' and 'default') and an optional
                'return_type'

        Returns:
            A string such as ``f(a: int, b = 0) -> str``.
        """
        name = code_info.get('name', '')
        params = code_info.get('parameters', [])
        return_type = code_info.get('return_type')

        param_strs = []
        for param in params:
            param_str = param.get('name', '')
            if param.get('type_hint'):
                param_str += f": {param['type_hint']}"
            # Keep falsy-but-real defaults such as 0 or False; only a
            # missing (None) or empty-string default means "no default".
            default = param.get('default')
            if default is not None and default != '':
                param_str += f" = {default}"
            param_strs.append(param_str)

        sig = f"{name}({', '.join(param_strs)})"
        if return_type:
            sig += f" -> {return_type}"
        return sig

    def _generate_references(self):
        """Generate reference files organized by source."""
        logger.info("Generating reference files...")

        # Each source type maps to a list of scraped sources.
        docs_list = self.scraped_data.get('documentation', [])
        if docs_list:
            self._generate_docs_references(docs_list)

        github_list = self.scraped_data.get('github', [])
        if github_list:
            self._generate_github_references(github_list)

        pdf_list = self.scraped_data.get('pdf', [])
        if pdf_list:
            self._generate_pdf_references(pdf_list)

        if self.merged_data:
            self._generate_merged_api_reference()

    def _generate_docs_references(self, docs_list: List[Dict]):
        """Generate references from multiple documentation sources.

        Each source gets its own sub-directory holding the files copied from
        its 'refs_dir' plus a generated index.md; a top-level index.md links
        to every source.
        """
        if not docs_list:
            return

        docs_dir = os.path.join(self.skill_dir, 'references', 'documentation')
        os.makedirs(docs_dir, exist_ok=True)

        # Resolve ids once so the per-source folders and the main index agree
        # on the fallback name when 'source_id' is missing.
        source_ids = [
            doc_source.get('source_id', f'source_{i}')
            for i, doc_source in enumerate(docs_list)
        ]

        for source_id, doc_source in zip(source_ids, docs_list):
            base_url = doc_source.get('base_url', 'Unknown')
            refs_dir = doc_source.get('refs_dir', '')

            source_dir = os.path.join(docs_dir, source_id)
            os.makedirs(source_dir, exist_ok=True)

            copied_files: List[str] = []
            if refs_dir and os.path.isdir(refs_dir):
                for entry in sorted(os.listdir(refs_dir)):
                    src_path = os.path.join(refs_dir, entry)
                    if not os.path.isfile(src_path):
                        continue
                    shutil.copy2(src_path, os.path.join(source_dir, entry))
                    copied_files.append(entry)

            # The generated index replaces any copied index.md, so that file
            # is left out of the listing.
            listed = [p for p in copied_files if p.lower() != 'index.md']

            source_index_path = os.path.join(source_dir, 'index.md')
            with open(source_index_path, 'w', encoding='utf-8') as f:
                f.write(f"# Documentation: {source_id}\n\n")
                f.write(f"**Source**: {base_url}\n\n")
                f.write(
                    f"**Pages**: {doc_source.get('total_pages', 'N/A')}\n\n"
                )
                if listed:
                    f.write("## Files\n\n")
                    f.writelines(
                        f"- [{filename}]({filename})\n" for filename in listed
                    )
                else:
                    f.write("No reference files available.\n")

        index_path = os.path.join(docs_dir, 'index.md')
        with open(index_path, 'w', encoding='utf-8') as f:
            f.write("# Documentation References\n\n")
            f.write(
                f"Combined from {len(docs_list)} documentation sources.\n\n"
            )
            f.write("## Sources\n\n")
            for source_id, doc_source in zip(source_ids, docs_list):
                base_url = doc_source.get('base_url', 'Unknown')
                total_pages = doc_source.get('total_pages', 'N/A')
                f.write(
                    f"- [{source_id}]({source_id}/index.md) - "
                    f"{base_url} ({total_pages} pages)\n"
                )

        logger.info(
            "Created documentation references (%d sources)", len(docs_list)
        )

    def _generate_github_references(self, github_list: List[Dict]):
        """Generate references from multiple GitHub sources.

        Each repository gets a sub-directory with optional README.md,
        issues.md and releases.md plus an index.md; a top-level index.md
        links to every repository.
        """
        if not github_list:
            return

        github_dir = os.path.join(self.skill_dir, 'references', 'github')
        os.makedirs(github_dir, exist_ok=True)

        # Resolve (repo, repo_id) once so the folders and the main index use
        # the same fallback name when 'repo' is missing.
        resolved = []
        for i, github_source in enumerate(github_list):
            repo = github_source.get('repo', f'repo_{i}')
            repo_id = github_source.get('repo_id', repo.replace('/', '_'))
            resolved.append((repo, repo_id, github_source.get('data', {})))

        for repo, repo_id, github_data in resolved:
            repo_dir = os.path.join(github_dir, repo_id)
            os.makedirs(repo_dir, exist_ok=True)
            self._write_github_repo(repo_dir, repo, github_data)

        index_path = os.path.join(github_dir, 'index.md')
        with open(index_path, 'w', encoding='utf-8') as f:
            f.write("# GitHub References\n\n")
            f.write(
                f"Combined from {len(github_list)} GitHub repositories.\n\n"
            )
            f.write("## Repositories\n\n")
            for repo, repo_id, github_data in resolved:
                stars = github_data.get('repo_info', {}).get('stars', 'N/A')
                f.write(f"- [{repo}]({repo_id}/index.md) - {stars} stars\n")

        logger.info("Created GitHub references (%d repos)", len(github_list))

    def _write_github_repo(self, repo_dir: str, repo: str,
                           github_data: Dict) -> None:
        """Write README, issues, releases and index files for one repo."""
        readme = github_data.get('readme')
        issues = github_data.get('issues') or []
        releases = github_data.get('releases') or []

        if readme:
            readme_path = os.path.join(repo_dir, 'README.md')
            with open(readme_path, 'w', encoding='utf-8') as f:
                f.write(f"# Repository README: {repo}\n\n")
                f.write(readme)

        if issues:
            issues_path = os.path.join(repo_dir, 'issues.md')
            with open(issues_path, 'w', encoding='utf-8') as f:
                f.write(f"# GitHub Issues: {repo}\n\n")
                f.write(f"{len(issues)} recent issues.\n\n")
                for issue in issues[:20]:  # Only the first 20 are detailed
                    f.write(f"## #{issue['number']}: {issue['title']}\n\n")
                    f.write(f"**State**: {issue['state']}\n")
                    if issue.get('labels'):
                        f.write(f"**Labels**: {', '.join(issue['labels'])}\n")
                    f.write(f"**URL**: {issue.get('url', 'N/A')}\n\n")

        if releases:
            releases_path = os.path.join(repo_dir, 'releases.md')
            with open(releases_path, 'w', encoding='utf-8') as f:
                f.write(f"# Releases: {repo}\n\n")
                for release in releases[:10]:  # Only the first 10 are shown
                    # 'name' and 'published_at' may be present but None, so
                    # fall back with `or` rather than a .get() default.
                    title = release.get('name') or 'N/A'
                    published = (release.get('published_at') or 'N/A')[:10]
                    f.write(f"## {release['tag_name']}: {title}\n\n")
                    f.write(f"**Published**: {published}\n\n")
                    if release.get('body'):
                        f.write(release['body'][:500])
                        f.write("\n\n")

        repo_info = github_data.get('repo_info', {})
        repo_index_path = os.path.join(repo_dir, 'index.md')
        with open(repo_index_path, 'w', encoding='utf-8') as f:
            f.write(f"# GitHub: {repo}\n\n")
            f.write(f"**Stars**: {repo_info.get('stars', 'N/A')}\n")
            f.write(f"**Language**: {repo_info.get('language', 'N/A')}\n")
            f.write(f"**Issues**: {len(issues)}\n")
            f.write(f"**Releases**: {len(releases)}\n\n")
            f.write("## Files\n\n")
            # Link only to the files that were actually written above.
            if readme:
                f.write("- [README.md](README.md)\n")
            if issues:
                f.write("- [issues.md](issues.md)\n")
            if releases:
                f.write("- [releases.md](releases.md)\n")

    def _generate_pdf_references(self, pdf_list: List[Dict]):
        """Generate references from PDF sources (currently an index only)."""
        if not pdf_list:
            return

        pdf_dir = os.path.join(self.skill_dir, 'references', 'pdf')
        os.makedirs(pdf_dir, exist_ok=True)

        index_path = os.path.join(pdf_dir, 'index.md')
        with open(index_path, 'w', encoding='utf-8') as f:
            f.write("# PDF Documentation\n\n")
            f.write(f"Reference from {len(pdf_list)} PDF document(s).\n\n")

        logger.info("Created PDF references (%d sources)", len(pdf_list))

    def _generate_merged_api_reference(self):
        """Generate merged API reference file (all APIs, sorted by key)."""
        api_dir = os.path.join(self.skill_dir, 'references', 'api')
        os.makedirs(api_dir, exist_ok=True)

        api_path = os.path.join(api_dir, 'merged_api.md')
        apis = self.merged_data.get('apis', {})

        # Entries may contain emoji, so the encoding must be explicit rather
        # than left to the platform default.
        with open(api_path, 'w', encoding='utf-8') as f:
            f.write("# Merged API Reference\n\n")
            f.write("*Combined from documentation and code analysis*\n\n")
            for api_name in sorted(apis):
                f.write(
                    self._format_api_entry(apis[api_name], inline_conflict=True)
                )

        logger.info("Created merged API reference (%d APIs)", len(apis))

    def _generate_conflicts_report(self):
        """Generate detailed conflicts report.

        Writes references/conflicts.md with a severity breakdown, every
        high-severity conflict and the first 20 medium-severity conflicts.
        """
        conflicts_path = os.path.join(
            self.skill_dir, 'references', 'conflicts.md'
        )

        # Conflicts may be objects or dicts; _conflict_field handles both.
        by_severity: Dict[str, List[Any]] = {'high': [], 'medium': [], 'low': []}
        for conflict in self.conflicts:
            severity = _conflict_field(conflict, 'severity')
            if severity in by_severity:
                by_severity[severity].append(conflict)
        high = by_severity['high']
        medium = by_severity['medium']
        low = by_severity['low']

        # The report contains emoji, so the encoding must be explicit.
        with open(conflicts_path, 'w', encoding='utf-8') as f:
            f.write("# Conflict Report\n\n")
            f.write(
                f"Found **{len(self.conflicts)}** conflicts between sources.\n\n"
            )

            f.write("## Severity Breakdown\n\n")
            f.write(f"- {_ICON_RED} **High**: {len(high)} (action required)\n")
            f.write(
                f"- {_ICON_YELLOW} **Medium**: {len(medium)} "
                "(review recommended)\n"
            )
            f.write(f"- {_ICON_GREEN} **Low**: {len(low)} (informational)\n\n")

            if high:
                f.write(f"## {_ICON_RED} High Severity\n\n")
                f.write("*These conflicts require immediate attention*\n\n")
                for conflict in high:
                    api_name = _conflict_field(conflict, 'api_name', 'Unknown')
                    diff = _conflict_field(conflict, 'difference', 'N/A')
                    f.write(f"### {api_name}\n\n")
                    f.write(f"**Issue**: {diff}\n\n")

            if medium:
                f.write(f"## {_ICON_YELLOW} Medium Severity\n\n")
                for conflict in medium[:20]:  # Limit to 20
                    api_name = _conflict_field(conflict, 'api_name', 'Unknown')
                    diff = _conflict_field(conflict, 'difference', 'N/A')
                    f.write(f"### {api_name}\n\n")
                    f.write(f"{diff}\n\n")

        logger.info("Created conflicts report")


def main() -> None:
    """Build a test skill from the config file named on the command line."""
    import sys

    if len(sys.argv) < 2:
        print("Usage: python unified_skill_builder.py <config.json>")
        sys.exit(1)

    config_path = sys.argv[1]
    with open(config_path, 'r', encoding='utf-8') as f:
        config = json.load(f)

    # Mock scraped data.  Each source type maps to a LIST of sources, which
    # is the shape the _generate_*_references methods iterate over.
    scraped_data = {
        'github': [
            {
                'data': {
                    'readme': '# Test Repository',
                    'issues': [],
                    'releases': []
                }
            }
        ]
    }

    builder = UnifiedSkillBuilder(config, scraped_data)
    builder.build()

    print(f"\n{_ICON_CHECK} Test skill built in: {builder.skill_dir}/")


if __name__ == '__main__':
    main()