style: Run black formatter on 16 files

Applied black formatting to files modified in linting fixes:

Source files (8):
- config_extractor.py
- doc_scraper.py
- how_to_guide_builder.py
- llms_txt_downloader.py
- llms_txt_parser.py
- pattern_recognizer.py
- test_example_extractor.py
- unified_codebase_analyzer.py

Test files (8):
- test_architecture_scenarios.py
- test_async_scraping.py
- test_github_scraper.py
- test_guide_enhancer.py
- test_install_agent.py
- test_issue_219_e2e.py
- test_llms_txt_downloader.py
- test_skip_llms_txt.py

All formatting issues resolved.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
yusyus
2026-01-17 23:56:24 +03:00
parent 9666938eb0
commit 9d43956b1d
16 changed files with 1044 additions and 335 deletions

View File

@@ -65,7 +65,15 @@ class ConfigFile:
file_path: str
relative_path: str
config_type: Literal[
"json", "yaml", "toml", "env", "ini", "python", "javascript", "dockerfile", "docker-compose"
"json",
"yaml",
"toml",
"env",
"ini",
"python",
"javascript",
"dockerfile",
"docker-compose",
]
purpose: str # Inferred purpose: database, api, logging, etc.
settings: list[ConfigSetting] = field(default_factory=list)
@@ -81,7 +89,9 @@ class ConfigExtractionResult:
config_files: list[ConfigFile] = field(default_factory=list)
total_files: int = 0
total_settings: int = 0
detected_patterns: dict[str, list[str]] = field(default_factory=dict) # pattern -> files
detected_patterns: dict[str, list[str]] = field(
default_factory=dict
) # pattern -> files
errors: list[str] = field(default_factory=list)
def to_dict(self) -> dict:
@@ -195,7 +205,12 @@ class ConfigFileDetector:
},
"javascript": {
"patterns": ["*.config.js", "*.config.ts"],
"names": ["config.js", "next.config.js", "vue.config.js", "webpack.config.js"],
"names": [
"config.js",
"next.config.js",
"vue.config.js",
"webpack.config.js",
],
},
"dockerfile": {
"patterns": ["Dockerfile*"],
@@ -226,7 +241,9 @@ class ConfigFileDetector:
"*.egg-info",
}
def find_config_files(self, directory: Path, max_files: int = 100) -> list[ConfigFile]:
def find_config_files(
self, directory: Path, max_files: int = 100
) -> list[ConfigFile]:
"""
Find all configuration files in directory.
@@ -297,7 +314,10 @@ class ConfigFileDetector:
filename = file_path.name.lower()
# Database configs
if any(word in path_lower for word in ["database", "db", "postgres", "mysql", "mongo"]):
if any(
word in path_lower
for word in ["database", "db", "postgres", "mysql", "mongo"]
):
return "database_configuration"
# API configs
@@ -313,7 +333,9 @@ class ConfigFileDetector:
return "docker_configuration"
# CI/CD configs
if any(word in path_lower for word in [".travis", ".gitlab", ".github", "ci", "cd"]):
if any(
word in path_lower for word in [".travis", ".gitlab", ".github", "ci", "cd"]
):
return "ci_cd_configuration"
# Package configs
@@ -325,7 +347,11 @@ class ConfigFileDetector:
return "typescript_configuration"
# Framework configs
if "next.config" in filename or "vue.config" in filename or "webpack.config" in filename:
if (
"next.config" in filename
or "vue.config" in filename
or "webpack.config" in filename
):
return "framework_configuration"
# Environment configs
@@ -467,7 +493,12 @@ class ConfigParser:
for node in ast.walk(tree):
# Get variable name and skip private variables
if isinstance(node, ast.Assign) and len(node.targets) == 1 and isinstance(node.targets[0], ast.Name) and not node.targets[0].id.startswith("_"):
if (
isinstance(node, ast.Assign)
and len(node.targets) == 1
and isinstance(node.targets[0], ast.Name)
and not node.targets[0].id.startswith("_")
):
key = node.targets[0].id
# Extract value
@@ -500,7 +531,9 @@ class ConfigParser:
for match in re.finditer(pattern, config_file.raw_content):
if len(match.groups()) >= 2:
key = match.group(1)
value = match.group(3) if len(match.groups()) > 2 else match.group(2)
value = (
match.group(3) if len(match.groups()) > 2 else match.group(2)
)
setting = ConfigSetting(
key=key, value=value, value_type=self._infer_type(value)
@@ -546,7 +579,9 @@ class ConfigParser:
for key, value in data.items():
if isinstance(value, dict):
# Recurse into nested dicts
self._extract_settings_from_dict(value, config_file, parent_path + [key])
self._extract_settings_from_dict(
value, config_file, parent_path + [key]
)
else:
setting = ConfigSetting(
key=".".join(parent_path + [key]) if parent_path else key,
@@ -593,11 +628,26 @@ class ConfigPatternDetector:
# Known configuration patterns
KNOWN_PATTERNS = {
"database_config": {
"keys": ["host", "port", "database", "user", "username", "password", "db_name"],
"keys": [
"host",
"port",
"database",
"user",
"username",
"password",
"db_name",
],
"min_match": 3,
},
"api_config": {
"keys": ["base_url", "api_key", "api_secret", "timeout", "retry", "endpoint"],
"keys": [
"base_url",
"api_key",
"api_secret",
"timeout",
"retry",
"endpoint",
],
"min_match": 2,
},
"logging_config": {
@@ -822,7 +872,9 @@ def main():
print("\n📊 Summary:")
print(f" Config files found: {result.total_files}")
print(f" Total settings: {result.total_settings}")
print(f" Detected patterns: {', '.join(result.detected_patterns.keys()) or 'None'}")
print(
f" Detected patterns: {', '.join(result.detected_patterns.keys()) or 'None'}"
)
if "ai_enhancements" in output_dict:
print(f" ✨ AI enhancements: Yes ({enhance_mode} mode)")

View File

@@ -148,7 +148,9 @@ def infer_description_from_docs(
class DocToSkillConverter:
def __init__(self, config: dict[str, Any], dry_run: bool = False, resume: bool = False) -> None:
def __init__(
self, config: dict[str, Any], dry_run: bool = False, resume: bool = False
) -> None:
self.config = config
self.name = config["name"]
self.base_url = config["base_url"]
@@ -163,7 +165,9 @@ class DocToSkillConverter:
# Checkpoint config
checkpoint_config = config.get("checkpoint", {})
self.checkpoint_enabled = checkpoint_config.get("enabled", False)
self.checkpoint_interval = checkpoint_config.get("interval", DEFAULT_CHECKPOINT_INTERVAL)
self.checkpoint_interval = checkpoint_config.get(
"interval", DEFAULT_CHECKPOINT_INTERVAL
)
# llms.txt detection state
skip_llms_txt_value = config.get("skip_llms_txt", False)
@@ -318,7 +322,9 @@ class DocToSkillConverter:
for h in main.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]):
text = self.clean_text(h.get_text())
if text:
page["headings"].append({"level": h.name, "text": text, "id": h.get("id", "")})
page["headings"].append(
{"level": h.name, "text": text, "id": h.get("id", "")}
)
# Extract code with language detection
code_selector = selectors.get("code_blocks", "pre code")
@@ -385,7 +391,9 @@ class DocToSkillConverter:
import re
# Detect if content is actually HTML (some .md URLs return HTML)
if content.strip().startswith("<!DOCTYPE") or content.strip().startswith("<html"):
if content.strip().startswith("<!DOCTYPE") or content.strip().startswith(
"<html"
):
return self._extract_html_as_markdown(content, url)
page = {
@@ -413,14 +421,20 @@ class DocToSkillConverter:
level = len(match.group(1))
text = match.group(2).strip()
page["headings"].append(
{"level": f"h{level}", "text": text, "id": text.lower().replace(" ", "-")}
{
"level": f"h{level}",
"text": text,
"id": text.lower().replace(" ", "-"),
}
)
# Extract code blocks with language
code_blocks = re.findall(r"```(\w+)?\n(.*?)```", content, re.DOTALL)
for lang, code in code_blocks:
if len(code.strip()) > 10:
page["code_samples"].append({"code": code.strip(), "language": lang or "unknown"})
page["code_samples"].append(
{"code": code.strip(), "language": lang or "unknown"}
)
# Extract content (paragraphs)
content_no_code = re.sub(r"```.*?```", "", content, flags=re.DOTALL)
@@ -444,7 +458,11 @@ class DocToSkillConverter:
# Strip anchor fragments
full_url = full_url.split("#")[0]
# Only include .md URLs to avoid client-side rendered HTML pages
if ".md" in full_url and self.is_valid_url(full_url) and full_url not in page["links"]:
if (
".md" in full_url
and self.is_valid_url(full_url)
and full_url not in page["links"]
):
page["links"].append(full_url)
return page
@@ -508,14 +526,18 @@ class DocToSkillConverter:
for h in main.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]):
text = self.clean_text(h.get_text())
if text:
page["headings"].append({"level": h.name, "text": text, "id": h.get("id", "")})
page["headings"].append(
{"level": h.name, "text": text, "id": h.get("id", "")}
)
# Extract code blocks
for code_elem in main.select("pre code, pre"):
code = code_elem.get_text()
if len(code.strip()) > 10:
lang = self.detect_language(code_elem, code)
page["code_samples"].append({"code": code.strip(), "language": lang})
page["code_samples"].append(
{"code": code.strip(), "language": lang}
)
# Extract paragraphs
paragraphs = []
@@ -536,7 +558,9 @@ class DocToSkillConverter:
# Log low-confidence detections for debugging
if confidence < 0.5:
logger.debug(f"Low confidence language detection: {lang} ({confidence:.2f})")
logger.debug(
f"Low confidence language detection: {lang} ({confidence:.2f})"
)
return lang # Return string for backward compatibility
@@ -549,7 +573,10 @@ class DocToSkillConverter:
# Look for "Example:" or "Pattern:" sections
for elem in main.find_all(["p", "div"]):
text = elem.get_text().lower()
if any(word in text for word in ["example:", "pattern:", "usage:", "typical use"]):
if any(
word in text
for word in ["example:", "pattern:", "usage:", "typical use"]
):
# Get the code that follows
next_code = elem.find_next(["pre", "code"])
if next_code:
@@ -571,7 +598,9 @@ class DocToSkillConverter:
"""Save page data (skip pages with empty content)"""
# Skip pages with empty or very short content
if not page.get("content") or len(page.get("content", "")) < 50:
logger.debug("Skipping page with empty/short content: %s", page.get("url", "unknown"))
logger.debug(
"Skipping page with empty/short content: %s", page.get("url", "unknown")
)
return
url_hash = hashlib.md5(page["url"].encode()).hexdigest()[:10]
@@ -619,7 +648,10 @@ class DocToSkillConverter:
# Add new URLs
for link in page["links"]:
if link not in self.visited_urls and link not in self.pending_urls:
if (
link not in self.visited_urls
and link not in self.pending_urls
):
self.pending_urls.append(link)
else:
# Single-threaded mode (no lock needed)
@@ -640,7 +672,9 @@ class DocToSkillConverter:
except Exception as e:
if self.workers > 1:
with self.lock:
logger.error(" ✗ Error scraping %s: %s: %s", url, type(e).__name__, e)
logger.error(
" ✗ Error scraping %s: %s: %s", url, type(e).__name__, e
)
else:
logger.error(" ✗ Error scraping page: %s: %s", type(e).__name__, e)
logger.error(" URL: %s", url)
@@ -715,7 +749,8 @@ class DocToSkillConverter:
md_urls.append(md_url)
logger.info(
" ✓ Converted %d URLs to .md format (will validate during crawl)", len(md_urls)
" ✓ Converted %d URLs to .md format (will validate during crawl)",
len(md_urls),
)
return md_urls
@@ -757,7 +792,9 @@ class DocToSkillConverter:
# Check for explicit config URL first
explicit_url = self.config.get("llms_txt_url")
if explicit_url:
logger.info("\n📌 Using explicit llms_txt_url from config: %s", explicit_url)
logger.info(
"\n📌 Using explicit llms_txt_url from config: %s", explicit_url
)
# Download explicit file first
downloader = LlmsTxtDownloader(explicit_url)
@@ -779,7 +816,8 @@ class DocToSkillConverter:
if variants:
logger.info(
"\n🔍 Found %d total variant(s), downloading remaining...", len(variants)
"\n🔍 Found %d total variant(s), downloading remaining...",
len(variants),
)
for variant_info in variants:
url = variant_info["url"]
@@ -800,7 +838,11 @@ class DocToSkillConverter:
)
with open(extra_filepath, "w", encoding="utf-8") as f:
f.write(extra_content)
logger.info("%s (%d chars)", extra_filename, len(extra_content))
logger.info(
"%s (%d chars)",
extra_filename,
len(extra_content),
)
# Parse explicit file for skill building
parser = LlmsTxtParser(content, self.base_url)
@@ -822,7 +864,8 @@ class DocToSkillConverter:
self.pending_urls.append(url)
logger.info(
" 📋 %d URLs added to crawl queue after filtering", len(self.pending_urls)
" 📋 %d URLs added to crawl queue after filtering",
len(self.pending_urls),
)
# Return False to trigger HTML scraping with the populated pending_urls
@@ -872,7 +915,9 @@ class DocToSkillConverter:
logger.info("%s (%d chars)", filename, len(content))
if not downloaded:
logger.warning("⚠️ Failed to download any variants, falling back to HTML scraping")
logger.warning(
"⚠️ Failed to download any variants, falling back to HTML scraping"
)
return False
# Save ALL variants to references/
@@ -906,7 +951,10 @@ class DocToSkillConverter:
if self.is_valid_url(url) and url not in self.visited_urls:
self.pending_urls.append(url)
logger.info(" 📋 %d URLs added to crawl queue after filtering", len(self.pending_urls))
logger.info(
" 📋 %d URLs added to crawl queue after filtering",
len(self.pending_urls),
)
# Return False to trigger HTML scraping with the populated pending_urls
self.llms_txt_detected = True
@@ -947,7 +995,8 @@ class DocToSkillConverter:
llms_result = self._try_llms_txt()
if llms_result:
logger.info(
"\n✅ Used llms.txt (%s) - skipping HTML scraping", self.llms_txt_variant
"\n✅ Used llms.txt (%s) - skipping HTML scraping",
self.llms_txt_variant,
)
self.save_summary()
return
@@ -983,7 +1032,9 @@ class DocToSkillConverter:
# Single-threaded mode (original sequential logic)
if self.workers <= 1:
while self.pending_urls and (unlimited or len(self.visited_urls) < preview_limit):
while self.pending_urls and (
unlimited or len(self.visited_urls) < preview_limit
):
url = self.pending_urls.popleft()
if url in self.visited_urls:
@@ -995,7 +1046,9 @@ class DocToSkillConverter:
# Just show what would be scraped
logger.info(" [Preview] %s", url)
try:
headers = {"User-Agent": "Mozilla/5.0 (Documentation Scraper - Dry Run)"}
headers = {
"User-Agent": "Mozilla/5.0 (Documentation Scraper - Dry Run)"
}
response = requests.get(url, headers=headers, timeout=10)
soup = BeautifulSoup(response.content, "html.parser")
@@ -1007,11 +1060,16 @@ class DocToSkillConverter:
if main:
for link in main.find_all("a", href=True):
href = urljoin(url, link["href"])
if self.is_valid_url(href) and href not in self.visited_urls:
if (
self.is_valid_url(href)
and href not in self.visited_urls
):
self.pending_urls.append(href)
except Exception as e:
# Failed to extract links in fast mode, continue anyway
logger.warning("⚠️ Warning: Could not extract links from %s: %s", url, e)
logger.warning(
"⚠️ Warning: Could not extract links from %s: %s", url, e
)
else:
self.scrape_page(url)
self.pages_scraped += 1
@@ -1034,7 +1092,9 @@ class DocToSkillConverter:
with ThreadPoolExecutor(max_workers=self.workers) as executor:
futures = []
while self.pending_urls and (unlimited or len(self.visited_urls) < preview_limit):
while self.pending_urls and (
unlimited or len(self.visited_urls) < preview_limit
):
# Get next batch of URLs (thread-safe)
batch = []
batch_size = min(self.workers * 2, len(self.pending_urls))
@@ -1092,9 +1152,14 @@ class DocToSkillConverter:
self.pages_scraped += 1
if self.dry_run:
logger.info("\n✅ Dry run complete: would scrape ~%d pages", len(self.visited_urls))
logger.info(
"\n✅ Dry run complete: would scrape ~%d pages", len(self.visited_urls)
)
if len(self.visited_urls) >= preview_limit:
logger.info(" (showing first %d, actual scraping may find more)", preview_limit)
logger.info(
" (showing first %d, actual scraping may find more)",
preview_limit,
)
logger.info("\n💡 To actually scrape, run without --dry-run")
else:
logger.info("\n✅ Scraped %d pages", len(self.visited_urls))
@@ -1114,7 +1179,8 @@ class DocToSkillConverter:
llms_result = self._try_llms_txt()
if llms_result:
logger.info(
"\n✅ Used llms.txt (%s) - skipping HTML scraping", self.llms_txt_variant
"\n✅ Used llms.txt (%s) - skipping HTML scraping",
self.llms_txt_variant,
)
self.save_summary()
return
@@ -1155,7 +1221,9 @@ class DocToSkillConverter:
) as client:
tasks = []
while self.pending_urls and (unlimited or len(self.visited_urls) < preview_limit):
while self.pending_urls and (
unlimited or len(self.visited_urls) < preview_limit
):
# Get next batch of URLs
batch = []
batch_size = min(self.workers * 2, len(self.pending_urls))
@@ -1191,7 +1259,11 @@ class DocToSkillConverter:
logger.info(" [%d pages scraped]", self.pages_scraped)
# Checkpoint saving
if not self.dry_run and self.checkpoint_enabled and self.pages_scraped % self.checkpoint_interval == 0:
if (
not self.dry_run
and self.checkpoint_enabled
and self.pages_scraped % self.checkpoint_interval == 0
):
self.save_checkpoint()
# Wait for any remaining tasks
@@ -1199,10 +1271,13 @@ class DocToSkillConverter:
await asyncio.gather(*tasks, return_exceptions=True)
if self.dry_run:
logger.info("\n✅ Dry run complete: would scrape ~%d pages", len(self.visited_urls))
logger.info(
"\n✅ Dry run complete: would scrape ~%d pages", len(self.visited_urls)
)
if len(self.visited_urls) >= preview_limit:
logger.info(
" (showing first %d, actual scraping may find more)", int(preview_limit)
" (showing first %d, actual scraping may find more)",
int(preview_limit),
)
logger.info("\n💡 To actually scrape, run without --dry-run")
else:
@@ -1237,7 +1312,10 @@ class DocToSkillConverter:
pages.append(json.load(f))
except Exception as e:
logger.error(
"⚠️ Error loading scraped data file %s: %s: %s", json_file, type(e).__name__, e
"⚠️ Error loading scraped data file %s: %s: %s",
json_file,
type(e).__name__,
e,
)
logger.error(
" Suggestion: File may be corrupted, consider re-scraping with --fresh"
@@ -1245,7 +1323,9 @@ class DocToSkillConverter:
return pages
def smart_categorize(self, pages: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
def smart_categorize(
self, pages: list[dict[str, Any]]
) -> dict[str, list[dict[str, Any]]]:
"""Improved categorization with better pattern matching"""
category_defs = self.config.get("categories", {})
@@ -1297,14 +1377,18 @@ class DocToSkillConverter:
for page in pages:
path = urlparse(page["url"]).path
segments = [
s for s in path.split("/") if s and s not in ["en", "stable", "latest", "docs"]
s
for s in path.split("/")
if s and s not in ["en", "stable", "latest", "docs"]
]
for seg in segments:
url_segments[seg] += 1
# Top segments become categories
top_segments = sorted(url_segments.items(), key=lambda x: x[1], reverse=True)[:8]
top_segments = sorted(url_segments.items(), key=lambda x: x[1], reverse=True)[
:8
]
categories = {}
for seg, count in top_segments:
@@ -1324,7 +1408,9 @@ class DocToSkillConverter:
return categories
def generate_quick_reference(self, pages: list[dict[str, Any]]) -> list[dict[str, str]]:
def generate_quick_reference(
self, pages: list[dict[str, Any]]
) -> list[dict[str, str]]:
"""Generate quick reference from common patterns (NEW FEATURE)"""
quick_ref = []
@@ -1393,7 +1479,9 @@ class DocToSkillConverter:
logger.info("%s.md (%d pages)", category, len(pages))
def create_enhanced_skill_md(
self, categories: dict[str, list[dict[str, Any]]], quick_ref: list[dict[str, str]]
self,
categories: dict[str, list[dict[str, Any]]],
quick_ref: list[dict[str, str]],
) -> None:
"""Create SKILL.md with actual examples (IMPROVED)"""
# Try to infer description if not in config
@@ -1404,7 +1492,9 @@ class DocToSkillConverter:
if pages:
first_page_html = pages[0].get("raw_html", "")
break
description = infer_description_from_docs(self.base_url, first_page_html, self.name)
description = infer_description_from_docs(
self.base_url, first_page_html, self.name
)
else:
description = self.config["description"]
@@ -1412,7 +1502,9 @@ class DocToSkillConverter:
example_codes = []
for pages in categories.values():
for page in pages[:3]: # First 3 pages per category
for sample in page.get("code_samples", [])[:2]: # First 2 samples per page
for sample in page.get("code_samples", [])[
:2
]: # First 2 samples per page
code = sample.get("code", sample if isinstance(sample, str) else "")
lang = sample.get("language", "unknown")
if len(code) < 200 and lang != "unknown":
@@ -1462,7 +1554,9 @@ This skill should be triggered when:
content += pattern.get("code", "")[:300]
content += "\n```\n\n"
else:
content += "*Quick reference patterns will be added as you use the skill.*\n\n"
content += (
"*Quick reference patterns will be added as you use the skill.*\n\n"
)
# Add example codes from docs
if example_codes:
@@ -1477,7 +1571,9 @@ This skill includes comprehensive documentation in `references/`:
"""
for cat in sorted(categories.keys()):
content += f"- **{cat}.md** - {cat.replace('_', ' ').title()} documentation\n"
content += (
f"- **{cat}.md** - {cat.replace('_', ' ').title()} documentation\n"
)
content += """
Use `view` to read specific reference files when detailed information is needed.
@@ -1625,7 +1721,9 @@ def validate_config(config: dict[str, Any]) -> tuple[list[str], list[str]]:
)
# Validate base_url
if "base_url" in config and not config["base_url"].startswith(("http://", "https://")):
if "base_url" in config and not config["base_url"].startswith(
("http://", "https://")
):
errors.append(
f"Invalid base_url: '{config['base_url']}' (must start with http:// or https://)"
)
@@ -1648,7 +1746,9 @@ def validate_config(config: dict[str, Any]) -> tuple[list[str], list[str]]:
errors.append("'url_patterns' must be a dictionary")
else:
for key in ["include", "exclude"]:
if key in config["url_patterns"] and not isinstance(config["url_patterns"][key], list):
if key in config["url_patterns"] and not isinstance(
config["url_patterns"][key], list
):
errors.append(f"'url_patterns.{key}' must be a list")
# Validate categories
@@ -1740,12 +1840,18 @@ def load_config(config_path: str) -> dict[str, Any]:
except json.JSONDecodeError as e:
logger.error("❌ Error: Invalid JSON in config file: %s", config_path)
logger.error(" Details: %s", e)
logger.error(" Suggestion: Check syntax at line %d, column %d", e.lineno, e.colno)
logger.error(
" Suggestion: Check syntax at line %d, column %d", e.lineno, e.colno
)
sys.exit(1)
except FileNotFoundError:
logger.error("❌ Error: Config file not found: %s", config_path)
logger.error(" Suggestion: Create a config file or use an existing one from configs/")
logger.error(" Available configs: react.json, vue.json, django.json, godot.json")
logger.error(
" Suggestion: Create a config file or use an existing one from configs/"
)
logger.error(
" Available configs: react.json, vue.json, django.json, godot.json"
)
sys.exit(1)
# Validate config
@@ -1763,7 +1869,9 @@ def load_config(config_path: str) -> dict[str, Any]:
logger.error("❌ Configuration validation errors in %s:", config_path)
for error in errors:
logger.error(" - %s", error)
logger.error("\n Suggestion: Fix the above errors or check configs/ for working examples")
logger.error(
"\n Suggestion: Fix the above errors or check configs/ for working examples"
)
sys.exit(1)
return config
@@ -1870,10 +1978,16 @@ def setup_argument_parser() -> argparse.ArgumentParser:
)
parser.add_argument(
"--interactive", "-i", action="store_true", help="Interactive configuration mode"
"--interactive",
"-i",
action="store_true",
help="Interactive configuration mode",
)
parser.add_argument(
"--config", "-c", type=str, help="Load configuration from file (e.g., configs/godot.json)"
"--config",
"-c",
type=str,
help="Load configuration from file (e.g., configs/godot.json)",
)
parser.add_argument("--name", type=str, help="Skill name")
parser.add_argument("--url", type=str, help="Base documentation URL")
@@ -1902,14 +2016,18 @@ def setup_argument_parser() -> argparse.ArgumentParser:
help="Open terminal window for enhancement (use with --enhance-local)",
)
parser.add_argument(
"--api-key", type=str, help="Anthropic API key for --enhance (or set ANTHROPIC_API_KEY)"
"--api-key",
type=str,
help="Anthropic API key for --enhance (or set ANTHROPIC_API_KEY)",
)
parser.add_argument(
"--resume",
action="store_true",
help="Resume from last checkpoint (for interrupted scrapes)",
)
parser.add_argument("--fresh", action="store_true", help="Clear checkpoint and start fresh")
parser.add_argument(
"--fresh", action="store_true", help="Clear checkpoint and start fresh"
)
parser.add_argument(
"--rate-limit",
"-r",
@@ -1936,10 +2054,16 @@ def setup_argument_parser() -> argparse.ArgumentParser:
help="Disable rate limiting completely (same as --rate-limit 0)",
)
parser.add_argument(
"--verbose", "-v", action="store_true", help="Enable verbose output (DEBUG level logging)"
"--verbose",
"-v",
action="store_true",
help="Enable verbose output (DEBUG level logging)",
)
parser.add_argument(
"--quiet", "-q", action="store_true", help="Minimize output (WARNING level logging only)"
"--quiet",
"-q",
action="store_true",
help="Minimize output (WARNING level logging only)",
)
return parser
@@ -2002,11 +2126,15 @@ def get_configuration(args: argparse.Namespace) -> dict[str, Any]:
if args.workers:
# Validate workers count
if args.workers < 1:
logger.error("❌ Error: --workers must be at least 1 (got %d)", args.workers)
logger.error(
"❌ Error: --workers must be at least 1 (got %d)", args.workers
)
logger.error(" Suggestion: Use --workers 1 (default) or omit the flag")
sys.exit(1)
if args.workers > 10:
logger.warning("⚠️ Warning: --workers capped at 10 (requested %d)", args.workers)
logger.warning(
"⚠️ Warning: --workers capped at 10 (requested %d)", args.workers
)
args.workers = 10
config["workers"] = args.workers
if args.workers > 1:
@@ -2160,7 +2288,11 @@ def execute_enhancement(config: dict[str, Any], args: argparse.Namespace) -> Non
logger.info("=" * 60 + "\n")
try:
enhance_cmd = ["python3", "cli/enhance_skill.py", f"output/{config['name']}/"]
enhance_cmd = [
"python3",
"cli/enhance_skill.py",
f"output/{config['name']}/",
]
if args.api_key:
enhance_cmd.extend(["--api-key", args.api_key])
@@ -2204,9 +2336,14 @@ def execute_enhancement(config: dict[str, Any], args: argparse.Namespace) -> Non
# Suggest enhancement if not done
if not args.enhance and not args.enhance_local:
logger.info("\n💡 Optional: Enhance SKILL.md with Claude:")
logger.info(" Local (recommended): skill-seekers-enhance output/%s/", config["name"])
logger.info(
" Local (recommended): skill-seekers-enhance output/%s/", config["name"]
)
logger.info(" or re-run with: --enhance-local")
logger.info(" API-based: skill-seekers-enhance-api output/%s/", config["name"])
logger.info(
" API-based: skill-seekers-enhance-api output/%s/",
config["name"],
)
logger.info(" or re-run with: --enhance")
logger.info(
"\n💡 Tip: Use --interactive-enhancement with --enhance-local to open terminal window"

View File

@@ -79,7 +79,9 @@ class WorkflowStep:
setup_required: str | None = None
explanation: str | None = None # Why this step matters
common_pitfall: str | None = None # Warning for this step
common_variations: list[str] = field(default_factory=list) # AI: Alternative approaches
common_variations: list[str] = field(
default_factory=list
) # AI: Alternative approaches
@dataclass
@@ -221,7 +223,9 @@ class WorkflowAnalyzer:
# Check if next statement is assertion (verification)
idx = statements.index(stmt)
verification = None
if idx + 1 < len(statements) and isinstance(statements[idx + 1], ast.Assert):
if idx + 1 < len(statements) and isinstance(
statements[idx + 1], ast.Assert
):
verification = ast.get_source_segment(code, statements[idx + 1])
steps.append(
@@ -240,7 +244,9 @@ class WorkflowAnalyzer:
return steps
def _extract_steps_heuristic(self, code: str, _workflow: dict) -> list[WorkflowStep]:
def _extract_steps_heuristic(
self, code: str, _workflow: dict
) -> list[WorkflowStep]:
"""Extract steps using heuristics (for non-Python or invalid syntax)"""
steps = []
lines = code.split("\n")
@@ -259,7 +265,11 @@ class WorkflowAnalyzer:
description = self._infer_description_from_code(step_code)
steps.append(
WorkflowStep(step_number=step_num, code=step_code, description=description)
WorkflowStep(
step_number=step_num,
code=step_code,
description=description,
)
)
step_num += 1
current_step = []
@@ -272,7 +282,9 @@ class WorkflowAnalyzer:
step_code = "\n".join(current_step)
description = self._infer_description_from_code(step_code)
steps.append(
WorkflowStep(step_number=step_num, code=step_code, description=description)
WorkflowStep(
step_number=step_num, code=step_code, description=description
)
)
return steps
@@ -336,7 +348,11 @@ class WorkflowAnalyzer:
def _detect_prerequisites(self, workflow: dict) -> dict:
"""Detect prerequisites from workflow"""
metadata = {"prerequisites": [], "required_imports": [], "required_fixtures": []}
metadata = {
"prerequisites": [],
"required_imports": [],
"required_fixtures": [],
}
# Get dependencies from workflow
dependencies = workflow.get("dependencies", [])
@@ -438,7 +454,9 @@ class WorkflowGrouper:
groups = self._group_by_file_path(workflows)
return groups
def _group_by_ai_tutorial_group(self, workflows: list[dict]) -> dict[str, list[dict]]:
def _group_by_ai_tutorial_group(
self, workflows: list[dict]
) -> dict[str, list[dict]]:
"""Group by AI-generated tutorial_group (from C3.6 enhancement)"""
groups = defaultdict(list)
ungrouped = []
@@ -866,7 +884,10 @@ class HowToGuideBuilder:
if not workflows:
logger.warning("No workflow examples found!")
return GuideCollection(
total_guides=0, guides_by_complexity={}, guides_by_use_case={}, guides=[]
total_guides=0,
guides_by_complexity={},
guides_by_use_case={},
guides=[],
)
# Group workflows
@@ -893,7 +914,9 @@ class HowToGuideBuilder:
"""Filter to workflow category only"""
return [ex for ex in examples if ex.get("category") == "workflow"]
def _create_guide(self, title: str, workflows: list[dict], enhancer=None) -> HowToGuide:
def _create_guide(
self, title: str, workflows: list[dict], enhancer=None
) -> HowToGuide:
"""
Generate single guide from workflow(s).
@@ -928,7 +951,8 @@ class HowToGuideBuilder:
# Extract source files
source_files = [w.get("file_path", "") for w in workflows]
source_files = [
f"{Path(f).name}:{w.get('line_start', 0)}" for f, w in zip(source_files, workflows, strict=False)
f"{Path(f).name}:{w.get('line_start', 0)}"
for f, w in zip(source_files, workflows, strict=False)
]
# Create guide
@@ -950,14 +974,18 @@ class HowToGuideBuilder:
# Add AI enhancements if enhancer is available
if enhancer:
self._enhance_guide_with_ai(guide, primary_workflow.get("ai_analysis", {}), enhancer)
self._enhance_guide_with_ai(
guide, primary_workflow.get("ai_analysis", {}), enhancer
)
elif self.enhance_with_ai and primary_workflow.get("ai_analysis"):
# Fallback to old enhancement method (basic)
self._enhance_guide_with_ai_basic(guide, primary_workflow["ai_analysis"])
return guide
def _generate_overview(self, primary_workflow: dict, _all_workflows: list[dict]) -> str:
def _generate_overview(
self, primary_workflow: dict, _all_workflows: list[dict]
) -> str:
"""Generate guide overview"""
# Try to get explanation from AI analysis
if primary_workflow.get("ai_analysis"):
@@ -991,7 +1019,10 @@ class HowToGuideBuilder:
# Prepare guide data for enhancer
guide_data = {
"title": guide.title,
"steps": [{"description": step.description, "code": step.code} for step in guide.steps],
"steps": [
{"description": step.description, "code": step.code}
for step in guide.steps
],
"language": "python", # TODO: Detect from code
"prerequisites": guide.prerequisites,
"description": guide.overview,
@@ -1024,7 +1055,9 @@ class HowToGuideBuilder:
if "use_cases" in enhanced_data:
guide.use_cases = enhanced_data["use_cases"]
logger.info(f"✨ Enhanced guide '{guide.title}' with comprehensive AI improvements")
logger.info(
f"✨ Enhanced guide '{guide.title}' with comprehensive AI improvements"
)
def _enhance_guide_with_ai_basic(self, guide: HowToGuide, ai_analysis: dict):
"""
@@ -1089,7 +1122,9 @@ class HowToGuideBuilder:
for guide in guides:
# Generate filename from title
filename = guide.title.lower().replace(" ", "-").replace(":", "") + ".md"
filename = (
guide.title.lower().replace(" ", "-").replace(":", "") + ".md"
)
file_path = use_case_dir / filename
# Generate and save markdown
@@ -1100,7 +1135,9 @@ class HowToGuideBuilder:
index_markdown = self.generator.generate_index(collection.guides)
(output_dir / "index.md").write_text(index_markdown, encoding="utf-8")
logger.info(f"✅ Saved {collection.total_guides} guides + index to {output_dir}")
logger.info(
f"✅ Saved {collection.total_guides} guides + index to {output_dir}"
)
# ============================================================================
@@ -1142,11 +1179,15 @@ Grouping Strategies:
)
parser.add_argument(
"input", nargs="?", help="Input: directory with test files OR test_examples.json file"
"input",
nargs="?",
help="Input: directory with test files OR test_examples.json file",
)
parser.add_argument(
"--input", dest="input_file", help="Input JSON file with test examples (from C3.2)"
"--input",
dest="input_file",
help="Input JSON file with test examples (from C3.2)",
)
parser.add_argument(
@@ -1165,7 +1206,9 @@ Grouping Strategies:
parser.add_argument("--no-ai", action="store_true", help="Disable AI enhancement")
parser.add_argument(
"--json-output", action="store_true", help="Output JSON summary instead of markdown files"
"--json-output",
action="store_true",
help="Output JSON summary instead of markdown files",
)
args = parser.parse_args()
@@ -1201,7 +1244,9 @@ Grouping Strategies:
# Extract from directory using test example extractor
print("⚠️ Directory input requires test example extractor")
print(" Please use test_examples.json output from C3.2")
print(f" Or run: skill-seekers extract-test-examples {input_path} --json > examples.json")
print(
f" Or run: skill-seekers extract-test-examples {input_path} --json > examples.json"
)
sys.exit(1)
else:

View File

@@ -1,6 +1,5 @@
"""ABOUTME: Downloads llms.txt files from documentation URLs with retry logic"""
import time
import requests

View File

@@ -1,6 +1,5 @@
"""ABOUTME: Parses llms.txt markdown content into structured page data"""
import re
from urllib.parse import urljoin
@@ -128,7 +127,9 @@ class LlmsTxtParser:
# Extract code blocks
code_blocks = re.findall(r"```(\w+)?\n(.*?)```", content, re.DOTALL)
for lang, code in code_blocks:
page["code_samples"].append({"code": code.strip(), "language": lang or "unknown"})
page["code_samples"].append(
{"code": code.strip(), "language": lang or "unknown"}
)
# Extract h2/h3 headings
headings = re.findall(r"^(#{2,3})\s+(.+)$", content, re.MULTILINE)
@@ -145,7 +146,9 @@ class LlmsTxtParser:
content_no_code = re.sub(r"```.*?```", "", content, flags=re.DOTALL)
# Extract paragraphs
paragraphs = [p.strip() for p in content_no_code.split("\n\n") if len(p.strip()) > 20]
paragraphs = [
p.strip() for p in content_no_code.split("\n\n") if len(p.strip()) > 20
]
page["content"] = "\n\n".join(paragraphs)
return page

View File

@@ -237,7 +237,9 @@ class PatternRecognizer:
self.detectors.append(TemplateMethodDetector(self.depth))
self.detectors.append(ChainOfResponsibilityDetector(self.depth))
def analyze_file(self, file_path: str, content: str, language: str) -> PatternReport:
def analyze_file(
self, file_path: str, content: str, language: str
) -> PatternReport:
"""
Analyze a single file for design patterns.
@@ -428,7 +430,9 @@ class SingletonDetector(BasePatternDetector):
# Python: __init__ or __new__
# Java/C#: private constructor (detected by naming)
# Check if it has logic (not just pass)
if method.name in ["__new__", "__init__", "constructor"] and (method.docstring or len(method.parameters) > 1):
if method.name in ["__new__", "__init__", "constructor"] and (
method.docstring or len(method.parameters) > 1
):
evidence.append(f"Controlled initialization: {method.name}")
confidence += 0.3
has_init_control = True
@@ -535,17 +539,19 @@ class FactoryDetector(BasePatternDetector):
for method in class_sig.methods:
method_lower = method.name.lower()
# Check if method returns something (has return type or is not void)
if any(name in method_lower for name in factory_method_names) and (method.return_type or "create" in method_lower):
if any(name in method_lower for name in factory_method_names) and (
method.return_type or "create" in method_lower
):
return PatternInstance(
pattern_type=self.pattern_type,
category=self.category,
confidence=0.6,
location="",
class_name=class_sig.name,
method_name=method.name,
line_number=method.line_number,
evidence=[f"Factory method detected: {method.name}"],
)
pattern_type=self.pattern_type,
category=self.category,
confidence=0.6,
location="",
class_name=class_sig.name,
method_name=method.name,
line_number=method.line_number,
evidence=[f"Factory method detected: {method.name}"],
)
return None
@@ -575,7 +581,9 @@ class FactoryDetector(BasePatternDetector):
# Check if multiple factory methods exist (Abstract Factory pattern)
if len(factory_methods) >= 2:
evidence.append(f"Multiple factory methods: {', '.join(factory_methods[:3])}")
evidence.append(
f"Multiple factory methods: {', '.join(factory_methods[:3])}"
)
confidence += 0.2
# Check for inheritance (factory hierarchy)
@@ -682,7 +690,13 @@ class ObserverDetector(BasePatternDetector):
has_notify = False
attach_names = ["attach", "add", "subscribe", "register", "addeventlistener"]
detach_names = ["detach", "remove", "unsubscribe", "unregister", "removeeventlistener"]
detach_names = [
"detach",
"remove",
"unsubscribe",
"unregister",
"removeeventlistener",
]
notify_names = ["notify", "update", "emit", "publish", "fire", "trigger"]
for method in class_sig.methods:
@@ -786,25 +800,35 @@ class StrategyDetector(BasePatternDetector):
]
if siblings:
evidence.append(f"Part of strategy family with: {', '.join(siblings[:3])}")
evidence.append(
f"Part of strategy family with: {', '.join(siblings[:3])}"
)
confidence += 0.5
if base_class and ("strategy" in base_class.lower() or "policy" in base_class.lower()):
if base_class and (
"strategy" in base_class.lower() or "policy" in base_class.lower()
):
evidence.append(f"Inherits from strategy base: {base_class}")
confidence += 0.3
# Check if this is a strategy base class
# (has subclasses in same file)
subclasses = [cls.name for cls in all_classes if class_sig.name in cls.base_classes]
subclasses = [
cls.name for cls in all_classes if class_sig.name in cls.base_classes
]
if len(subclasses) >= 2:
evidence.append(f"Strategy base with implementations: {', '.join(subclasses[:3])}")
evidence.append(
f"Strategy base with implementations: {', '.join(subclasses[:3])}"
)
confidence += 0.6
# Check for single dominant method (strategy interface)
if len(class_sig.methods) == 1 or len(class_sig.methods) == 2:
# Single method or method + __init__
main_method = [m for m in class_sig.methods if m.name not in ["__init__", "__new__"]]
main_method = [
m for m in class_sig.methods if m.name not in ["__init__", "__new__"]
]
if main_method:
evidence.append(f"Strategy interface method: {main_method[0].name}")
confidence += 0.2
@@ -1274,7 +1298,9 @@ class TemplateMethodDetector(BasePatternDetector):
class_lower = class_sig.name.lower()
if any(keyword in class_lower for keyword in template_keywords):
# Check if has subclasses
subclasses = [cls.name for cls in all_classes if class_sig.name in cls.base_classes]
subclasses = [
cls.name for cls in all_classes if class_sig.name in cls.base_classes
]
if subclasses:
return PatternInstance(
@@ -1284,7 +1310,9 @@ class TemplateMethodDetector(BasePatternDetector):
location="",
class_name=class_sig.name,
line_number=class_sig.line_number,
evidence=[f"Abstract base with subclasses: {', '.join(subclasses[:2])}"],
evidence=[
f"Abstract base with subclasses: {', '.join(subclasses[:2])}"
],
related_classes=subclasses,
)
@@ -1301,7 +1329,9 @@ class TemplateMethodDetector(BasePatternDetector):
# 3. Has template method that orchestrates
# Check for subclasses
subclasses = [cls.name for cls in all_classes if class_sig.name in cls.base_classes]
subclasses = [
cls.name for cls in all_classes if class_sig.name in cls.base_classes
]
if len(subclasses) >= 1:
evidence.append(f"Base class with {len(subclasses)} implementations")
@@ -1437,7 +1467,8 @@ class ChainOfResponsibilityDetector(BasePatternDetector):
# Check for set_next() method
has_set_next = any(
"next" in m.name.lower() and ("set" in m.name.lower() or "add" in m.name.lower())
"next" in m.name.lower()
and ("set" in m.name.lower() or "add" in m.name.lower())
for m in class_sig.methods
)
@@ -1458,7 +1489,9 @@ class ChainOfResponsibilityDetector(BasePatternDetector):
]
if siblings and has_next_ref:
evidence.append(f"Part of handler chain with: {', '.join(siblings[:2])}")
evidence.append(
f"Part of handler chain with: {', '.join(siblings[:2])}"
)
confidence += 0.2
if confidence >= 0.5:
@@ -1515,7 +1548,11 @@ class LanguageAdapter:
pattern.confidence = min(pattern.confidence + 0.1, 1.0)
# Strategy: Duck typing common in Python
elif pattern.pattern_type == "Strategy" and "duck typing" in evidence_str or "protocol" in evidence_str:
elif (
pattern.pattern_type == "Strategy"
and "duck typing" in evidence_str
or "protocol" in evidence_str
):
pattern.confidence = min(pattern.confidence + 0.05, 1.0)
# JavaScript/TypeScript adaptations
@@ -1532,7 +1569,11 @@ class LanguageAdapter:
pattern.confidence = min(pattern.confidence + 0.05, 1.0)
# Observer: Event emitters are built-in
elif pattern.pattern_type == "Observer" and "eventemitter" in evidence_str or "event" in evidence_str:
elif (
pattern.pattern_type == "Observer"
and "eventemitter" in evidence_str
or "event" in evidence_str
):
pattern.confidence = min(pattern.confidence + 0.1, 1.0)
pattern.evidence.append("EventEmitter pattern detected")
@@ -1549,7 +1590,9 @@ class LanguageAdapter:
pattern.evidence.append("Abstract Factory pattern")
# Template Method: Abstract classes common
elif pattern.pattern_type == "TemplateMethod" and "abstract" in evidence_str:
elif (
pattern.pattern_type == "TemplateMethod" and "abstract" in evidence_str
):
pattern.confidence = min(pattern.confidence + 0.1, 1.0)
# Go adaptations
@@ -1602,7 +1645,9 @@ class LanguageAdapter:
pattern.evidence.append("Ruby Singleton module")
# Builder: Method chaining is idiomatic
elif pattern.pattern_type == "Builder" and "method chaining" in evidence_str:
elif (
pattern.pattern_type == "Builder" and "method chaining" in evidence_str
):
pattern.confidence = min(pattern.confidence + 0.05, 1.0)
# PHP adaptations
@@ -1653,9 +1698,13 @@ Supported Languages:
)
parser.add_argument(
"--file", action="append", help="Source file to analyze (can be specified multiple times)"
"--file",
action="append",
help="Source file to analyze (can be specified multiple times)",
)
parser.add_argument(
"--directory", help="Directory to analyze (analyzes all source files)"
)
parser.add_argument("--directory", help="Directory to analyze (analyzes all source files)")
parser.add_argument(
"--output", help="Output directory for results (default: current directory)"
)
@@ -1666,7 +1715,9 @@ Supported Languages:
help="Detection depth: surface (fast), deep (default), full (thorough)",
)
parser.add_argument(
"--json", action="store_true", help="Output JSON format instead of human-readable"
"--json",
action="store_true",
help="Output JSON format instead of human-readable",
)
parser.add_argument("--verbose", action="store_true", help="Enable verbose output")

