feat: Phase 1-2 - Unified config format + deep code analysis
Phase 1: Unified Config Format - Created config_validator.py with full validation - Supports multiple sources (documentation, github, pdf) - Backward compatible with legacy configs - Auto-converts legacy → unified format - Validates merge_mode and code_analysis_depth Phase 2: Deep Code Analysis - Created code_analyzer.py with language-specific parsers - Supports Python (AST), JavaScript/TypeScript (regex), C/C++ (regex) - Configurable depth: surface, deep, full - Extracts classes, functions, parameters, types, docstrings - Integrated into github_scraper.py Features: ✅ Unified config with sources array ✅ Code analysis depth: surface/deep/full ✅ Language detection and parser selection ✅ Signature extraction with full parameter info ✅ Type hints and default values captured ✅ Docstring extraction ✅ Example config: godot_unified.json Next: Conflict detection and merging 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
491
cli/code_analyzer.py
Normal file
491
cli/code_analyzer.py
Normal file
@@ -0,0 +1,491 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Code Analyzer for GitHub Repositories
|
||||
|
||||
Extracts code signatures at configurable depth levels:
|
||||
- surface: File tree only (existing behavior)
|
||||
- deep: Parse files for signatures, parameters, types
|
||||
- full: Complete AST analysis (future enhancement)
|
||||
|
||||
Supports multiple languages with language-specific parsers.
|
||||
"""
|
||||
|
||||
import ast
import logging
import re
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class Parameter:
    """A single function/method parameter."""
    name: str
    type_hint: Optional[str] = None  # source text of the annotation, if any
    default: Optional[str] = None    # source text of the default value, if any


@dataclass
class FunctionSignature:
    """A function or method signature extracted from source code."""
    name: str
    parameters: List[Parameter]
    return_type: Optional[str] = None   # source text of the return annotation
    docstring: Optional[str] = None
    line_number: Optional[int] = None   # 1-based line of the definition
    is_async: bool = False
    is_method: bool = False
    # default_factory gives every instance its own fresh list instead of
    # defaulting to None and patching it up afterwards.
    decorators: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Backward compatibility: callers may still pass decorators=None
        # explicitly; normalize that to an empty list.
        if self.decorators is None:
            self.decorators = []


@dataclass
class ClassSignature:
    """A class signature: name, bases, methods, docstring, location."""
    name: str
    base_classes: List[str]
    methods: List[FunctionSignature]
    docstring: Optional[str] = None
    line_number: Optional[int] = None  # 1-based line of the definition
|
||||
|
||||
|
||||
class CodeAnalyzer:
    """
    Extracts code signatures from source files at a configurable depth.

    Depth levels:
        surface: no per-file analysis (file tree only)
        deep:    parse files for classes/functions, parameters, types
        full:    reserved for complete AST analysis (currently behaves
                 like 'deep')
    """

    def __init__(self, depth: str = 'surface'):
        """
        Initialize code analyzer.

        Args:
            depth: Analysis depth ('surface', 'deep', 'full')
        """
        self.depth = depth

    def analyze_file(self, file_path: str, content: str, language: str) -> Dict[str, Any]:
        """
        Analyze a single file based on depth level.

        Args:
            file_path: Path to file in repository
            content: File content as string
            language: Programming language (Python, JavaScript, etc.)

        Returns:
            Dict with 'classes' and 'functions' lists, or {} when depth is
            'surface', the language is unsupported, or analysis fails.
        """
        if self.depth == 'surface':
            return {}  # Surface level doesn't analyze individual files

        logger.debug(f"Analyzing {file_path} (language: {language}, depth: {self.depth})")

        try:
            if language == 'Python':
                return self._analyze_python(content, file_path)
            elif language in ('JavaScript', 'TypeScript'):
                return self._analyze_javascript(content, file_path)
            elif language in ('C', 'C++'):
                return self._analyze_cpp(content, file_path)
            logger.debug(f"No analyzer for language: {language}")
            return {}
        except Exception as e:
            # Analysis is best-effort: one unparseable file must not abort
            # the whole repository scan.
            logger.warning(f"Error analyzing {file_path}: {e}")
            return {}

    def _analyze_python(self, content: str, file_path: str) -> Dict[str, Any]:
        """Analyze Python source using the ast module."""
        try:
            tree = ast.parse(content)
        except SyntaxError as e:
            logger.debug(f"Syntax error in {file_path}: {e}")
            return {}

        # Collect ids of functions that sit directly inside a class body so
        # they are reported via their class rather than as free functions.
        # A single pre-pass replaces the previous O(n^2) walk-inside-walk
        # membership test.
        method_ids = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                for item in node.body:
                    if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                        method_ids.add(id(item))

        classes = []
        functions = []
        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                classes.append(asdict(self._extract_python_class(node)))
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                if id(node) not in method_ids:
                    functions.append(asdict(self._extract_python_function(node)))

        return {
            'classes': classes,
            'functions': functions
        }

    def _extract_python_class(self, node: ast.ClassDef) -> ClassSignature:
        """Extract name, bases, methods and docstring from a ClassDef node."""
        bases = []
        for base in node.bases:
            if isinstance(base, ast.Name):
                bases.append(base.id)
            elif isinstance(base, ast.Attribute):
                # Qualified base like module.Class; fall back to the bare
                # attribute name for more complex base expressions.
                bases.append(f"{base.value.id}.{base.attr}" if hasattr(base.value, 'id') else base.attr)

        methods = [
            self._extract_python_function(item, is_method=True)
            for item in node.body
            if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef))
        ]

        return ClassSignature(
            name=node.name,
            base_classes=bases,
            methods=methods,
            docstring=ast.get_docstring(node),
            line_number=node.lineno
        )

    def _extract_python_function(self, node, is_method: bool = False) -> FunctionSignature:
        """Extract a function/method signature from a FunctionDef node."""
        can_unparse = hasattr(ast, 'unparse')  # ast.unparse requires Python 3.9+

        # Positional-only parameters share `args.defaults` with regular
        # parameters, so both must be collected for defaults to line up.
        pos_args = list(getattr(node.args, 'posonlyargs', [])) + list(node.args.args)

        params = []
        for arg in pos_args:
            param_type = None
            if arg.annotation and can_unparse:
                param_type = ast.unparse(arg.annotation)
            params.append(Parameter(name=arg.arg, type_hint=param_type))

        # Defaults are right-aligned against the positional parameters.
        defaults = node.args.defaults
        if defaults:
            num_no_default = len(params) - len(defaults)
            for i, default in enumerate(defaults):
                param_idx = num_no_default + i
                if 0 <= param_idx < len(params):
                    try:
                        params[param_idx].default = ast.unparse(default) if can_unparse else str(default)
                    except Exception:
                        params[param_idx].default = "..."

        # Return annotation, if present and unparseable as source text.
        return_type = None
        if node.returns and can_unparse:
            try:
                return_type = ast.unparse(node.returns)
            except Exception:
                pass

        decorators = []
        for decorator in node.decorator_list:
            try:
                if can_unparse:
                    decorators.append(ast.unparse(decorator))
                elif isinstance(decorator, ast.Name):
                    decorators.append(decorator.id)
            except Exception:
                pass

        return FunctionSignature(
            name=node.name,
            parameters=params,
            return_type=return_type,
            docstring=ast.get_docstring(node),
            line_number=node.lineno,
            is_async=isinstance(node, ast.AsyncFunctionDef),
            is_method=is_method,
            decorators=decorators
        )

    def _analyze_javascript(self, content: str, file_path: str) -> Dict[str, Any]:
        """
        Analyze JavaScript/TypeScript file using regex patterns.

        Note: This is a simplified approach. For production, consider using
        a proper JS/TS parser like esprima or ts-morph.
        """
        classes = []
        functions = []

        # Class definitions (optionally with an `extends` clause).
        class_pattern = r'class\s+(\w+)(?:\s+extends\s+(\w+))?\s*\{'
        for match in re.finditer(class_pattern, content):
            class_name = match.group(1)
            base_class = match.group(2)

            # Simplified body extraction: stops at the first '}' instead of
            # tracking brace nesting, so nested blocks truncate the body.
            class_block_start = match.end()
            class_block_end = content.find('}', class_block_start)
            if class_block_end != -1:
                methods = self._extract_js_methods(content[class_block_start:class_block_end])
            else:
                methods = []

            classes.append({
                'name': class_name,
                'base_classes': [base_class] if base_class else [],
                'methods': methods,
                'docstring': None,
                'line_number': content[:match.start()].count('\n') + 1
            })

        # Top-level function declarations.  The `async` keyword is captured
        # as its own group: the previous `'async' in match.group(0)` test
        # false-positived on names that merely contain "async".
        func_pattern = r'(?P<async>async\s+)?function\s+(?P<name>\w+)\s*\((?P<params>[^)]*)\)'
        for match in re.finditer(func_pattern, content):
            functions.append({
                'name': match.group('name'),
                'parameters': self._parse_js_parameters(match.group('params')),
                'return_type': None,  # plain JS carries no type annotations
                'docstring': None,
                'line_number': content[:match.start()].count('\n') + 1,
                'is_async': match.group('async') is not None,
                'is_method': False,
                'decorators': []
            })

        # Arrow functions assigned to const/let/var.
        arrow_pattern = r'(?:const|let|var)\s+(?P<name>\w+)\s*=\s*(?P<async>async\s+)?\((?P<params>[^)]*)\)\s*=>'
        for match in re.finditer(arrow_pattern, content):
            functions.append({
                'name': match.group('name'),
                'parameters': self._parse_js_parameters(match.group('params')),
                'return_type': None,
                'docstring': None,
                'line_number': content[:match.start()].count('\n') + 1,
                'is_async': match.group('async') is not None,
                'is_method': False,
                'decorators': []
            })

        return {
            'classes': classes,
            'functions': functions
        }

    def _extract_js_methods(self, class_body: str) -> List[Dict]:
        """Extract method signatures from a class body (regex heuristic)."""
        methods = []

        method_pattern = r'(?P<async>async\s+)?(?P<name>\w+)\s*\((?P<params>[^)]*)\)'
        for match in re.finditer(method_pattern, class_body):
            method_name = match.group('name')

            # The loose pattern also matches control-flow statements such
            # as `if (...)` or `catch (e)`; filter those keywords out.
            if method_name in ('if', 'for', 'while', 'switch', 'catch', 'return'):
                continue

            methods.append({
                'name': method_name,
                'parameters': self._parse_js_parameters(match.group('params')),
                'return_type': None,
                'docstring': None,
                'line_number': None,
                'is_async': match.group('async') is not None,
                'is_method': True,
                'decorators': []
            })

        return methods

    def _parse_js_parameters(self, params_str: str) -> List[Dict]:
        """Parse a JS/TS parameter list string into parameter dicts."""
        params = []

        if not params_str.strip():
            return params

        # Comma split is a simplification: default values containing commas
        # (object/array literals) will be split incorrectly.
        for raw in (p.strip() for p in params_str.split(',')):
            if not raw:
                continue

            # Default value: `name = value`.
            default = None
            name = raw
            if '=' in raw:
                name, default = raw.split('=', 1)
                name = name.strip()
                default = default.strip()

            # TypeScript type annotation: `name: Type`.
            type_hint = None
            if ':' in name:
                name, type_hint = name.split(':', 1)
                name = name.strip()
                type_hint = type_hint.strip()

            params.append({
                'name': name,
                'type_hint': type_hint,
                'default': default
            })

        return params

    def _analyze_cpp(self, content: str, file_path: str) -> Dict[str, Any]:
        """
        Analyze C/C++ header file using regex patterns.

        Note: This is a simplified approach focusing on header files.
        For production, consider using libclang or similar.
        """
        classes = []
        functions = []

        # Class definitions (simplified: no nested classes, at most one
        # public base).
        class_pattern = r'class\s+(\w+)(?:\s*:\s*public\s+(\w+))?\s*\{'
        for match in re.finditer(class_pattern, content):
            base_class = match.group(2)
            classes.append({
                'name': match.group(1),
                'base_classes': [base_class] if base_class else [],
                'methods': [],  # Simplified - would need to parse class body
                'docstring': None,
                'line_number': content[:match.start()].count('\n') + 1
            })

        # Free-function declarations: "<return-type> name(params)".
        func_pattern = r'(\w+(?:\s*\*|\s*&)?)\s+(\w+)\s*\(([^)]*)\)'
        for match in re.finditer(func_pattern, content):
            func_name = match.group(2)

            # The loose pattern also matches `if (...)` etc.; skip keywords.
            if func_name in ('if', 'for', 'while', 'switch', 'return'):
                continue

            functions.append({
                'name': func_name,
                'parameters': self._parse_cpp_parameters(match.group(3)),
                'return_type': match.group(1).strip(),
                'docstring': None,
                'line_number': content[:match.start()].count('\n') + 1,
                'is_async': False,
                'is_method': False,
                'decorators': []
            })

        return {
            'classes': classes,
            'functions': functions
        }

    def _parse_cpp_parameters(self, params_str: str) -> List[Dict]:
        """Parse a C/C++ parameter list string into parameter dicts."""
        params = []

        stripped = params_str.strip()
        if not stripped or stripped == 'void':
            return params  # `f(void)` declares no parameters

        # Comma split is a simplification (templates/defaults with commas
        # will be split incorrectly).
        for raw in (p.strip() for p in params_str.split(',')):
            if not raw:
                continue

            # Default value, if any: "type name = value".
            default = None
            if '=' in raw:
                raw, default = raw.rsplit('=', 1)
                raw = raw.strip()
                default = default.strip()

            # "type name" (possibly "type* name" / "type& name"); a single
            # token is treated as an unnamed parameter's type.
            parts = raw.split()
            if len(parts) >= 2:
                param_type = ' '.join(parts[:-1])
                param_name = parts[-1]
            else:
                param_type = raw
                param_name = "unknown"

            params.append({
                'name': param_name,
                'type_hint': param_type,
                'default': default
            })

        return params
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Ad-hoc smoke test: run the deep analyzer over a small Python snippet
    # and print a summary of what was extracted.
    python_code = '''
class Node2D:
    """Base class for 2D nodes."""

    def move_local_x(self, delta: float, snap: bool = False) -> None:
        """Move node along local X axis."""
        pass

    async def tween_position(self, target: tuple, duration: float = 1.0):
        """Animate position to target."""
        pass

def create_sprite(texture: str) -> Node2D:
    """Create a new sprite node."""
    return Node2D()
'''

    analyzer = CodeAnalyzer(depth='deep')
    result = analyzer.analyze_file('test.py', python_code, 'Python')

    print("Analysis Result:")
    print(f"Classes: {len(result.get('classes', []))}")
    print(f"Functions: {len(result.get('functions', []))}")

    if result.get('classes'):
        cls = result['classes'][0]
        print(f"\nClass: {cls['name']}")
        print(f"  Methods: {len(cls['methods'])}")
        for method in cls['methods']:
            # Render each parameter as "name: type" plus " = default" when a
            # default value was captured.
            params = ', '.join([f"{p['name']}: {p['type_hint']}" + (f" = {p['default']}" if p.get('default') else "")
                                for p in method['parameters']])
            print(f"    {method['name']}({params}) -> {method['return_type']}")
|
||||
367
cli/config_validator.py
Normal file
367
cli/config_validator.py
Normal file
@@ -0,0 +1,367 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Unified Config Validator
|
||||
|
||||
Validates unified config format that supports multiple sources:
|
||||
- documentation (website scraping)
|
||||
- github (repository scraping)
|
||||
- pdf (PDF document scraping)
|
||||
|
||||
Also provides backward compatibility detection for legacy configs.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Any, List, Optional
|
||||
from pathlib import Path
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConfigValidator:
    """
    Validates unified config format and provides backward compatibility.

    Unified configs carry a 'sources' array; legacy configs are the old
    single-source files used by doc_scraper, github_scraper and pdf_scraper.
    Legacy configs can be auto-converted to the unified shape.
    """

    # Valid source types
    VALID_SOURCE_TYPES = {'documentation', 'github', 'pdf'}

    # Valid merge modes
    VALID_MERGE_MODES = {'rule-based', 'claude-enhanced'}

    # Valid code analysis depth levels
    VALID_DEPTH_LEVELS = {'surface', 'deep', 'full'}

    def __init__(self, config_path: str):
        """Load the config at *config_path* and detect its format."""
        self.config_path = config_path
        self.config = self._load_config()
        self.is_unified = self._detect_format()

    def _load_config(self) -> Dict[str, Any]:
        """Load the JSON config file, raising ValueError on any failure."""
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            raise ValueError(f"Config file not found: {self.config_path}")
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in config file: {e}")

    def _detect_format(self) -> bool:
        """
        Detect if config is unified format or legacy.

        Returns:
            True if unified format (has 'sources' array)
            False if legacy format
        """
        return 'sources' in self.config and isinstance(self.config['sources'], list)

    def validate(self) -> bool:
        """
        Validate config based on detected format.

        Returns:
            True if valid

        Raises:
            ValueError if invalid with detailed error message
        """
        if self.is_unified:
            return self._validate_unified()
        return self._validate_legacy()

    def _validate_unified(self) -> bool:
        """Validate unified config format."""
        logger.info("Validating unified config format...")

        # Required top-level fields
        for required in ('name', 'description', 'sources'):
            if required not in self.config:
                raise ValueError(f"Missing required field: '{required}'")

        # Validate sources array
        sources = self.config['sources']

        if not isinstance(sources, list):
            raise ValueError("'sources' must be an array")

        if len(sources) == 0:
            raise ValueError("'sources' array cannot be empty")

        # merge_mode is optional; default matches convert_legacy_to_unified.
        merge_mode = self.config.get('merge_mode', 'rule-based')
        if merge_mode not in self.VALID_MERGE_MODES:
            raise ValueError(f"Invalid merge_mode: '{merge_mode}'. Must be one of {self.VALID_MERGE_MODES}")

        # Validate each source individually
        for i, source in enumerate(sources):
            self._validate_source(source, i)

        logger.info(f"✅ Unified config valid: {len(sources)} sources")
        return True

    def _validate_source(self, source: Dict[str, Any], index: int):
        """Validate an individual source config; *index* is used in errors."""
        if 'type' not in source:
            raise ValueError(f"Source {index}: Missing required field 'type'")

        source_type = source['type']

        if source_type not in self.VALID_SOURCE_TYPES:
            raise ValueError(
                f"Source {index}: Invalid type '{source_type}'. "
                f"Must be one of {self.VALID_SOURCE_TYPES}"
            )

        # Type-specific validation
        if source_type == 'documentation':
            self._validate_documentation_source(source, index)
        elif source_type == 'github':
            self._validate_github_source(source, index)
        elif source_type == 'pdf':
            self._validate_pdf_source(source, index)

    def _validate_documentation_source(self, source: Dict[str, Any], index: int):
        """Validate documentation source configuration."""
        if 'base_url' not in source:
            raise ValueError(f"Source {index} (documentation): Missing required field 'base_url'")

        # Optional but recommended fields
        if 'selectors' not in source:
            logger.warning(f"Source {index} (documentation): No 'selectors' specified, using defaults")

        if 'max_pages' in source and not isinstance(source['max_pages'], int):
            raise ValueError(f"Source {index} (documentation): 'max_pages' must be an integer")

    def _validate_github_source(self, source: Dict[str, Any], index: int):
        """Validate GitHub source configuration."""
        if 'repo' not in source:
            raise ValueError(f"Source {index} (github): Missing required field 'repo'")

        # Validate repo format (owner/repo)
        repo = source['repo']
        if '/' not in repo:
            raise ValueError(
                f"Source {index} (github): Invalid repo format '{repo}'. "
                f"Must be 'owner/repo' (e.g., 'facebook/react')"
            )

        # Validate code_analysis_depth if specified
        if 'code_analysis_depth' in source:
            depth = source['code_analysis_depth']
            if depth not in self.VALID_DEPTH_LEVELS:
                raise ValueError(
                    f"Source {index} (github): Invalid code_analysis_depth '{depth}'. "
                    f"Must be one of {self.VALID_DEPTH_LEVELS}"
                )

        # Validate max_issues if specified
        if 'max_issues' in source and not isinstance(source['max_issues'], int):
            raise ValueError(f"Source {index} (github): 'max_issues' must be an integer")

    def _validate_pdf_source(self, source: Dict[str, Any], index: int):
        """Validate PDF source configuration."""
        if 'path' not in source:
            raise ValueError(f"Source {index} (pdf): Missing required field 'path'")

        # Missing file is a warning, not an error: the file may be created
        # later or live on another machine.
        pdf_path = source['path']
        if not Path(pdf_path).exists():
            logger.warning(f"Source {index} (pdf): File not found: {pdf_path}")

    def _validate_legacy(self) -> bool:
        """
        Validate legacy config format (backward compatibility).

        Legacy configs are the old format used by doc_scraper, github_scraper, pdf_scraper.
        """
        logger.info("Detected legacy config format (backward compatible)")

        # Detect which legacy type based on fields
        if 'base_url' in self.config:
            logger.info("Legacy type: documentation")
        elif 'repo' in self.config:
            logger.info("Legacy type: github")
        elif 'pdf' in self.config or 'path' in self.config:
            logger.info("Legacy type: pdf")
        else:
            raise ValueError("Cannot detect legacy config type (missing base_url, repo, or pdf)")

        return True

    def convert_legacy_to_unified(self) -> Dict[str, Any]:
        """
        Convert legacy config to unified format.

        Returns:
            Unified config dict
        """
        if self.is_unified:
            logger.info("Config already in unified format")
            return self.config

        logger.info("Converting legacy config to unified format...")

        # Detect legacy type and convert
        if 'base_url' in self.config:
            return self._convert_legacy_documentation()
        elif 'repo' in self.config:
            return self._convert_legacy_github()
        elif 'pdf' in self.config or 'path' in self.config:
            return self._convert_legacy_pdf()
        raise ValueError("Cannot convert: unknown legacy format")

    def _wrap_legacy(self, source_type: str, default_description: str) -> Dict[str, Any]:
        """
        Wrap the legacy config as a single-source unified config.

        Shared implementation for the three _convert_legacy_* helpers: the
        legacy top-level keys (minus name/description) become the single
        source's fields.
        """
        return {
            'name': self.config.get('name', 'unnamed'),
            'description': self.config.get('description', default_description),
            'merge_mode': 'rule-based',
            'sources': [
                {
                    'type': source_type,
                    **{k: v for k, v in self.config.items()
                       if k not in ('name', 'description')}
                }
            ]
        }

    def _convert_legacy_documentation(self) -> Dict[str, Any]:
        """Convert legacy documentation config to unified."""
        return self._wrap_legacy('documentation', 'Documentation skill')

    def _convert_legacy_github(self) -> Dict[str, Any]:
        """Convert legacy GitHub config to unified."""
        return self._wrap_legacy('github', 'GitHub repository skill')

    def _convert_legacy_pdf(self) -> Dict[str, Any]:
        """Convert legacy PDF config to unified."""
        return self._wrap_legacy('pdf', 'PDF document skill')

    def get_sources_by_type(self, source_type: str) -> List[Dict[str, Any]]:
        """
        Get all sources of a specific type.

        Args:
            source_type: 'documentation', 'github', or 'pdf'

        Returns:
            List of sources matching the type
        """
        if not self.is_unified:
            # For legacy, convert first and read the generated sources.
            sources = self.convert_legacy_to_unified()['sources']
        else:
            sources = self.config['sources']

        return [s for s in sources if s.get('type') == source_type]

    def has_multiple_sources(self) -> bool:
        """Check if config has multiple sources (requires merging)."""
        if not self.is_unified:
            return False
        return len(self.config['sources']) > 1

    def needs_api_merge(self) -> bool:
        """
        Check if config needs API merging.

        Returns True if both documentation and github sources exist
        with API extraction enabled.
        """
        if not self.has_multiple_sources():
            return False

        # Note the asymmetric defaults: documentation extracts the API
        # unless disabled, github includes code only when enabled.
        has_docs_api = any(
            s.get('type') == 'documentation' and s.get('extract_api', True)
            for s in self.config['sources']
        )

        has_github_code = any(
            s.get('type') == 'github' and s.get('include_code', False)
            for s in self.config['sources']
        )

        return has_docs_api and has_github_code
|
||||
|
||||
|
||||
def validate_config(config_path: str) -> ConfigValidator:
    """
    Load and validate a config file, returning the validator.

    Args:
        config_path: Path to config JSON file

    Returns:
        ConfigValidator instance for the (now validated) config

    Raises:
        ValueError if config is invalid
    """
    checker = ConfigValidator(config_path)
    checker.validate()
    return checker
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # CLI entry point: validate the config file named by the first argument
    # and print a human-readable summary; exits non-zero on failure.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python config_validator.py <config.json>")
        sys.exit(1)

    config_file = sys.argv[1]

    try:
        validator = validate_config(config_file)

        print(f"\n✅ Config valid!")
        print(f" Format: {'Unified' if validator.is_unified else 'Legacy'}")
        print(f" Name: {validator.config.get('name')}")

        if validator.is_unified:
            sources = validator.config['sources']
            print(f" Sources: {len(sources)}")
            for i, source in enumerate(sources):
                print(f" {i+1}. {source['type']}")

            # Warn when docs + github sources both extract API data and a
            # merge step will be required downstream.
            if validator.needs_api_merge():
                merge_mode = validator.config.get('merge_mode', 'rule-based')
                print(f" ⚠️ API merge required (mode: {merge_mode})")

    except ValueError as e:
        print(f"\n❌ Config invalid: {e}")
        sys.exit(1)
|
||||
@@ -31,6 +31,14 @@ except ImportError:
|
||||
print("Error: PyGithub not installed. Run: pip install PyGithub")
|
||||
sys.exit(1)
|
||||
|
||||
# Import code analyzer for deep code analysis
|
||||
try:
|
||||
from code_analyzer import CodeAnalyzer
|
||||
CODE_ANALYZER_AVAILABLE = True
|
||||
except ImportError:
|
||||
CODE_ANALYZER_AVAILABLE = False
|
||||
logger.warning("Code analyzer not available - deep analysis disabled")
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@@ -72,9 +80,16 @@ class GitHubScraper:
|
||||
self.max_issues = config.get('max_issues', 100)
|
||||
self.include_changelog = config.get('include_changelog', True)
|
||||
self.include_releases = config.get('include_releases', True)
|
||||
self.include_code = config.get('include_code', False) # Surface layer only
|
||||
self.include_code = config.get('include_code', False)
|
||||
self.code_analysis_depth = config.get('code_analysis_depth', 'surface') # 'surface', 'deep', 'full'
|
||||
self.file_patterns = config.get('file_patterns', [])
|
||||
|
||||
# Initialize code analyzer if deep analysis requested
|
||||
self.code_analyzer = None
|
||||
if self.code_analysis_depth != 'surface' and CODE_ANALYZER_AVAILABLE:
|
||||
self.code_analyzer = CodeAnalyzer(depth=self.code_analysis_depth)
|
||||
logger.info(f"Code analysis depth: {self.code_analysis_depth}")
|
||||
|
||||
# Output paths
|
||||
self.skill_dir = f"output/{self.name}"
|
||||
self.data_file = f"output/{self.name}_github_data.json"
|
||||
@@ -277,16 +292,107 @@ class GitHubScraper:
|
||||
def _extract_signatures_and_tests(self):
    """
    C1.3, C1.5, C1.6: Extract signatures, docstrings, and test examples.

    Extraction depth depends on the ``code_analysis_depth`` setting:
    - surface: file tree only (no file parsing at all)
    - deep: parse files for signatures, parameters, types
    - full: complete AST analysis (future enhancement)

    On success, results are stored under
    ``self.extracted_data['code_analysis']``; on any early exit the key
    is simply not written.
    """
    # Stdlib, imported locally so the module's import block stays untouched;
    # hoisted here instead of inside the per-file loop below.
    import fnmatch

    if self.code_analysis_depth == 'surface':
        logger.info("Code extraction: Surface level (file tree only)")
        return

    if not self.code_analyzer:
        # Deep/full analysis was requested but the optional code_analyzer
        # module could not be imported at module load time.
        logger.warning("Code analyzer not available - skipping deep analysis")
        return

    logger.info(f"Extracting code signatures ({self.code_analysis_depth} analysis)...")

    # Determine the repository's primary language (the one with most bytes,
    # per the GitHub languages API data gathered earlier).
    languages = self.extracted_data.get('languages', {})
    if not languages:
        logger.warning("No languages detected - skipping code analysis")
        return

    primary_language = max(languages.items(), key=lambda x: x[1]['bytes'])[0]
    logger.info(f"Primary language: {primary_language}")

    # Map the detected language to the file extensions worth parsing.
    extension_map = {
        'Python': ['.py'],
        'JavaScript': ['.js', '.jsx'],
        'TypeScript': ['.ts', '.tsx'],
        'C': ['.c', '.h'],
        'C++': ['.cpp', '.hpp', '.cc', '.hh', '.cxx']
    }

    extensions = extension_map.get(primary_language, [])
    if not extensions:
        logger.warning(f"No file extensions mapped for {primary_language}")
        return

    # Cap the number of files fetched/parsed to avoid GitHub API rate limits.
    max_analyzed_files = 50

    analyzed_files = []
    file_tree = self.extracted_data.get('file_tree', [])

    for file_info in file_tree:
        file_path = file_info['path']

        # Skip files whose extension does not match the primary language.
        if not any(file_path.endswith(ext) for ext in extensions):
            continue

        # Honor optional user-supplied glob patterns (e.g. "core/**/*.h").
        if self.file_patterns and not any(
            fnmatch.fnmatch(file_path, pattern) for pattern in self.file_patterns
        ):
            continue

        try:
            # Fetch the file content from GitHub and run the
            # language-specific parser on it.
            file_content = self.repo.get_contents(file_path)
            content = file_content.decoded_content.decode('utf-8')

            analysis_result = self.code_analyzer.analyze_file(
                file_path,
                content,
                primary_language
            )

            # Only keep files that actually yielded signatures.
            if analysis_result and (analysis_result.get('classes') or analysis_result.get('functions')):
                analyzed_files.append({
                    'file': file_path,
                    'language': primary_language,
                    **analysis_result
                })

                logger.debug(f"Analyzed {file_path}: "
                             f"{len(analysis_result.get('classes', []))} classes, "
                             f"{len(analysis_result.get('functions', []))} functions")

        except Exception as e:
            # Best-effort: a single unreadable or unparsable file must not
            # abort the whole repository scan.
            logger.debug(f"Could not analyze {file_path}: {e}")
            continue

        if len(analyzed_files) >= max_analyzed_files:
            logger.info(f"Reached analysis limit ({max_analyzed_files} files)")
            break

    self.extracted_data['code_analysis'] = {
        'depth': self.code_analysis_depth,
        'language': primary_language,
        'files_analyzed': len(analyzed_files),
        'files': analyzed_files
    }

    # Summary totals for the log.
    total_classes = sum(len(f.get('classes', [])) for f in analyzed_files)
    total_functions = sum(len(f.get('functions', [])) for f in analyzed_files)

    logger.info(f"Code analysis complete: {len(analyzed_files)} files, "
                f"{total_classes} classes, {total_functions} functions")
||||
def _extract_issues(self):
|
||||
"""C1.7: Extract GitHub Issues (open/closed, labels, milestones)."""
|
||||
|
||||
50
configs/godot_unified.json
Normal file
50
configs/godot_unified.json
Normal file
@@ -0,0 +1,50 @@
|
||||
{
|
||||
"name": "godot",
|
||||
"description": "Complete Godot Engine knowledge base combining official documentation and source code analysis",
|
||||
"merge_mode": "claude-enhanced",
|
||||
"sources": [
|
||||
{
|
||||
"type": "documentation",
|
||||
"base_url": "https://docs.godotengine.org/en/stable/",
|
||||
"extract_api": true,
|
||||
"selectors": {
|
||||
"main_content": "div[role='main']",
|
||||
"title": "title",
|
||||
"code_blocks": "pre"
|
||||
},
|
||||
"url_patterns": {
|
||||
"include": [],
|
||||
"exclude": ["/search.html", "/_static/", "/_images/"]
|
||||
},
|
||||
"categories": {
|
||||
"getting_started": ["introduction", "getting_started", "step_by_step"],
|
||||
"scripting": ["scripting", "gdscript", "c_sharp"],
|
||||
"2d": ["2d", "canvas", "sprite", "animation"],
|
||||
"3d": ["3d", "spatial", "mesh", "shader"],
|
||||
"physics": ["physics", "collision", "rigidbody"],
|
||||
"api": ["api", "class", "reference", "method"]
|
||||
},
|
||||
"rate_limit": 0.5,
|
||||
"max_pages": 500
|
||||
},
|
||||
{
|
||||
"type": "github",
|
||||
"repo": "godotengine/godot",
|
||||
"github_token": null,
|
||||
"code_analysis_depth": "deep",
|
||||
"include_code": true,
|
||||
"include_issues": true,
|
||||
"max_issues": 100,
|
||||
"include_changelog": true,
|
||||
"include_releases": true,
|
||||
"file_patterns": [
|
||||
"core/**/*.h",
|
||||
"core/**/*.cpp",
|
||||
"scene/**/*.h",
|
||||
"scene/**/*.cpp",
|
||||
"servers/**/*.h",
|
||||
"servers/**/*.cpp"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
Reference in New Issue
Block a user