#!/usr/bin/env python3 """ Documentation to Claude Skill Converter Single tool to scrape any documentation and create high-quality Claude skills. Usage: skill-seekers scrape --interactive skill-seekers scrape --config configs/godot.json skill-seekers scrape --url https://react.dev/ --name react """ import argparse import asyncio import hashlib import json import logging import os import re import sys import time from collections import defaultdict, deque from pathlib import Path from typing import Any, Optional from urllib.parse import urljoin, urlparse import httpx import requests from bs4 import BeautifulSoup # Add parent directory to path for imports when run as script sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from skill_seekers.cli.config_fetcher import ( get_last_searched_paths, list_available_configs, resolve_config_path, ) from skill_seekers.cli.config_validator import ConfigValidator from skill_seekers.cli.constants import ( CONTENT_PREVIEW_LENGTH, DEFAULT_ASYNC_MODE, DEFAULT_CHECKPOINT_INTERVAL, DEFAULT_MAX_PAGES, DEFAULT_RATE_LIMIT, MAX_PAGES_WARNING_THRESHOLD, MIN_CATEGORIZATION_SCORE, ) from skill_seekers.cli.language_detector import LanguageDetector from skill_seekers.cli.llms_txt_detector import LlmsTxtDetector from skill_seekers.cli.llms_txt_downloader import LlmsTxtDownloader from skill_seekers.cli.llms_txt_parser import LlmsTxtParser from skill_seekers.cli.arguments.scrape import add_scrape_arguments # Configure logging logger = logging.getLogger(__name__) def setup_logging(verbose: bool = False, quiet: bool = False) -> None: """Configure logging based on verbosity level. Args: verbose: Enable DEBUG level logging quiet: Enable WARNING level logging only """ if quiet: level = logging.WARNING elif verbose: level = logging.DEBUG else: level = logging.INFO logging.basicConfig(level=level, format="%(message)s", force=True) def infer_description_from_docs( base_url: str, first_page_content: str | None = None, name: str = "" ) -> str: """ Infer skill description from documentation metadata or first page content. Tries multiple strategies: 1. Extract meta description tag from first page 2. Extract first meaningful paragraph from content 3. Fall back to improved template Args: base_url: Documentation base URL first_page_content: HTML content of first page (optional) name: Skill name Returns: Description string suitable for "Use when..." format """ # If we have first page content, try to extract description if first_page_content: try: soup = BeautifulSoup(first_page_content, "html.parser") # Strategy 1: Try meta description tag meta_desc = soup.find("meta", {"name": "description"}) if meta_desc and meta_desc.get("content"): desc = meta_desc["content"].strip() if len(desc) > 20: # Meaningful length # Clean and format if len(desc) > 150: desc = desc[:147] + "..." return f"Use when {desc.lower()}" # Strategy 2: Try OpenGraph description og_desc = soup.find("meta", {"property": "og:description"}) if og_desc and og_desc.get("content"): desc = og_desc["content"].strip() if len(desc) > 20: if len(desc) > 150: desc = desc[:147] + "..." return f"Use when {desc.lower()}" # Strategy 3: Extract first meaningful paragraph from main content # Look for common documentation main content areas main_content = None for selector in [ "article", "main", 'div[role="main"]', "div.content", "div.doc-content", ]: main_content = soup.select_one(selector) if main_content: break if main_content: # Find first paragraph for p in main_content.find_all("p", limit=5): text = p.get_text().strip() # Skip empty, very short, or navigation-like paragraphs if len(text) > 30 and not any( skip in text.lower() for skip in ["table of contents", "on this page", "navigation"] ): # Clean and format if len(text) > 150: text = text[:147] + "..." return f"Use when working with {text.lower()}" except Exception as e: logger.debug(f"Could not infer description from page content: {e}") # Improved fallback template return ( f"Use when working with {name}" if name else f"Use when working with documentation at {urlparse(base_url).netloc}" ) class DocToSkillConverter: def __init__(self, config: dict[str, Any], dry_run: bool = False, resume: bool = False) -> None: self.config = config self.name = config["name"] self.base_url = config["base_url"] self.dry_run = dry_run self.resume = resume # Paths self.data_dir = f"output/{self.name}_data" self.skill_dir = f"output/{self.name}" self.checkpoint_file = f"{self.data_dir}/checkpoint.json" # Checkpoint config checkpoint_config = config.get("checkpoint", {}) self.checkpoint_enabled = checkpoint_config.get("enabled", False) self.checkpoint_interval = checkpoint_config.get("interval", DEFAULT_CHECKPOINT_INTERVAL) # llms.txt detection state skip_llms_txt_value = config.get("skip_llms_txt", False) if not isinstance(skip_llms_txt_value, bool): logger.warning( "Invalid value for 'skip_llms_txt': %r (expected bool). Defaulting to False.", skip_llms_txt_value, ) self.skip_llms_txt = False else: self.skip_llms_txt = skip_llms_txt_value self.llms_txt_detected = False self.llms_txt_variant = None self.llms_txt_variants: list[str] = [] # Track all downloaded variants # Parallel scraping config self.workers = config.get("workers", 1) self.async_mode = config.get("async_mode", DEFAULT_ASYNC_MODE) # State self.visited_urls: set[str] = set() # Support multiple starting URLs start_urls = config.get("start_urls", [self.base_url]) self.pending_urls = deque(start_urls) self.pages: list[dict[str, Any]] = [] self.pages_scraped = 0 # Language detection self.language_detector = LanguageDetector(min_confidence=0.15) # Thread-safe lock for parallel scraping if self.workers > 1: import threading self.lock = threading.Lock() # Create directories (unless dry-run) if not dry_run: os.makedirs(f"{self.data_dir}/pages", exist_ok=True) os.makedirs(f"{self.skill_dir}/references", exist_ok=True) os.makedirs(f"{self.skill_dir}/scripts", exist_ok=True) os.makedirs(f"{self.skill_dir}/assets", exist_ok=True) # Load checkpoint if resuming if resume and not dry_run: self.load_checkpoint() def is_valid_url(self, url: str) -> bool: """Check if URL should be scraped based on patterns. Args: url (str): URL to validate Returns: bool: True if URL matches include patterns and doesn't match exclude patterns """ if not url.startswith(self.base_url): return False # Include patterns includes = self.config.get("url_patterns", {}).get("include", []) if includes and not any(pattern in url for pattern in includes): return False # Exclude patterns excludes = self.config.get("url_patterns", {}).get("exclude", []) return not any(pattern in url for pattern in excludes) def save_checkpoint(self) -> None: """Save progress checkpoint""" if not self.checkpoint_enabled or self.dry_run: return checkpoint_data = { "config": self.config, "visited_urls": list(self.visited_urls), "pending_urls": list(self.pending_urls), "pages_scraped": self.pages_scraped, "last_updated": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "checkpoint_interval": self.checkpoint_interval, } try: with open(self.checkpoint_file, "w", encoding="utf-8") as f: json.dump(checkpoint_data, f, indent=2) logger.info(" πŸ’Ύ Checkpoint saved (%d pages)", self.pages_scraped) except Exception as e: logger.warning(" ⚠️ Failed to save checkpoint: %s", e) def load_checkpoint(self) -> None: """Load progress from checkpoint""" if not os.path.exists(self.checkpoint_file): logger.info("ℹ️ No checkpoint found, starting fresh") return try: with open(self.checkpoint_file, encoding="utf-8") as f: checkpoint_data = json.load(f) self.visited_urls = set(checkpoint_data["visited_urls"]) self.pending_urls = deque(checkpoint_data["pending_urls"]) self.pages_scraped = checkpoint_data["pages_scraped"] logger.info("βœ… Resumed from checkpoint") logger.info(" Pages already scraped: %d", self.pages_scraped) logger.info(" URLs visited: %d", len(self.visited_urls)) logger.info(" URLs pending: %d", len(self.pending_urls)) logger.info(" Last updated: %s", checkpoint_data["last_updated"]) logger.info("") except Exception as e: logger.warning("⚠️ Failed to load checkpoint: %s", e) logger.info(" Starting fresh") def clear_checkpoint(self) -> None: """Remove checkpoint file""" if os.path.exists(self.checkpoint_file): try: os.remove(self.checkpoint_file) logger.info("βœ… Checkpoint cleared") except Exception as e: logger.warning("⚠️ Failed to clear checkpoint: %s", e) def extract_content(self, soup: Any, url: str) -> dict[str, Any]: """Extract content with improved code and pattern detection""" page = { "url": url, "title": "", "content": "", "headings": [], "code_samples": [], "patterns": [], # NEW: Extract common patterns "links": [], } selectors = self.config.get("selectors", {}) # Extract title title_elem = soup.select_one(selectors.get("title", "title")) if title_elem: page["title"] = self.clean_text(title_elem.get_text()) # Find main content main_selector = selectors.get("main_content", 'div[role="main"]') main = soup.select_one(main_selector) if not main: logger.warning("⚠ No content: %s", url) return page # Extract headings with better structure for h in main.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]): text = self.clean_text(h.get_text()) if text: page["headings"].append({"level": h.name, "text": text, "id": h.get("id", "")}) # Extract code with language detection code_selector = selectors.get("code_blocks", "pre code") for code_elem in main.select(code_selector): code = code_elem.get_text() if len(code.strip()) > 10: # Try to detect language lang = self.detect_language(code_elem, code) page["code_samples"].append({"code": code.strip(), "language": lang}) # Extract patterns (NEW: common code patterns) page["patterns"] = self.extract_patterns(main, page["code_samples"]) # Extract paragraphs paragraphs = [] for p in main.find_all("p"): text = self.clean_text(p.get_text()) if text and len(text) > 20: # Skip very short paragraphs paragraphs.append(text) page["content"] = "\n\n".join(paragraphs) # Extract links from entire page (not just main content) # This allows discovery of navigation links outside the main content area for link in soup.find_all("a", href=True): href = urljoin(url, link["href"]) # Strip anchor fragments to avoid treating #anchors as separate pages href = href.split("#")[0] if self.is_valid_url(href) and href not in page["links"]: page["links"].append(href) return page def _extract_markdown_content(self, content: str, url: str) -> dict[str, Any]: """Extract structured content from a Markdown file. Parses markdown files from llms.txt URLs to extract: - Title from first h1 heading - Headings (h2-h6, excluding h1) - Code blocks with language detection - Internal .md links for BFS crawling - Content paragraphs (>20 chars) Auto-detects HTML content and falls back to _extract_html_as_markdown. Args: content: Raw markdown content string (or HTML if server returned HTML) url: Source URL for resolving relative links Returns: Dict with keys: - url: str - Source URL - title: str - Extracted from first # heading - content: str - Paragraphs joined with double newlines - headings: List[Dict] - {'level': 'h2', 'text': str, 'id': str} - code_samples: List[Dict] - {'code': str, 'language': str} - links: List[str] - Absolute URLs to other .md files - patterns: List - Empty (reserved for future use) Note: Only .md links are extracted to avoid client-side rendered HTML pages. Anchor fragments (#section) are stripped from links. """ import re # Detect if content is actually HTML (some .md URLs return HTML) if content.strip().startswith(" 10: page["code_samples"].append({"code": code.strip(), "language": lang or "unknown"}) # Extract content (paragraphs) content_no_code = re.sub(r"```.*?```", "", content, flags=re.DOTALL) paragraphs = [] for para in content_no_code.split("\n\n"): text = para.strip() # Skip headings and short text if text and len(text) > 20 and not text.startswith("#"): paragraphs.append(text) page["content"] = "\n\n".join(paragraphs) # Extract links from markdown (only .md files to avoid client-side rendered HTML pages) md_links = re.findall(r"\[([^\]]*)\]\(([^)]+)\)", content) for _, href in md_links: if href.startswith("http"): full_url = href elif not href.startswith("#"): full_url = urljoin(url, href) else: continue # Strip anchor fragments full_url = full_url.split("#")[0] # Only include .md URLs to avoid client-side rendered HTML pages if ".md" in full_url and self.is_valid_url(full_url) and full_url not in page["links"]: page["links"].append(full_url) return page def _extract_html_as_markdown(self, html_content: str, url: str) -> dict[str, Any]: """Extract content from HTML and convert to markdown-like structure. Fallback method when .md URL returns HTML content instead of markdown. Uses BeautifulSoup to extract structured data from HTML elements. Extraction strategy: 1. Title from tag 2. Main content from <main>, <article>, [role="main"], or <body> 3. Headings (h1-h6) with text and id attributes 4. Code blocks from <pre><code> or <pre> tags 5. Text content from paragraphs Args: html_content: Raw HTML content string url: Source URL (for reference in result dict) Returns: Dict with keys: - url: str - Source URL - title: str - From <title> tag, cleaned - content: str - Text content from main area - headings: List[Dict] - {'level': 'h2', 'text': str, 'id': str} - code_samples: List[Dict] - {'code': str, 'language': str} - links: List - Empty (HTML links not extracted to avoid client-side routes) - patterns: List - Empty (reserved for future use) Note: Prefers <main> or <article> tags for content area. Falls back to <body> if no semantic content container found. Language detection uses detect_language() method. """ page = { "url": url, "title": "", "content": "", "headings": [], "code_samples": [], "patterns": [], "links": [], } soup = BeautifulSoup(html_content, "html.parser") # Try to extract title title_elem = soup.select_one("title") if title_elem: page["title"] = self.clean_text(title_elem.get_text()) # Try to find main content area main = soup.select_one('main, article, [role="main"], .content') if not main: main = soup.body if soup.body else soup if main: # Extract headings for h in main.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]): text = self.clean_text(h.get_text()) if text: page["headings"].append({"level": h.name, "text": text, "id": h.get("id", "")}) # Extract code blocks for code_elem in main.select("pre code, pre"): code = code_elem.get_text() if len(code.strip()) > 10: lang = self.detect_language(code_elem, code) page["code_samples"].append({"code": code.strip(), "language": lang}) # Extract paragraphs paragraphs = [] for p in main.find_all("p"): text = self.clean_text(p.get_text()) if text and len(text) > 20: paragraphs.append(text) page["content"] = "\n\n".join(paragraphs) return page def detect_language(self, elem, code): """Detect programming language from code block UPDATED: Now uses confidence-based detection with 20+ languages """ lang, confidence = self.language_detector.detect_from_html(elem, code) # Log low-confidence detections for debugging if confidence < 0.5: logger.debug(f"Low confidence language detection: {lang} ({confidence:.2f})") return lang # Return string for backward compatibility def extract_patterns( self, main: Any, _code_samples: list[dict[str, Any]] ) -> list[dict[str, str]]: """Extract common coding patterns (NEW FEATURE)""" patterns = [] # Look for "Example:" or "Pattern:" sections for elem in main.find_all(["p", "div"]): text = elem.get_text().lower() if any(word in text for word in ["example:", "pattern:", "usage:", "typical use"]): # Get the code that follows next_code = elem.find_next(["pre", "code"]) if next_code: patterns.append( { "description": self.clean_text(elem.get_text()), "code": next_code.get_text().strip(), } ) return patterns[:5] # Limit to 5 most relevant patterns def clean_text(self, text: str) -> str: """Clean text content""" text = re.sub(r"\s+", " ", text) return text.strip() def save_page(self, page: dict[str, Any]) -> None: """Save page data (skip pages with empty content)""" # Skip pages with empty or very short content if not page.get("content") or len(page.get("content", "")) < 50: logger.debug("Skipping page with empty/short content: %s", page.get("url", "unknown")) return url_hash = hashlib.md5(page["url"].encode()).hexdigest()[:10] safe_title = re.sub(r"[^\w\s-]", "", page["title"])[:50] safe_title = re.sub(r"[-\s]+", "_", safe_title) filename = f"{safe_title}_{url_hash}.json" filepath = os.path.join(self.data_dir, "pages", filename) with open(filepath, "w", encoding="utf-8") as f: json.dump(page, f, indent=2, ensure_ascii=False) def scrape_page(self, url: str) -> None: """Scrape a single page with thread-safe operations. Args: url (str): URL to scrape Returns: dict or None: Page data dict on success, None on failure Note: Uses threading locks when workers > 1 for thread safety Supports both HTML pages and Markdown (.md) files """ try: # Scraping part (no lock needed - independent) headers = {"User-Agent": "Mozilla/5.0 (Documentation Scraper)"} response = requests.get(url, headers=headers, timeout=30) response.raise_for_status() # Check if this is a Markdown file if url.endswith(".md") or ".md" in url: page = self._extract_markdown_content(response.text, url) else: soup = BeautifulSoup(response.content, "html.parser") page = self.extract_content(soup, url) # Thread-safe operations (lock required) if self.workers > 1: with self.lock: logger.info(" %s", url) self.save_page(page) self.pages.append(page) # Add new URLs for link in page["links"]: if link not in self.visited_urls and link not in self.pending_urls: self.pending_urls.append(link) else: # Single-threaded mode (no lock needed) logger.info(" %s", url) self.save_page(page) self.pages.append(page) # Add new URLs for link in page["links"]: if link not in self.visited_urls and link not in self.pending_urls: self.pending_urls.append(link) # Rate limiting rate_limit = self.config.get("rate_limit", DEFAULT_RATE_LIMIT) if rate_limit > 0: time.sleep(rate_limit) except Exception as e: if self.workers > 1: with self.lock: logger.error(" βœ— Error scraping %s: %s: %s", url, type(e).__name__, e) else: logger.error(" βœ— Error scraping page: %s: %s", type(e).__name__, e) logger.error(" URL: %s", url) async def scrape_page_async( self, url: str, semaphore: asyncio.Semaphore, client: httpx.AsyncClient ) -> None: """Scrape a single page asynchronously. Args: url: URL to scrape semaphore: Asyncio semaphore for concurrency control client: Shared httpx AsyncClient for connection pooling Note: Uses asyncio.Lock for async-safe operations instead of threading.Lock Supports both HTML pages and Markdown (.md) files """ async with semaphore: # Limit concurrent requests try: # Async HTTP request headers = {"User-Agent": "Mozilla/5.0 (Documentation Scraper)"} response = await client.get(url, headers=headers, timeout=30.0) response.raise_for_status() # Check if this is a Markdown file if url.endswith(".md") or ".md" in url: page = self._extract_markdown_content(response.text, url) else: # BeautifulSoup parsing (still synchronous, but fast) soup = BeautifulSoup(response.content, "html.parser") page = self.extract_content(soup, url) # Async-safe operations (no lock needed - single event loop) logger.info(" %s", url) self.save_page(page) self.pages.append(page) # Add new URLs for link in page["links"]: if link not in self.visited_urls and link not in self.pending_urls: self.pending_urls.append(link) # Rate limiting rate_limit = self.config.get("rate_limit", DEFAULT_RATE_LIMIT) if rate_limit > 0: await asyncio.sleep(rate_limit) except Exception as e: logger.error(" βœ— Error scraping %s: %s: %s", url, type(e).__name__, e) def _convert_to_md_urls(self, urls: list[str]) -> list[str]: """ Convert URLs to .md format, trying /index.html.md suffix for non-.md URLs. Strips anchor fragments (#anchor) and deduplicates base URLs to avoid 404 errors. δΈι’„ε…ˆζ£€ζŸ₯ URL ζ˜―ε¦ε­˜εœ¨οΌŒη›΄ζŽ₯加ε…₯ι˜Ÿεˆ—οΌŒεœ¨ηˆ¬ε–ζ—Άε†ιͺŒθ―γ€‚ Args: urls: List of URLs to process Returns: List of .md URLs (ζœͺιͺŒθ―, deduplicated, no anchors) """ from urllib.parse import urlparse, urlunparse seen_base_urls = set() md_urls = [] for url in urls: # Parse URL to extract and remove fragment (anchor) parsed = urlparse(url) base_url = urlunparse(parsed._replace(fragment="")) # Remove #anchor # Skip if we've already processed this base URL if base_url in seen_base_urls: continue seen_base_urls.add(base_url) # Check if URL already ends with .md (not just contains "md") if base_url.endswith(".md"): md_urls.append(base_url) else: # η›΄ζŽ₯转捒为 .md ζ ΌεΌοΌŒδΈε‘ι€ HEAD 请求检ζŸ₯ base_url = base_url.rstrip("/") md_url = f"{base_url}/index.html.md" md_urls.append(md_url) logger.info( " βœ“ Converted %d URLs to %d unique .md URLs (anchors stripped, will validate during crawl)", len(urls), len(md_urls), ) return md_urls # ORIGINAL _convert_to_md_urls (with HEAD request validation): # def _convert_to_md_urls(self, urls: List[str]) -> List[str]: # md_urls = [] # non_md_urls = [] # for url in urls: # if '.md' in url: # md_urls.append(url) # else: # non_md_urls.append(url) # if non_md_urls: # logger.info(" πŸ”„ Trying to convert %d non-.md URLs to .md format...", len(non_md_urls)) # converted = 0 # for url in non_md_urls: # url = url.rstrip('/') # md_url = f"{url}/index.html.md" # try: # resp = requests.head(md_url, timeout=5, allow_redirects=True) # if resp.status_code == 200: # md_urls.append(md_url) # converted += 1 # except Exception: # pass # logger.info(" βœ“ Converted %d URLs to .md format", converted) # return md_urls def _try_llms_txt(self) -> bool: """ Try to use llms.txt instead of HTML scraping. Downloads ALL available variants and stores with .md extension. Returns: True if llms.txt was found and processed successfully """ logger.info("\nπŸ” Checking for llms.txt at %s...", self.base_url) # Check for explicit config URL first explicit_url = self.config.get("llms_txt_url") if explicit_url: logger.info("\nπŸ“Œ Using explicit llms_txt_url from config: %s", explicit_url) # Download explicit file first downloader = LlmsTxtDownloader(explicit_url) content = downloader.download() if content: # Save explicit file with proper .md extension filename = downloader.get_proper_filename() filepath = os.path.join(self.skill_dir, "references", filename) os.makedirs(os.path.dirname(filepath), exist_ok=True) with open(filepath, "w", encoding="utf-8") as f: f.write(content) logger.info(" πŸ’Ύ Saved %s (%d chars)", filename, len(content)) # Also try to detect and download ALL other variants detector = LlmsTxtDetector(self.base_url) variants = detector.detect_all() if variants: logger.info( "\nπŸ” Found %d total variant(s), downloading remaining...", len(variants), ) for variant_info in variants: url = variant_info["url"] variant = variant_info["variant"] # Skip the explicit one we already downloaded if url == explicit_url: continue logger.info(" πŸ“₯ Downloading %s...", variant) extra_downloader = LlmsTxtDownloader(url) extra_content = extra_downloader.download() if extra_content: extra_filename = extra_downloader.get_proper_filename() extra_filepath = os.path.join( self.skill_dir, "references", extra_filename ) with open(extra_filepath, "w", encoding="utf-8") as f: f.write(extra_content) logger.info( " βœ“ %s (%d chars)", extra_filename, len(extra_content), ) # Parse explicit file for skill building parser = LlmsTxtParser(content, self.base_url) # Extract URLs from llms.txt and add to pending_urls for BFS crawling extracted_urls = parser.extract_urls() if extracted_urls: # Convert non-.md URLs to .md format by trying /index.html.md suffix md_urls = self._convert_to_md_urls(extracted_urls) logger.info( "\nπŸ”— Found %d URLs in llms.txt (%d .md files), starting BFS crawl...", len(extracted_urls), len(md_urls), ) # Filter URLs based on url_patterns config for url in md_urls: if self.is_valid_url(url) and url not in self.visited_urls: self.pending_urls.append(url) logger.info( " πŸ“‹ %d URLs added to crawl queue after filtering", len(self.pending_urls), ) # Return False to trigger HTML scraping with the populated pending_urls self.llms_txt_detected = True self.llms_txt_variant = "explicit" return False # Continue with BFS crawling # Fallback: if no URLs found, use section-based parsing pages = parser.parse() if pages: for page in pages: self.save_page(page) self.pages.append(page) self.llms_txt_detected = True self.llms_txt_variant = "explicit" return True # Auto-detection: Find ALL variants detector = LlmsTxtDetector(self.base_url) variants = detector.detect_all() if not variants: logger.info("ℹ️ No llms.txt found, using HTML scraping") return False logger.info("βœ… Found %d llms.txt variant(s)", len(variants)) # Download ALL variants downloaded = {} for variant_info in variants: url = variant_info["url"] variant = variant_info["variant"] logger.info(" πŸ“₯ Downloading %s...", variant) downloader = LlmsTxtDownloader(url) content = downloader.download() if content: filename = downloader.get_proper_filename() downloaded[variant] = { "content": content, "filename": filename, "size": len(content), } logger.info(" βœ“ %s (%d chars)", filename, len(content)) if not downloaded: logger.warning("⚠️ Failed to download any variants, falling back to HTML scraping") return False # Save ALL variants to references/ os.makedirs(os.path.join(self.skill_dir, "references"), exist_ok=True) for _variant, data in downloaded.items(): filepath = os.path.join(self.skill_dir, "references", data["filename"]) with open(filepath, "w", encoding="utf-8") as f: f.write(data["content"]) logger.info(" πŸ’Ύ Saved %s", data["filename"]) # Parse LARGEST variant for skill building largest = max(downloaded.items(), key=lambda x: x[1]["size"]) logger.info("\nπŸ“„ Parsing %s for skill building...", largest[1]["filename"]) parser = LlmsTxtParser(largest[1]["content"], self.base_url) # Extract URLs from llms.txt and add to pending_urls for BFS crawling extracted_urls = parser.extract_urls() if extracted_urls: # Convert non-.md URLs to .md format by trying /index.html.md suffix md_urls = self._convert_to_md_urls(extracted_urls) logger.info( "\nπŸ”— Found %d URLs in llms.txt (%d .md files), starting BFS crawl...", len(extracted_urls), len(md_urls), ) # Filter URLs based on url_patterns config for url in md_urls: if self.is_valid_url(url) and url not in self.visited_urls: self.pending_urls.append(url) logger.info( " πŸ“‹ %d URLs added to crawl queue after filtering", len(self.pending_urls), ) # Return False to trigger HTML scraping with the populated pending_urls self.llms_txt_detected = True self.llms_txt_variants = list(downloaded.keys()) return False # Continue with BFS crawling # Fallback: if no URLs found, use section-based parsing pages = parser.parse() if not pages: logger.warning("⚠️ Failed to parse llms.txt, falling back to HTML scraping") return False logger.info(" βœ“ Parsed %d sections", len(pages)) # Save pages for skill building for page in pages: self.save_page(page) self.pages.append(page) self.llms_txt_detected = True self.llms_txt_variants = list(downloaded.keys()) return True def scrape_all(self) -> None: """Scrape all pages (supports llms.txt and HTML scraping) Routes to async version if async_mode is enabled in config. """ # Route to async version if enabled if self.async_mode: asyncio.run(self.scrape_all_async()) return # Try llms.txt first (unless dry-run or explicitly disabled) if not self.dry_run and not self.skip_llms_txt: llms_result = self._try_llms_txt() if llms_result: logger.info( "\nβœ… Used llms.txt (%s) - skipping HTML scraping", self.llms_txt_variant, ) self.save_summary() return # HTML scraping (sync/thread-based logic) logger.info("\n" + "=" * 60) if self.dry_run: logger.info("DRY RUN: %s", self.name) else: logger.info("SCRAPING: %s", self.name) logger.info("=" * 60) logger.info("Base URL: %s", self.base_url) if self.dry_run: logger.info("Mode: Preview only (no actual scraping)\n") else: logger.info("Output: %s", self.data_dir) if self.workers > 1: logger.info("Workers: %d parallel threads", self.workers) logger.info("") max_pages = self.config.get("max_pages", DEFAULT_MAX_PAGES) # Handle unlimited mode if max_pages is None or max_pages == -1: logger.warning("⚠️ UNLIMITED MODE: No page limit (will scrape all pages)\n") unlimited = True else: unlimited = False # Dry run: preview first 20 URLs preview_limit = 20 if self.dry_run else max_pages # Single-threaded mode (original sequential logic) if self.workers <= 1: while self.pending_urls and (unlimited or len(self.visited_urls) < preview_limit): url = self.pending_urls.popleft() if url in self.visited_urls: continue self.visited_urls.add(url) if self.dry_run: # Just show what would be scraped logger.info(" [Preview] %s", url) try: headers = {"User-Agent": "Mozilla/5.0 (Documentation Scraper - Dry Run)"} response = requests.get(url, headers=headers, timeout=10) soup = BeautifulSoup(response.content, "html.parser") main_selector = self.config.get("selectors", {}).get( "main_content", 'div[role="main"]' ) main = soup.select_one(main_selector) if main: for link in main.find_all("a", href=True): href = urljoin(url, link["href"]) if self.is_valid_url(href) and href not in self.visited_urls: self.pending_urls.append(href) except Exception as e: # Failed to extract links in fast mode, continue anyway logger.warning("⚠️ Warning: Could not extract links from %s: %s", url, e) else: self.scrape_page(url) self.pages_scraped += 1 if ( self.checkpoint_enabled and self.pages_scraped % self.checkpoint_interval == 0 ): self.save_checkpoint() if len(self.visited_urls) % 10 == 0: logger.info(" [%d pages]", len(self.visited_urls)) # Multi-threaded mode (parallel scraping) else: from concurrent.futures import ThreadPoolExecutor, as_completed logger.info("πŸš€ Starting parallel scraping with %d workers\n", self.workers) with ThreadPoolExecutor(max_workers=self.workers) as executor: futures = [] while self.pending_urls and (unlimited or len(self.visited_urls) < preview_limit): # Get next batch of URLs (thread-safe) batch = [] batch_size = min(self.workers * 2, len(self.pending_urls)) with self.lock: for _ in range(batch_size): if not self.pending_urls: break url = self.pending_urls.popleft() if url not in self.visited_urls: self.visited_urls.add(url) batch.append(url) # Submit batch to executor for url in batch: if unlimited or len(self.visited_urls) <= preview_limit: future = executor.submit(self.scrape_page, url) futures.append(future) # Wait for some to complete before submitting more for future in as_completed(futures[:batch_size]): # Check for exceptions try: future.result() # Raises exception if scrape_page failed except Exception as e: with self.lock: logger.warning(" ⚠️ Worker exception: %s", e) with self.lock: self.pages_scraped += 1 if ( self.checkpoint_enabled and self.pages_scraped % self.checkpoint_interval == 0 ): self.save_checkpoint() if self.pages_scraped % 10 == 0: logger.info(" [%d pages scraped]", self.pages_scraped) # Remove completed futures futures = [f for f in futures if not f.done()] # Wait for remaining futures for future in as_completed(futures): # Check for exceptions try: future.result() except Exception as e: with self.lock: logger.warning(" ⚠️ Worker exception: %s", e) with self.lock: self.pages_scraped += 1 if self.dry_run: logger.info("\nβœ… Dry run complete: would scrape ~%d pages", len(self.visited_urls)) if len(self.visited_urls) >= preview_limit: logger.info( " (showing first %d, actual scraping may find more)", preview_limit, ) logger.info("\nπŸ’‘ To actually scrape, run without --dry-run") else: logger.info("\nβœ… Scraped %d pages", len(self.visited_urls)) self.save_summary() async def scrape_all_async(self) -> None: """Scrape all pages asynchronously (async/await version). This method provides significantly better performance for parallel scraping compared to thread-based scraping, with lower memory overhead and better CPU utilization. Performance: ~2-3x faster than sync mode with same worker count. """ # Try llms.txt first (unless dry-run or explicitly disabled) if not self.dry_run and not self.skip_llms_txt: llms_result = self._try_llms_txt() if llms_result: logger.info( "\nβœ… Used llms.txt (%s) - skipping HTML scraping", self.llms_txt_variant, ) self.save_summary() return # HTML scraping (async version) logger.info("\n" + "=" * 60) if self.dry_run: logger.info("DRY RUN (ASYNC): %s", self.name) else: logger.info("SCRAPING (ASYNC): %s", self.name) logger.info("=" * 60) logger.info("Base URL: %s", self.base_url) if self.dry_run: logger.info("Mode: Preview only (no actual scraping)\n") else: logger.info("Output: %s", self.data_dir) logger.info("Workers: %d concurrent tasks (async)", self.workers) logger.info("") max_pages = self.config.get("max_pages", DEFAULT_MAX_PAGES) # Handle unlimited mode if max_pages is None or max_pages == -1: logger.warning("⚠️ UNLIMITED MODE: No page limit (will scrape all pages)\n") unlimited = True preview_limit = float("inf") else: unlimited = False preview_limit = 20 if self.dry_run else max_pages # Create semaphore for concurrency control semaphore = asyncio.Semaphore(self.workers) # Create shared HTTP client with connection pooling async with httpx.AsyncClient( timeout=30.0, limits=httpx.Limits(max_connections=self.workers * 2) ) as client: tasks = [] while self.pending_urls and (unlimited or len(self.visited_urls) < preview_limit): # Get next batch of URLs batch = [] batch_size = min(self.workers * 2, len(self.pending_urls)) for _ in range(batch_size): if not self.pending_urls: break url = self.pending_urls.popleft() if url not in self.visited_urls: self.visited_urls.add(url) batch.append(url) # Create async tasks for batch for url in batch: if unlimited or len(self.visited_urls) <= preview_limit: if self.dry_run: logger.info(" [Preview] %s", url) else: task = asyncio.create_task( self.scrape_page_async(url, semaphore, client) ) tasks.append(task) # Wait for batch to complete before continuing if tasks: await asyncio.gather(*tasks, return_exceptions=True) tasks = [] self.pages_scraped = len(self.visited_urls) # Progress indicator if self.pages_scraped % 10 == 0 and not self.dry_run: logger.info(" [%d pages scraped]", self.pages_scraped) # Checkpoint saving if ( not self.dry_run and self.checkpoint_enabled and self.pages_scraped % self.checkpoint_interval == 0 ): self.save_checkpoint() # Wait for any remaining tasks if tasks: await asyncio.gather(*tasks, return_exceptions=True) if self.dry_run: logger.info("\nβœ… Dry run complete: would scrape ~%d pages", len(self.visited_urls)) if len(self.visited_urls) >= preview_limit: logger.info( " (showing first %d, actual scraping may find more)", int(preview_limit), ) logger.info("\nπŸ’‘ To actually scrape, run without --dry-run") else: logger.info("\nβœ… Scraped %d pages (async mode)", len(self.visited_urls)) self.save_summary() def save_summary(self) -> None: """Save scraping summary""" summary = { "name": self.name, "total_pages": len(self.pages), "base_url": self.base_url, "llms_txt_detected": self.llms_txt_detected, "llms_txt_variant": self.llms_txt_variant, "pages": [{"title": p["title"], "url": p["url"]} for p in self.pages], } with open(f"{self.data_dir}/summary.json", "w", encoding="utf-8") as f: json.dump(summary, f, indent=2, ensure_ascii=False) def load_scraped_data(self) -> list[dict[str, Any]]: """Load previously scraped data""" pages = [] pages_dir = Path(self.data_dir) / "pages" if not pages_dir.exists(): return [] for json_file in pages_dir.glob("*.json"): try: with open(json_file, encoding="utf-8") as f: pages.append(json.load(f)) except Exception as e: logger.error( "⚠️ Error loading scraped data file %s: %s: %s", json_file, type(e).__name__, e, ) logger.error( " Suggestion: File may be corrupted, consider re-scraping with --fresh" ) return pages def smart_categorize(self, pages: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]: """Improved categorization with better pattern matching""" category_defs = self.config.get("categories", {}) # Default smart categories if none provided if not category_defs: category_defs = self.infer_categories(pages) categories: dict[str, list[dict[str, Any]]] = {cat: [] for cat in category_defs} categories["other"] = [] for page in pages: url = page["url"].lower() title = page["title"].lower() content = page.get("content", "").lower()[ :CONTENT_PREVIEW_LENGTH ] # Check first N chars for categorization categorized = False # Match against keywords for cat, keywords in category_defs.items(): score = 0 for keyword in keywords: keyword = keyword.lower() if keyword in url: score += 3 if keyword in title: score += 2 if keyword in content: score += 1 if score >= MIN_CATEGORIZATION_SCORE: # Threshold for categorization categories[cat].append(page) categorized = True break if not categorized: categories["other"].append(page) # Remove empty categories categories = {k: v for k, v in categories.items() if v} return categories def infer_categories(self, pages: list[dict[str, Any]]) -> dict[str, list[str]]: """Infer categories from URL patterns (IMPROVED)""" url_segments: defaultdict[str, int] = defaultdict(int) for page in pages: path = urlparse(page["url"]).path segments = [ s for s in path.split("/") if s and s not in ["en", "stable", "latest", "docs"] ] for seg in segments: url_segments[seg] += 1 # Top segments become categories top_segments = sorted(url_segments.items(), key=lambda x: x[1], reverse=True)[:8] categories = {} for seg, count in top_segments: if count >= 3: # At least 3 pages categories[seg] = [seg] # Add common defaults if "tutorial" not in categories and any( "tutorial" in url for url in [p["url"] for p in pages] ): categories["tutorials"] = ["tutorial", "guide", "getting-started"] if "api" not in categories and any( "api" in url or "reference" in url for url in [p["url"] for p in pages] ): categories["api"] = ["api", "reference", "class"] return categories def generate_quick_reference(self, pages: list[dict[str, Any]]) -> list[dict[str, str]]: """Generate quick reference from common patterns (NEW FEATURE)""" quick_ref = [] # Collect all patterns all_patterns = [] for page in pages: all_patterns.extend(page.get("patterns", [])) # Get most common code patterns seen_codes = set() for pattern in all_patterns: code = pattern["code"] if code not in seen_codes and len(code) < 300: quick_ref.append(pattern) seen_codes.add(code) if len(quick_ref) >= 15: break return quick_ref def create_reference_file(self, category: str, pages: list[dict[str, Any]]) -> None: """Create enhanced reference file""" if not pages: return lines = [] lines.append(f"# {self.name.title()} - {category.replace('_', ' ').title()}\n") lines.append(f"**Pages:** {len(pages)}\n") lines.append("---\n") for page in pages: lines.append(f"## {page['title']}\n") lines.append(f"**URL:** {page['url']}\n") # Table of contents from headings if page.get("headings"): lines.append("**Contents:**") for h in page["headings"][:10]: level = int(h["level"][1]) if len(h["level"]) > 1 else 1 indent = " " * max(0, level - 2) lines.append(f"{indent}- {h['text']}") lines.append("") # Content (NO TRUNCATION) if page.get("content"): lines.append(page["content"]) lines.append("") # Code examples with language (NO TRUNCATION) if page.get("code_samples"): lines.append("**Examples:**\n") for i, sample in enumerate(page["code_samples"][:4], 1): lang = sample.get("language", "unknown") code = sample.get("code", sample if isinstance(sample, str) else "") lines.append(f"Example {i} ({lang}):") lines.append(f"```{lang}") lines.append(code) # Full code, no truncation lines.append("```\n") lines.append("---\n") filepath = os.path.join(self.skill_dir, "references", f"{category}.md") with open(filepath, "w", encoding="utf-8") as f: f.write("\n".join(lines)) logger.info(" βœ“ %s.md (%d pages)", category, len(pages)) def create_enhanced_skill_md( self, categories: dict[str, list[dict[str, Any]]], quick_ref: list[dict[str, str]], ) -> None: """Create SKILL.md with actual examples (IMPROVED)""" # Try to infer description if not in config if "description" not in self.config: # Get first page HTML content to infer description first_page_html = None for pages in categories.values(): if pages: first_page_html = pages[0].get("raw_html", "") break description = infer_description_from_docs(self.base_url, first_page_html, self.name) else: description = self.config["description"] # Extract actual code examples from docs example_codes = [] for pages in categories.values(): for page in pages[:3]: # First 3 pages per category for sample in page.get("code_samples", [])[:2]: # First 2 samples per page code = sample.get("code", sample if isinstance(sample, str) else "") lang = sample.get("language", "unknown") if len(code) < 200 and lang != "unknown": example_codes.append((lang, code)) if len(example_codes) >= 10: break if len(example_codes) >= 10: break if len(example_codes) >= 10: break content = f"""--- name: {self.name} description: {description} --- # {self.name.title()} Skill {description.capitalize()}, generated from official documentation. ## When to Use This Skill This skill should be triggered when: - Working with {self.name} - Asking about {self.name} features or APIs - Implementing {self.name} solutions - Debugging {self.name} code - Learning {self.name} best practices ## Quick Reference ### Common Patterns """ # Add actual quick reference patterns if quick_ref: for i, pattern in enumerate(quick_ref[:8], 1): desc = pattern.get("description", "Example pattern") # Format description: extract first sentence, truncate if too long first_sentence = desc.split(".")[0] if "." in desc else desc if len(first_sentence) > 150: first_sentence = first_sentence[:147] + "..." content += f"**Pattern {i}:** {first_sentence}\n\n" content += "```\n" content += pattern.get("code", "")[:300] content += "\n```\n\n" else: content += "*Quick reference patterns will be added as you use the skill.*\n\n" # Add example codes from docs if example_codes: content += "### Example Code Patterns\n\n" for i, (lang, code) in enumerate(example_codes[:5], 1): content += f"**Example {i}** ({lang}):\n```{lang}\n{code}\n```\n\n" content += """## Reference Files This skill includes comprehensive documentation in `references/`: """ for cat in sorted(categories.keys()): content += f"- **{cat}.md** - {cat.replace('_', ' ').title()} documentation\n" content += """ Use `view` to read specific reference files when detailed information is needed. ## Working with This Skill ### For Beginners Start with the getting_started or tutorials reference files for foundational concepts. ### For Specific Features Use the appropriate category reference file (api, guides, etc.) for detailed information. ### For Code Examples The quick reference section above contains common patterns extracted from the official docs. ## Resources ### references/ Organized documentation extracted from official sources. These files contain: - Detailed explanations - Code examples with language annotations - Links to original documentation - Table of contents for quick navigation ### scripts/ Add helper scripts here for common automation tasks. ### assets/ Add templates, boilerplate, or example projects here. ## Notes - This skill was automatically generated from official documentation - Reference files preserve the structure and examples from source docs - Code examples include language detection for better syntax highlighting - Quick reference patterns are extracted from common usage examples in the docs ## Updating To refresh this skill with updated documentation: 1. Re-run the scraper with the same configuration 2. The skill will be rebuilt with the latest information """ filepath = os.path.join(self.skill_dir, "SKILL.md") with open(filepath, "w", encoding="utf-8") as f: f.write(content) logger.info(" βœ“ SKILL.md (enhanced with %d examples)", len(example_codes)) def create_index(self, categories: dict[str, list[dict[str, Any]]]) -> None: """Create navigation index""" lines = [] lines.append(f"# {self.name.title()} Documentation Index\n") lines.append("## Categories\n") for cat, pages in sorted(categories.items()): lines.append(f"### {cat.replace('_', ' ').title()}") lines.append(f"**File:** `{cat}.md`") lines.append(f"**Pages:** {len(pages)}\n") filepath = os.path.join(self.skill_dir, "references", "index.md") with open(filepath, "w", encoding="utf-8") as f: f.write("\n".join(lines)) logger.info(" βœ“ index.md") def build_skill(self) -> bool: """Build the skill from scraped data. Loads scraped JSON files, categorizes pages, extracts patterns, and generates SKILL.md and reference files. Returns: bool: True if build succeeded, False otherwise """ logger.info("\n" + "=" * 60) logger.info("BUILDING SKILL: %s", self.name) logger.info("=" * 60 + "\n") # Load data logger.info("Loading scraped data...") pages = self.load_scraped_data() if not pages: logger.error("βœ— No scraped data found!") return False logger.info(" βœ“ Loaded %d pages\n", len(pages)) # Categorize logger.info("Categorizing pages...") categories = self.smart_categorize(pages) logger.info(" βœ“ Created %d categories\n", len(categories)) # Generate quick reference logger.info("Generating quick reference...") quick_ref = self.generate_quick_reference(pages) logger.info(" βœ“ Extracted %d patterns\n", len(quick_ref)) # Create reference files logger.info("Creating reference files...") for cat, cat_pages in categories.items(): self.create_reference_file(cat, cat_pages) # Create index self.create_index(categories) logger.info("") # Create enhanced SKILL.md logger.info("Creating SKILL.md...") self.create_enhanced_skill_md(categories, quick_ref) logger.info("\nβœ… Skill built: %s/", self.skill_dir) return True def validate_config(config: dict[str, Any]) -> tuple[list[str], list[str]]: """Validate configuration structure and values. Args: config (dict): Configuration dictionary to validate Returns: tuple: (errors, warnings) where each is a list of strings Example: >>> errors, warnings = validate_config({'name': 'test', 'base_url': 'https://example.com'}) >>> if errors: ... print("Invalid config:", errors) """ errors = [] warnings = [] # Required fields required_fields = ["name", "base_url"] for field in required_fields: if field not in config: errors.append(f"Missing required field: '{field}'") # Validate name (alphanumeric, hyphens, underscores only) if "name" in config and not re.match(r"^[a-zA-Z0-9_-]+$", config["name"]): errors.append( f"Invalid name: '{config['name']}' (use only letters, numbers, hyphens, underscores)" ) # Validate base_url if "base_url" in config and not config["base_url"].startswith(("http://", "https://")): errors.append( f"Invalid base_url: '{config['base_url']}' (must start with http:// or https://)" ) # Validate selectors structure if "selectors" in config: if not isinstance(config["selectors"], dict): errors.append("'selectors' must be a dictionary") else: recommended_selectors = ["main_content", "title", "code_blocks"] for selector in recommended_selectors: if selector not in config["selectors"]: warnings.append(f"Missing recommended selector: '{selector}'") else: warnings.append("Missing 'selectors' section (recommended)") # Validate url_patterns if "url_patterns" in config: if not isinstance(config["url_patterns"], dict): errors.append("'url_patterns' must be a dictionary") else: for key in ["include", "exclude"]: if key in config["url_patterns"] and not isinstance( config["url_patterns"][key], list ): errors.append(f"'url_patterns.{key}' must be a list") # Validate categories if "categories" in config: if not isinstance(config["categories"], dict): errors.append("'categories' must be a dictionary") else: for cat_name, keywords in config["categories"].items(): if not isinstance(keywords, list): errors.append(f"'categories.{cat_name}' must be a list of keywords") # Validate rate_limit if "rate_limit" in config: try: rate = float(config["rate_limit"]) if rate < 0: errors.append(f"'rate_limit' must be non-negative (got {rate})") elif rate > 10: warnings.append( f"'rate_limit' is very high ({rate}s) - this may slow down scraping significantly" ) except (ValueError, TypeError): errors.append(f"'rate_limit' must be a number (got {config['rate_limit']})") # Validate max_pages if "max_pages" in config: max_p_value = config["max_pages"] # Allow None for unlimited if max_p_value is None: warnings.append( "'max_pages' is None (unlimited) - this will scrape ALL pages. Use with caution!" ) else: try: max_p = int(max_p_value) # Allow -1 for unlimited if max_p == -1: warnings.append( "'max_pages' is -1 (unlimited) - this will scrape ALL pages. Use with caution!" ) elif max_p < 1: errors.append( f"'max_pages' must be at least 1 or -1 for unlimited (got {max_p})" ) elif max_p > MAX_PAGES_WARNING_THRESHOLD: warnings.append( f"'max_pages' is very high ({max_p}) - scraping may take a very long time" ) except (ValueError, TypeError): errors.append( f"'max_pages' must be an integer, -1, or null (got {config['max_pages']})" ) # Validate start_urls if present if "start_urls" in config: if not isinstance(config["start_urls"], list): errors.append("'start_urls' must be a list") else: for url in config["start_urls"]: if not url.startswith(("http://", "https://")): errors.append( f"Invalid start_url: '{url}' (must start with http:// or https://)" ) return errors, warnings def load_config(config_path: str) -> dict[str, Any]: """Load and validate configuration from JSON file. Automatically fetches configs from SkillSeekersWeb.com API if not found locally. Args: config_path (str): Path to JSON configuration file Returns: dict: Validated configuration dictionary Raises: SystemExit: If config is invalid or file not found Example: >>> config = load_config('configs/react.json') >>> print(config['name']) 'react' """ # Try to resolve config path (with auto-fetch from API) resolved_path = resolve_config_path(config_path, auto_fetch=True) if resolved_path is None: # Config not found locally and fetch failed available = list_available_configs() searched_paths = get_last_searched_paths() logger.error("❌ Error: Config file not found: %s", config_path) logger.error("") logger.error(" Searched in these locations:") for i, path in enumerate(searched_paths, 1): logger.error(" %d. %s", i, path) logger.error(" %d. SkillSeekersWeb.com API", len(searched_paths) + 1) logger.error("") # Show where user should place custom configs user_config_dir = Path.home() / ".config" / "skill-seekers" / "configs" logger.error(" πŸ’‘ To use a custom config, place it in one of these locations:") logger.error(" β€’ Current directory: ./configs/%s", Path(config_path).name) logger.error(" β€’ User config directory: %s", user_config_dir / Path(config_path).name) logger.error(" β€’ Absolute path: /full/path/to/%s", Path(config_path).name) logger.error("") if available: logger.error(" πŸ“‹ Or use a preset config from API (%d total):", len(available)) for cfg in available[:10]: # Show first 10 logger.error(" β€’ %s", cfg) if len(available) > 10: logger.error(" ... and %d more", len(available) - 10) logger.error("") logger.error(" πŸ’‘ Use any preset: skill-seekers scrape --config <name>.json") logger.error(" 🌐 Browse all: https://skillseekersweb.com/") else: logger.error(" ⚠️ Could not connect to API to list available configs") logger.error(" 🌐 Visit: https://skillseekersweb.com/ for available configs") sys.exit(1) # Load the resolved config file try: with open(resolved_path, encoding="utf-8") as f: config = json.load(f) except json.JSONDecodeError as e: logger.error("❌ Error: Invalid JSON in config file: %s", resolved_path) logger.error(" Details: %s", e) logger.error(" Suggestion: Check syntax at line %d, column %d", e.lineno, e.colno) sys.exit(1) # Validate config using ConfigValidator (supports both unified and legacy formats) try: validator = ConfigValidator(config) validator.validate() # Log config type if validator.is_unified: logger.debug("βœ“ Unified config format detected") else: logger.debug("βœ“ Legacy config format detected") except ValueError as e: logger.error("❌ Configuration validation errors in %s:", config_path) logger.error(" %s", str(e)) logger.error( "\n Suggestion: Fix the above errors or check https://skillseekersweb.com/ for examples" ) sys.exit(1) return config def interactive_config() -> dict[str, Any]: """Interactive configuration wizard for creating new configs. Prompts user for all required configuration fields step-by-step and returns a complete configuration dictionary. Returns: dict: Complete configuration dictionary with user-provided values Example: >>> config = interactive_config() # User enters: name=react, url=https://react.dev, etc. >>> config['name'] 'react' """ logger.info("\n" + "=" * 60) logger.info("Documentation to Skill Converter") logger.info("=" * 60 + "\n") config: dict[str, Any] = {} # Basic info config["name"] = input("Skill name (e.g., 'react', 'godot'): ").strip() config["description"] = input("Skill description: ").strip() config["base_url"] = input("Base URL (e.g., https://docs.example.com/): ").strip() if not config["base_url"].endswith("/"): config["base_url"] += "/" # Selectors logger.info("\nCSS Selectors (press Enter for defaults):") selectors = {} selectors["main_content"] = ( input(" Main content [div[role='main']]: ").strip() or "div[role='main']" ) selectors["title"] = input(" Title [title]: ").strip() or "title" selectors["code_blocks"] = input(" Code blocks [pre code]: ").strip() or "pre code" config["selectors"] = selectors # URL patterns logger.info("\nURL Patterns (comma-separated, optional):") include = input(" Include: ").strip() exclude = input(" Exclude: ").strip() config["url_patterns"] = { "include": [p.strip() for p in include.split(",") if p.strip()], "exclude": [p.strip() for p in exclude.split(",") if p.strip()], } # Settings rate = input(f"\nRate limit (seconds) [{DEFAULT_RATE_LIMIT}]: ").strip() config["rate_limit"] = float(rate) if rate else DEFAULT_RATE_LIMIT max_p = input(f"Max pages [{DEFAULT_MAX_PAGES}]: ").strip() config["max_pages"] = int(max_p) if max_p else DEFAULT_MAX_PAGES return config def check_existing_data(name: str) -> tuple[bool, int]: """Check if scraped data already exists for a skill. Args: name (str): Skill name to check Returns: tuple: (exists, page_count) where exists is bool and page_count is int Example: >>> exists, count = check_existing_data('react') >>> if exists: ... print(f"Found {count} existing pages") """ data_dir = f"output/{name}_data" if os.path.exists(data_dir) and os.path.exists(f"{data_dir}/summary.json"): with open(f"{data_dir}/summary.json", encoding="utf-8") as f: summary = json.load(f) return True, summary.get("total_pages", 0) return False, 0 def setup_argument_parser() -> argparse.ArgumentParser: """Setup and configure command-line argument parser. Creates an ArgumentParser with all CLI options for the doc scraper tool, including configuration, scraping, enhancement, and performance options. All arguments are defined in skill_seekers.cli.arguments.scrape to ensure consistency between the standalone scraper and unified CLI. Returns: argparse.ArgumentParser: Configured argument parser Example: >>> parser = setup_argument_parser() >>> args = parser.parse_args(['--config', 'configs/react.json']) >>> print(args.config) configs/react.json """ parser = argparse.ArgumentParser( description="Convert documentation websites to Claude skills", formatter_class=argparse.RawDescriptionHelpFormatter, ) # Add all scrape arguments from shared definitions # This ensures the standalone scraper and unified CLI stay in sync add_scrape_arguments(parser) return parser def get_configuration(args: argparse.Namespace) -> dict[str, Any]: """Load or create configuration from command-line arguments. Handles three configuration modes: 1. Load from JSON file (--config) 2. Interactive configuration wizard (--interactive or missing args) 3. Quick mode from command-line arguments (--name, --url) Also applies CLI overrides for rate limiting and worker count. Args: args: Parsed command-line arguments from argparse Returns: dict: Configuration dictionary with all required fields Example: >>> args = parser.parse_args(['--name', 'react', '--url', 'https://react.dev']) >>> config = get_configuration(args) >>> print(config['name']) react """ # Handle URL from either positional argument or --url flag # Positional 'url' takes precedence, then --url flag effective_url = getattr(args, "url", None) # Get base configuration if args.config: config = load_config(args.config) elif args.interactive or not (args.name and effective_url): config = interactive_config() else: config = { "name": args.name, "description": args.description or f"Use when working with {args.name}", "base_url": effective_url, "selectors": { "main_content": "div[role='main']", "title": "title", "code_blocks": "pre code", }, "url_patterns": {"include": [], "exclude": []}, "rate_limit": DEFAULT_RATE_LIMIT, "max_pages": DEFAULT_MAX_PAGES, } # Apply CLI overrides for rate limiting if args.no_rate_limit: config["rate_limit"] = 0 logger.info("⚑ Rate limiting disabled") elif args.rate_limit is not None: config["rate_limit"] = args.rate_limit if args.rate_limit == 0: logger.info("⚑ Rate limiting disabled") else: logger.info("⚑ Rate limit override: %ss per page", args.rate_limit) # Apply CLI overrides for worker count if args.workers: # Validate workers count if args.workers < 1: logger.error("❌ Error: --workers must be at least 1 (got %d)", args.workers) logger.error(" Suggestion: Use --workers 1 (default) or omit the flag") sys.exit(1) if args.workers > 10: logger.warning("⚠️ Warning: --workers capped at 10 (requested %d)", args.workers) args.workers = 10 config["workers"] = args.workers if args.workers > 1: logger.info("πŸš€ Parallel scraping enabled: %d workers", args.workers) # Apply CLI override for async mode if args.async_mode: config["async_mode"] = True if config.get("workers", 1) > 1: logger.info("⚑ Async mode enabled (2-3x faster than threads)") else: logger.warning( "⚠️ Async mode enabled but workers=1. Consider using --workers 4 for better performance" ) # Apply CLI override for max_pages if args.max_pages is not None: old_max = config.get("max_pages", DEFAULT_MAX_PAGES) config["max_pages"] = args.max_pages # Warnings for --max-pages usage if args.max_pages > 1000: logger.warning( "⚠️ --max-pages=%d is very high - scraping may take hours", args.max_pages ) logger.warning(" Recommendation: Use configs with reasonable limits for production") elif args.max_pages < 10: logger.warning( "⚠️ --max-pages=%d is very low - may result in incomplete skill", args.max_pages ) if old_max and old_max != args.max_pages: logger.info( "πŸ“Š Max pages override: %d β†’ %d (from --max-pages flag)", old_max, args.max_pages ) else: logger.info("πŸ“Š Max pages set to: %d (from --max-pages flag)", args.max_pages) return config def execute_scraping_and_building( config: dict[str, Any], args: argparse.Namespace ) -> Optional["DocToSkillConverter"]: """Execute the scraping and skill building process. Handles dry run mode, existing data checks, scraping with checkpoints, keyboard interrupts, and skill building. This is the core workflow orchestration for the scraping phase. Args: config (dict): Configuration dictionary with scraping parameters args: Parsed command-line arguments Returns: DocToSkillConverter: The converter instance after scraping/building, or None if process was aborted Example: >>> config = {'name': 'react', 'base_url': 'https://react.dev'} >>> converter = execute_scraping_and_building(config, args) >>> if converter: ... print("Scraping complete!") """ # Dry run mode - preview only if args.dry_run: logger.info("\n" + "=" * 60) logger.info("DRY RUN MODE") logger.info("=" * 60) logger.info("This will show what would be scraped without saving anything.\n") converter = DocToSkillConverter(config, dry_run=True) converter.scrape_all() logger.info("\nπŸ“‹ Configuration Summary:") logger.info(" Name: %s", config["name"]) logger.info(" Base URL: %s", config["base_url"]) logger.info(" Max pages: %d", config.get("max_pages", DEFAULT_MAX_PAGES)) logger.info(" Rate limit: %ss", config.get("rate_limit", DEFAULT_RATE_LIMIT)) logger.info(" Categories: %d", len(config.get("categories", {}))) return None # Check for existing data exists, page_count = check_existing_data(config["name"]) if exists and not args.skip_scrape and not args.fresh: # Check force_rescrape flag from config if config.get("force_rescrape", False): # Auto-delete cached data and rescrape logger.info("\nβœ“ Found existing data: %d pages", page_count) logger.info(" force_rescrape enabled - deleting cached data and rescaping") import shutil data_dir = f"output/{config['name']}_data" if os.path.exists(data_dir): shutil.rmtree(data_dir) logger.info(f" Deleted: {data_dir}") else: # Only prompt if force_rescrape is False logger.info("\nβœ“ Found existing data: %d pages", page_count) response = input("Use existing data? (y/n): ").strip().lower() if response == "y": args.skip_scrape = True elif exists and args.fresh: logger.info("\nβœ“ Found existing data: %d pages", page_count) logger.info(" --fresh flag set, will re-scrape from scratch") # Create converter converter = DocToSkillConverter(config, resume=args.resume) # Handle fresh start (clear checkpoint) if args.fresh: converter.clear_checkpoint() # Scrape or skip if not args.skip_scrape: try: converter.scrape_all() # Save final checkpoint if converter.checkpoint_enabled: converter.save_checkpoint() logger.info("\nπŸ’Ύ Final checkpoint saved") # Clear checkpoint after successful completion converter.clear_checkpoint() logger.info("βœ… Scraping complete - checkpoint cleared") except KeyboardInterrupt: logger.warning("\n\nScraping interrupted.") if converter.checkpoint_enabled: converter.save_checkpoint() logger.info("πŸ’Ύ Progress saved to checkpoint") logger.info( " Resume with: --config %s --resume", args.config if args.config else "config.json", ) response = input("Continue with skill building? (y/n): ").strip().lower() if response != "y": return None else: logger.info("\n⏭️ Skipping scrape, using existing data") # Build skill success = converter.build_skill() if not success: sys.exit(1) # RAG chunking (optional - NEW v2.10.0) if args.chunk_for_rag: logger.info("\n" + "=" * 60) logger.info("πŸ”ͺ Generating RAG chunks...") logger.info("=" * 60) from skill_seekers.cli.rag_chunker import RAGChunker chunker = RAGChunker( chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap, preserve_code_blocks=not args.no_preserve_code_blocks, preserve_paragraphs=not args.no_preserve_paragraphs, ) # Chunk the skill chunks = chunker.chunk_skill(converter.output_dir) # Save chunks chunks_path = converter.output_dir / "rag_chunks.json" chunker.save_chunks(chunks, chunks_path) logger.info(f"βœ… Generated {len(chunks)} RAG chunks") logger.info(f"πŸ“„ Saved to: {chunks_path}") logger.info(f"πŸ’‘ Use with LangChain: --target langchain") logger.info(f"πŸ’‘ Use with LlamaIndex: --target llama-index") return converter def execute_enhancement(config: dict[str, Any], args: argparse.Namespace) -> None: """Execute optional SKILL.md enhancement with Claude. Supports two enhancement modes: 1. API-based enhancement (requires ANTHROPIC_API_KEY) 2. Local enhancement using Claude Code (no API key needed) Prints appropriate messages and suggestions based on whether enhancement was requested and whether it succeeded. Args: config (dict): Configuration dictionary with skill name args: Parsed command-line arguments with enhancement flags Example: >>> execute_enhancement(config, args) # Runs enhancement if --enhance or --enhance-local flag is set """ import subprocess # Optional enhancement with auto-detected mode (API or LOCAL) if getattr(args, "enhance_level", 0) > 0: import os has_api_key = bool(os.environ.get("ANTHROPIC_API_KEY") or args.api_key) mode = "API" if has_api_key else "LOCAL" logger.info("\n" + "=" * 60) logger.info(f"ENHANCING SKILL.MD WITH CLAUDE ({mode} mode, level {args.enhance_level})") logger.info("=" * 60 + "\n") try: enhance_cmd = ["skill-seekers-enhance", f"output/{config['name']}/"] enhance_cmd.extend(["--enhance-level", str(args.enhance_level)]) if args.api_key: enhance_cmd.extend(["--api-key", args.api_key]) if getattr(args, "interactive_enhancement", False): enhance_cmd.append("--interactive-enhancement") result = subprocess.run(enhance_cmd, check=True) if result.returncode == 0: logger.info("\nβœ… Enhancement complete!") except subprocess.CalledProcessError: logger.warning("\n⚠ Enhancement failed, but skill was still built") except FileNotFoundError: logger.warning("\n⚠ skill-seekers-enhance command not found. Run manually:") logger.info( " skill-seekers-enhance output/%s/ --enhance-level %d", config["name"], args.enhance_level, ) # Print packaging instructions logger.info("\nπŸ“¦ Package your skill:") logger.info(" skill-seekers-package output/%s/", config["name"]) # Suggest enhancement if not done if getattr(args, "enhance_level", 0) == 0: logger.info("\nπŸ’‘ Optional: Enhance SKILL.md with Claude:") logger.info(" skill-seekers-enhance output/%s/ --enhance-level 2", config["name"]) logger.info(" or re-run with: --enhance-level 2 (auto-detects API vs LOCAL mode)") logger.info( " API-based: skill-seekers-enhance-api output/%s/", config["name"], ) logger.info(" or re-run with: --enhance") logger.info( "\nπŸ’‘ Tip: Use --interactive-enhancement with --enhance-local to open terminal window" ) def main() -> None: parser = setup_argument_parser() args = parser.parse_args() # Setup logging based on verbosity flags setup_logging(verbose=args.verbose, quiet=args.quiet) config = get_configuration(args) # Execute scraping and building converter = execute_scraping_and_building(config, args) # Exit if dry run or aborted if converter is None: return # Execute enhancement and print instructions execute_enhancement(config, args) if __name__ == "__main__": main()