View File

@@ -194,11 +194,15 @@ class PythonTestAnalyzer:
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
if self._is_test_class(node):
examples.extend(self._extract_from_test_class(node, file_path, imports))
examples.extend(
self._extract_from_test_class(node, file_path, imports)
)
# Find test functions (pytest)
elif isinstance(node, ast.FunctionDef) and self._is_test_function(node):
examples.extend(self._extract_from_test_function(node, file_path, imports))
examples.extend(
self._extract_from_test_function(node, file_path, imports)
)
return examples
@@ -232,7 +236,9 @@ class PythonTestAnalyzer:
return True
# Has @pytest.mark decorator
for decorator in node.decorator_list:
if isinstance(decorator, ast.Attribute) and "pytest" in ast.unparse(decorator):
if isinstance(decorator, ast.Attribute) and "pytest" in ast.unparse(
decorator
):
return True
return False
@@ -249,7 +255,9 @@ class PythonTestAnalyzer:
for node in class_node.body:
if isinstance(node, ast.FunctionDef) and node.name.startswith("test_"):
examples.extend(
self._analyze_test_body(node, file_path, imports, setup_code=setup_code)
self._analyze_test_body(
node, file_path, imports, setup_code=setup_code
)
)
return examples
@@ -261,7 +269,9 @@ class PythonTestAnalyzer:
# Check for fixture parameters
fixture_setup = self._extract_fixtures(func_node)
return self._analyze_test_body(func_node, file_path, imports, setup_code=fixture_setup)
return self._analyze_test_body(
func_node, file_path, imports, setup_code=fixture_setup
)
def _extract_setup_method(self, class_node: ast.ClassDef) -> str | None:
"""Extract setUp method code"""
@@ -318,7 +328,9 @@ class PythonTestAnalyzer:
examples.extend(configs)
# 4. Multi-step workflows (integration tests)
workflows = self._find_workflows(func_node, file_path, docstring, setup_code, tags, imports)
workflows = self._find_workflows(
func_node, file_path, docstring, setup_code, tags, imports
)
examples.extend(workflows)
return examples
@@ -362,7 +374,11 @@ class PythonTestAnalyzer:
for node in ast.walk(func_node):
# Check if meaningful instantiation
if isinstance(node, ast.Assign) and isinstance(node.value, ast.Call) and self._is_meaningful_instantiation(node):
if (
isinstance(node, ast.Assign)
and isinstance(node.value, ast.Call)
and self._is_meaningful_instantiation(node)
):
code = ast.unparse(node)
# Skip trivial or mock-only
@@ -408,7 +424,11 @@ class PythonTestAnalyzer:
statements = func_node.body
for i, stmt in enumerate(statements):
# Look for method calls and check if next statement is an assertion
if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call) and i + 1 < len(statements):
if (
isinstance(stmt, ast.Expr)
and isinstance(stmt.value, ast.Call)
and i + 1 < len(statements)
):
next_stmt = statements[i + 1]
if self._is_assertion(next_stmt):
method_call = ast.unparse(stmt)
@@ -455,7 +475,11 @@ class PythonTestAnalyzer:
for node in ast.walk(func_node):
# Must have 2+ keys and be meaningful
if isinstance(node, ast.Assign) and isinstance(node.value, ast.Dict) and len(node.value.keys) >= 2:
if (
isinstance(node, ast.Assign)
and isinstance(node.value, ast.Dict)
and len(node.value.keys) >= 2
):
code = ast.unparse(node)
# Check if looks like configuration
@@ -467,7 +491,9 @@ class PythonTestAnalyzer:
code=code,
language="Python",
description=f"Configuration example: {description}",
expected_behavior=self._extract_assertion_after(func_node, node),
expected_behavior=self._extract_assertion_after(
func_node, node
),
setup_code=setup_code,
file_path=file_path,
line_start=node.lineno,
@@ -568,7 +594,9 @@ class PythonTestAnalyzer:
integration_keywords = ["workflow", "integration", "end_to_end", "e2e", "full"]
return any(keyword in test_name for keyword in integration_keywords)
def _extract_assertion_after(self, func_node: ast.FunctionDef, target_node: ast.AST) -> str:
def _extract_assertion_after(
self, func_node: ast.FunctionDef, target_node: ast.AST
) -> str:
"""Find assertion that follows the target node"""
found_target = False
for stmt in func_node.body:
@@ -699,7 +727,8 @@ class GenericTestAnalyzer:
code=config_match.group(0),
language=language,
file_path=file_path,
line_number=code[: start_pos + config_match.start()].count("\n") + 1,
line_number=code[: start_pos + config_match.start()].count("\n")
+ 1,
)
examples.append(example)
@@ -842,7 +871,9 @@ class TestExampleExtractor:
logger.warning(f"⚠️ Failed to initialize AI enhancer: {e}")
self.enhance_with_ai = False
def extract_from_directory(self, directory: Path, recursive: bool = True) -> ExampleReport:
def extract_from_directory(
self, directory: Path, recursive: bool = True
) -> ExampleReport:
"""Extract examples from all test files in directory"""
directory = Path(directory)
@@ -896,11 +927,13 @@ class TestExampleExtractor:
# Limit per file
if len(filtered_examples) > self.max_per_file:
# Sort by confidence and take top N
filtered_examples = sorted(filtered_examples, key=lambda x: x.confidence, reverse=True)[
: self.max_per_file
]
filtered_examples = sorted(
filtered_examples, key=lambda x: x.confidence, reverse=True
)[: self.max_per_file]
logger.info(f"Extracted {len(filtered_examples)} examples from {file_path.name}")
logger.info(
f"Extracted {len(filtered_examples)} examples from {file_path.name}"
)
return filtered_examples
@@ -955,7 +988,9 @@ class TestExampleExtractor:
# Calculate averages
avg_complexity = (
sum(ex.complexity_score for ex in examples) / len(examples) if examples else 0.0
sum(ex.complexity_score for ex in examples) / len(examples)
if examples
else 0.0
)
high_value_count = sum(1 for ex in examples if ex.confidence > 0.7)
@@ -1009,10 +1044,15 @@ Examples:
help="Minimum confidence threshold (0.0-1.0, default: 0.5)",
)
parser.add_argument(
"--max-per-file", type=int, default=10, help="Maximum examples per file (default: 10)"
"--max-per-file",
type=int,
default=10,
help="Maximum examples per file (default: 10)",
)
parser.add_argument("--json", action="store_true", help="Output JSON format")
parser.add_argument("--markdown", action="store_true", help="Output Markdown format")
parser.add_argument(
"--markdown", action="store_true", help="Output Markdown format"
)
parser.add_argument(
"--recursive",
action="store_true",
@@ -1029,7 +1069,9 @@ Examples:
# Create extractor
languages = [args.language] if args.language else None
extractor = TestExampleExtractor(
min_confidence=args.min_confidence, max_per_file=args.max_per_file, languages=languages
min_confidence=args.min_confidence,
max_per_file=args.max_per_file,
languages=languages,
)
# Extract examples
@@ -1037,7 +1079,9 @@ Examples:
examples = extractor.extract_from_file(Path(args.file))
report = extractor._create_report(examples, file_path=args.file)
else:
report = extractor.extract_from_directory(Path(args.directory), recursive=args.recursive)
report = extractor.extract_from_directory(
Path(args.directory), recursive=args.recursive
)
# Output results
if args.json:

View File

@@ -95,13 +95,20 @@ class UnifiedCodebaseAnalyzer:
# Step 1: Acquire source
if self.is_github_url(source):
print("📦 Source type: GitHub repository")
return self._analyze_github(source, depth, fetch_github_metadata, output_dir, interactive)
return self._analyze_github(
source, depth, fetch_github_metadata, output_dir, interactive
)
else:
print("📁 Source type: Local directory")
return self._analyze_local(source, depth)
def _analyze_github(
self, repo_url: str, depth: str, fetch_metadata: bool, output_dir: Path | None, interactive: bool = True
self,
repo_url: str,
depth: str,
fetch_metadata: bool,
output_dir: Path | None,
interactive: bool = True,
) -> AnalysisResult:
"""
Analyze GitHub repository with three-stream fetcher.
@@ -117,7 +124,9 @@ class UnifiedCodebaseAnalyzer:
AnalysisResult with all 3 streams
"""
# Use three-stream fetcher
fetcher = GitHubThreeStreamFetcher(repo_url, self.github_token, interactive=interactive)
fetcher = GitHubThreeStreamFetcher(
repo_url, self.github_token, interactive=interactive
)
three_streams = fetcher.fetch(output_dir)
# Analyze code with specified depth
@@ -236,7 +245,9 @@ class UnifiedCodebaseAnalyzer:
basic = self.basic_analysis(directory)
# Run full C3.x analysis using existing codebase_scraper
print("🔍 Running C3.x components (patterns, examples, guides, configs, architecture)...")
print(
"🔍 Running C3.x components (patterns, examples, guides, configs, architecture)..."
)
try:
# Import codebase analyzer
@@ -271,11 +282,19 @@ class UnifiedCodebaseAnalyzer:
c3x = {**basic, "analysis_type": "c3x", **c3x_data}
print("✅ C3.x analysis complete!")
print(f" - {len(c3x_data.get('c3_1_patterns', []))} design patterns detected")
print(f" - {c3x_data.get('c3_2_examples_count', 0)} test examples extracted")
print(f" - {len(c3x_data.get('c3_3_guides', []))} how-to guides generated")
print(
f" - {len(c3x_data.get('c3_1_patterns', []))} design patterns detected"
)
print(
f" - {c3x_data.get('c3_2_examples_count', 0)} test examples extracted"
)
print(
f" - {len(c3x_data.get('c3_3_guides', []))} how-to guides generated"
)
print(f" - {len(c3x_data.get('c3_4_configs', []))} config files analyzed")
print(f" - {len(c3x_data.get('c3_7_architecture', []))} architectural patterns found")
print(
f" - {len(c3x_data.get('c3_7_architecture', []))} architectural patterns found"
)
return c3x
@@ -432,7 +451,9 @@ class UnifiedCodebaseAnalyzer:
if item.is_dir():
# Only include immediate subdirectories
structure["children"].append({"name": item.name, "type": "directory"})
structure["children"].append(
{"name": item.name, "type": "directory"}
)
elif item.is_file():
structure["children"].append(
{"name": item.name, "type": "file", "extension": item.suffix}
@@ -526,7 +547,12 @@ class UnifiedCodebaseAnalyzer:
Returns:
Dict with statistics
"""
stats = {"total_files": 0, "total_size_bytes": 0, "file_types": {}, "languages": {}}
stats = {
"total_files": 0,
"total_size_bytes": 0,
"file_types": {},
"languages": {},
}
for file_path in directory.rglob("*"):
if not file_path.is_file():