or tags
+ 5. Text content from paragraphs
+
+ Args:
+ html_content: Raw HTML content string
+ url: Source URL (for reference in result dict)
+
+ Returns:
+ Dict with keys:
+ - url: str - Source URL
+ - title: str - From <title> tag, cleaned
+ - content: str - Text content from main area
+ - headings: List[Dict] - {'level': 'h2', 'text': str, 'id': str}
+ - code_samples: List[Dict] - {'code': str, 'language': str}
+ - links: List - Empty (HTML links not extracted to avoid client-side routes)
+ - patterns: List - Empty (reserved for future use)
+
+ Note:
+ Prefers <main> or <article> tags for content area.
+ Falls back to <body> if no semantic content container found.
+ Language detection uses detect_language() method.
+ """
+ page = {
+ 'url': url,
+ 'title': '',
+ 'content': '',
+ 'headings': [],
+ 'code_samples': [],
+ 'patterns': [],
+ 'links': []
+ }
+
+ soup = BeautifulSoup(html_content, 'html.parser')
+
+ # Try to extract title
+ title_elem = soup.select_one('title')
+ if title_elem:
+ page['title'] = self.clean_text(title_elem.get_text())
+
+ # Try to find main content area
+ main = soup.select_one('main, article, [role="main"], .content')
+ if not main:
+ main = soup.body if soup.body else soup
+
+ if main:
+ # Extract headings
+ for h in main.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
+ text = self.clean_text(h.get_text())
+ if text:
+ page['headings'].append({
+ 'level': h.name,
+ 'text': text,
+ 'id': h.get('id', '')
+ })
+
+ # Extract code blocks
+ for code_elem in main.select('pre code, pre'):
+ code = code_elem.get_text()
+ if len(code.strip()) > 10:
+ lang = self.detect_language(code_elem, code)
+ page['code_samples'].append({
+ 'code': code.strip(),
+ 'language': lang
+ })
+
+ # Extract paragraphs
+ paragraphs = []
+ for p in main.find_all('p'):
+ text = self.clean_text(p.get_text())
+ if text and len(text) > 20:
+ paragraphs.append(text)
+ page['content'] = '\n\n'.join(paragraphs)
+
+ return page
+
def detect_language(self, elem, code):
"""Detect programming language from code block
@@ -386,14 +573,19 @@ class DocToSkillConverter:
return text.strip()
def save_page(self, page: Dict[str, Any]) -> None:
- """Save page data"""
+ """Save page data (skip pages with empty content)"""
+ # Skip pages with empty or very short content
+ if not page.get('content') or len(page.get('content', '')) < 50:
+ logger.debug("Skipping page with empty/short content: %s", page.get('url', 'unknown'))
+ return
+
url_hash = hashlib.md5(page['url'].encode()).hexdigest()[:10]
safe_title = re.sub(r'[^\w\s-]', '', page['title'])[:50]
safe_title = re.sub(r'[-\s]+', '_', safe_title)
-
+
filename = f"{safe_title}_{url_hash}.json"
filepath = os.path.join(self.data_dir, "pages", filename)
-
+
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(page, f, indent=2, ensure_ascii=False)
@@ -408,6 +600,7 @@ class DocToSkillConverter:
Note:
Uses threading locks when workers > 1 for thread safety
+ Supports both HTML pages and Markdown (.md) files
"""
try:
# Scraping part (no lock needed - independent)
@@ -415,8 +608,12 @@ class DocToSkillConverter:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
- soup = BeautifulSoup(response.content, 'html.parser')
- page = self.extract_content(soup, url)
+ # Check if this is a Markdown file
+ if url.endswith('.md') or '.md' in url:
+ page = self._extract_markdown_content(response.text, url)
+ else:
+ soup = BeautifulSoup(response.content, 'html.parser')
+ page = self.extract_content(soup, url)
# Thread-safe operations (lock required)
if self.workers > 1:
@@ -463,6 +660,7 @@ class DocToSkillConverter:
Note:
Uses asyncio.Lock for async-safe operations instead of threading.Lock
+ Supports both HTML pages and Markdown (.md) files
"""
async with semaphore: # Limit concurrent requests
try:
@@ -471,9 +669,13 @@ class DocToSkillConverter:
response = await client.get(url, headers=headers, timeout=30.0)
response.raise_for_status()
- # BeautifulSoup parsing (still synchronous, but fast)
- soup = BeautifulSoup(response.content, 'html.parser')
- page = self.extract_content(soup, url)
+ # Check if this is a Markdown file
+ if url.endswith('.md') or '.md' in url:
+ page = self._extract_markdown_content(response.text, url)
+ else:
+ # BeautifulSoup parsing (still synchronous, but fast)
+ soup = BeautifulSoup(response.content, 'html.parser')
+ page = self.extract_content(soup, url)
# Async-safe operations (no lock needed - single event loop)
logger.info(" %s", url)
@@ -493,6 +695,56 @@ class DocToSkillConverter:
except Exception as e:
logger.error(" ✗ Error scraping %s: %s: %s", url, type(e).__name__, e)
+ def _convert_to_md_urls(self, urls: List[str]) -> List[str]:
+ """
+ Convert URLs to .md format, trying /index.html.md suffix for non-.md URLs.
+ Does not check in advance whether each URL exists; URLs are queued directly and validated during the crawl.
+
+ Args:
+ urls: List of URLs to process
+
+ Returns:
+ List of .md URLs (not validated)
+ """
+ md_urls = []
+
+ for url in urls:
+ if '.md' in url:
+ md_urls.append(url)
+ else:
+ # Convert directly to .md format without sending a HEAD request to check
+ url = url.rstrip('/')
+ md_url = f"{url}/index.html.md"
+ md_urls.append(md_url)
+
+ logger.info(" ✓ Converted %d URLs to .md format (will validate during crawl)", len(md_urls))
+ return md_urls
+
+ # ORIGINAL _convert_to_md_urls (with HEAD request validation):
+ # def _convert_to_md_urls(self, urls: List[str]) -> List[str]:
+ # md_urls = []
+ # non_md_urls = []
+ # for url in urls:
+ # if '.md' in url:
+ # md_urls.append(url)
+ # else:
+ # non_md_urls.append(url)
+ # if non_md_urls:
+ # logger.info(" 🔄 Trying to convert %d non-.md URLs to .md format...", len(non_md_urls))
+ # converted = 0
+ # for url in non_md_urls:
+ # url = url.rstrip('/')
+ # md_url = f"{url}/index.html.md"
+ # try:
+ # resp = requests.head(md_url, timeout=5, allow_redirects=True)
+ # if resp.status_code == 200:
+ # md_urls.append(md_url)
+ # converted += 1
+ # except Exception:
+ # pass
+ # logger.info(" ✓ Converted %d URLs to .md format", converted)
+ # return md_urls
+
def _try_llms_txt(self) -> bool:
"""
Try to use llms.txt instead of HTML scraping.
@@ -548,7 +800,29 @@ class DocToSkillConverter:
logger.info(" ✓ %s (%d chars)", extra_filename, len(extra_content))
# Parse explicit file for skill building
- parser = LlmsTxtParser(content)
+ parser = LlmsTxtParser(content, self.base_url)
+
+ # Extract URLs from llms.txt and add to pending_urls for BFS crawling
+ extracted_urls = parser.extract_urls()
+ if extracted_urls:
+ # Convert non-.md URLs to .md format by trying /index.html.md suffix
+ md_urls = self._convert_to_md_urls(extracted_urls)
+ logger.info("\n🔗 Found %d URLs in llms.txt (%d .md files), starting BFS crawl...",
+ len(extracted_urls), len(md_urls))
+
+ # Filter URLs based on url_patterns config
+ for url in md_urls:
+ if self.is_valid_url(url) and url not in self.visited_urls:
+ self.pending_urls.append(url)
+
+ logger.info(" 📋 %d URLs added to crawl queue after filtering", len(self.pending_urls))
+
+ # Return False to trigger HTML scraping with the populated pending_urls
+ self.llms_txt_detected = True
+ self.llms_txt_variant = 'explicit'
+ return False # Continue with BFS crawling
+
+ # Fallback: if no URLs found, use section-based parsing
pages = parser.parse()
if pages:
@@ -606,7 +880,29 @@ class DocToSkillConverter:
largest = max(downloaded.items(), key=lambda x: x[1]['size'])
logger.info("\n📄 Parsing %s for skill building...", largest[1]['filename'])
- parser = LlmsTxtParser(largest[1]['content'])
+ parser = LlmsTxtParser(largest[1]['content'], self.base_url)
+
+ # Extract URLs from llms.txt and add to pending_urls for BFS crawling
+ extracted_urls = parser.extract_urls()
+ if extracted_urls:
+ # Convert non-.md URLs to .md format by trying /index.html.md suffix
+ md_urls = self._convert_to_md_urls(extracted_urls)
+ logger.info("\n🔗 Found %d URLs in llms.txt (%d .md files), starting BFS crawl...",
+ len(extracted_urls), len(md_urls))
+
+ # Filter URLs based on url_patterns config
+ for url in md_urls:
+ if self.is_valid_url(url) and url not in self.visited_urls:
+ self.pending_urls.append(url)
+
+ logger.info(" 📋 %d URLs added to crawl queue after filtering", len(self.pending_urls))
+
+ # Return False to trigger HTML scraping with the populated pending_urls
+ self.llms_txt_detected = True
+ self.llms_txt_variants = list(downloaded.keys())
+ return False # Continue with BFS crawling
+
+ # Fallback: if no URLs found, use section-based parsing
pages = parser.parse()
if not pages:
diff --git a/src/skill_seekers/cli/llms_txt_downloader.py b/src/skill_seekers/cli/llms_txt_downloader.py
index 1049f86..76ec740 100644
--- a/src/skill_seekers/cli/llms_txt_downloader.py
+++ b/src/skill_seekers/cli/llms_txt_downloader.py
@@ -38,11 +38,24 @@ class LlmsTxtDownloader:
def _is_markdown(self, content: str) -> bool:
"""
- Check if content looks like markdown.
+ Check if content looks like markdown (not HTML).
Returns:
- True if content contains markdown patterns
+ True if content contains markdown patterns and is NOT HTML
"""
+ # First, reject HTML content (common redirect trap)
+ content_start = content.strip()[:500].lower()
+ html_indicators = [
+ '',
+ ' List[str]:
+ """
+ Extract all URLs from the llms.txt content.
+
+ Supports both markdown-style links [text](url) and bare URLs.
+ Resolves relative URLs using base_url if provided.
+ Filters out malformed URLs with invalid anchor patterns.
+
+ Returns:
+ List of unique, cleaned URLs found in the content.
+ Returns empty list if no valid URLs found.
+
+ Note:
+ - Markdown links: [Getting Started](./docs/guide.md)
+ - Bare URLs: https://example.com/api.md
+ - Relative paths resolved with base_url
+ - Invalid anchors (#section/path.md) are stripped
+ """
+ urls = set()
+
+ # Match markdown links: [text](url)
+ md_links = re.findall(r'\[([^\]]*)\]\(([^)]+)\)', self.content)
+ for _, url in md_links:
+ if url.startswith('http'):
+ clean_url = self._clean_url(url)
+ if clean_url:
+ urls.add(clean_url)
+ elif self.base_url and not url.startswith('#'):
+ clean_url = self._clean_url(urljoin(self.base_url, url))
+ if clean_url:
+ urls.add(clean_url)
+
+ # Match bare URLs
+ bare_urls = re.findall(r'https?://[^\s\)\]<>"\']+', self.content)
+ for url in bare_urls:
+ # Clean trailing punctuation
+ url = url.rstrip('.,;:')
+ clean_url = self._clean_url(url)
+ if clean_url:
+ urls.add(clean_url)
+
+ return list(urls)
+
+ def _clean_url(self, url: str) -> str:
+ """
+ Clean and validate URL, removing invalid anchor patterns.
+
+ Detects and strips malformed anchors that contain path separators.
+ Valid: https://example.com/page.md#section
+ Invalid: https://example.com/page#section/index.html.md
+
+ Args:
+ url: URL to clean (absolute or relative)
+
+ Returns:
+ Cleaned URL with malformed anchors stripped.
+ Returns base URL if anchor contains '/' (malformed).
+ Returns original URL if anchor is valid or no anchor present.
+
+ Example:
+ >>> parser._clean_url("https://ex.com/page#sec/path.md")
+ "https://ex.com/page"
+ >>> parser._clean_url("https://ex.com/page.md#section")
+ "https://ex.com/page.md#section"
+ """
+ # Skip URLs with path after anchor (e.g., #section/index.html.md)
+ # These are malformed and return duplicate HTML content
+ if '#' in url:
+ anchor_pos = url.index('#')
+ after_anchor = url[anchor_pos + 1:]
+ # If there's a path separator after anchor, it's invalid
+ if '/' in after_anchor:
+ # Extract the base URL without the malformed anchor
+ return url[:anchor_pos]
+ return url
def parse(self) -> List[Dict]:
"""
diff --git a/src/skill_seekers/cli/unified_scraper.py b/src/skill_seekers/cli/unified_scraper.py
index 24088f3..ed07657 100644
--- a/src/skill_seekers/cli/unified_scraper.py
+++ b/src/skill_seekers/cli/unified_scraper.py
@@ -71,8 +71,15 @@ class UnifiedScraper:
self.merge_mode = merge_mode or self.config.get('merge_mode', 'rule-based')
logger.info(f"Merge mode: {self.merge_mode}")
- # Storage for scraped data
- self.scraped_data = {}
+ # Storage for scraped data - use lists to support multiple sources of same type
+ self.scraped_data = {
+ 'documentation': [], # List of doc sources
+ 'github': [], # List of github sources
+ 'pdf': [] # List of pdf sources
+ }
+
+ # Track source index for unique naming (multi-source support)
+ self._source_counters = {'documentation': 0, 'github': 0, 'pdf': 0}
# Output paths - cleaner organization
self.name = self.config['name']
@@ -240,19 +247,20 @@ class UnifiedScraper:
shutil.move(docs_data_dir, cache_data_dir)
logger.info(f"📦 Moved docs data to cache: {cache_data_dir}")
- def _clone_github_repo(self, repo_name: str) -> Optional[str]:
+ def _clone_github_repo(self, repo_name: str, idx: int = 0) -> Optional[str]:
"""
Clone GitHub repository to cache directory for C3.x analysis.
Reuses existing clone if already present.
Args:
repo_name: GitHub repo in format "owner/repo"
+ idx: Source index for unique naming when multiple repos
Returns:
Path to cloned repo, or None if clone failed
"""
# Clone to cache repos folder for future reuse
- repo_dir_name = repo_name.replace('/', '_') # e.g., encode_httpx
+ repo_dir_name = f"{idx}_{repo_name.replace('/', '_')}" # e.g., 0_encode_httpx
clone_path = os.path.join(self.repos_dir, repo_dir_name)
# Check if already cloned
@@ -307,6 +315,14 @@ class UnifiedScraper:
logger.error("github_scraper.py not found")
return
+ # Multi-source support: Get unique index for this GitHub source
+ idx = self._source_counters['github']
+ self._source_counters['github'] += 1
+
+ # Extract repo identifier for unique naming
+ repo = source['repo']
+ repo_id = repo.replace('/', '_')
+
# Check if we need to clone for C3.x analysis
enable_codebase_analysis = source.get('enable_codebase_analysis', True)
local_repo_path = source.get('local_repo_path')
@@ -315,7 +331,7 @@ class UnifiedScraper:
# Auto-clone if C3.x analysis is enabled but no local path provided
if enable_codebase_analysis and not local_repo_path:
logger.info("🔬 C3.x codebase analysis enabled - cloning repository...")
- cloned_repo_path = self._clone_github_repo(source['repo'])
+ cloned_repo_path = self._clone_github_repo(repo, idx=idx)
if cloned_repo_path:
local_repo_path = cloned_repo_path
logger.info(f"✅ Using cloned repo for C3.x analysis: {local_repo_path}")
@@ -325,8 +341,8 @@ class UnifiedScraper:
# Create config for GitHub scraper
github_config = {
- 'repo': source['repo'],
- 'name': f"{self.name}_github",
+ 'repo': repo,
+ 'name': f"{self.name}_github_{idx}_{repo_id}",
'github_token': source.get('github_token'),
'include_issues': source.get('include_issues', True),
'max_issues': source.get('max_issues', 100),
@@ -369,8 +385,8 @@ class UnifiedScraper:
if cloned_repo_path:
logger.info(f"📁 Repository clone saved for future use: {cloned_repo_path}")
- # Save data to unified location
- github_data_file = os.path.join(self.data_dir, 'github_data.json')
+ # Save data to unified location with unique filename
+ github_data_file = os.path.join(self.data_dir, f'github_data_{idx}_{repo_id}.json')
with open(github_data_file, 'w', encoding='utf-8') as f:
json.dump(github_data, f, indent=2, ensure_ascii=False)
@@ -379,10 +395,14 @@ class UnifiedScraper:
with open(converter_data_file, 'w', encoding='utf-8') as f:
json.dump(github_data, f, indent=2, ensure_ascii=False)
- self.scraped_data['github'] = {
+ # Append to list instead of overwriting (multi-source support)
+ self.scraped_data['github'].append({
+ 'repo': repo,
+ 'repo_id': repo_id,
+ 'idx': idx,
'data': github_data,
'data_file': github_data_file
- }
+ })
# Build standalone SKILL.md for synthesis using GitHubToSkillConverter
try:
@@ -423,9 +443,17 @@ class UnifiedScraper:
logger.error("pdf_scraper.py not found")
return
+ # Multi-source support: Get unique index for this PDF source
+ idx = self._source_counters['pdf']
+ self._source_counters['pdf'] += 1
+
+ # Extract PDF identifier for unique naming (filename without extension)
+ pdf_path = source['path']
+ pdf_id = os.path.splitext(os.path.basename(pdf_path))[0]
+
# Create config for PDF scraper
pdf_config = {
- 'name': f"{self.name}_pdf",
+ 'name': f"{self.name}_pdf_{idx}_{pdf_id}",
'pdf': source['path'],
'extract_tables': source.get('extract_tables', False),
'ocr': source.get('ocr', False),
@@ -438,14 +466,18 @@ class UnifiedScraper:
pdf_data = converter.extract_all()
# Save data
- pdf_data_file = os.path.join(self.data_dir, 'pdf_data.json')
+ pdf_data_file = os.path.join(self.data_dir, f'pdf_data_{idx}_{pdf_id}.json')
with open(pdf_data_file, 'w', encoding='utf-8') as f:
json.dump(pdf_data, f, indent=2, ensure_ascii=False)
- self.scraped_data['pdf'] = {
+ # Append to list instead of overwriting
+ self.scraped_data['pdf'].append({
+ 'pdf_path': pdf_path,
+ 'pdf_id': pdf_id,
+ 'idx': idx,
'data': pdf_data,
'data_file': pdf_data_file
- }
+ })
# Build standalone SKILL.md for synthesis
try:
diff --git a/src/skill_seekers/cli/unified_skill_builder.py b/src/skill_seekers/cli/unified_skill_builder.py
index 70dd6fa..58d8478 100644
--- a/src/skill_seekers/cli/unified_skill_builder.py
+++ b/src/skill_seekers/cli/unified_skill_builder.py
@@ -97,23 +97,39 @@ class UnifiedSkillBuilder:
except IOError as e:
logger.warning(f"Failed to read documentation SKILL.md: {e}")
- # Load GitHub SKILL.md
- github_skill_path = sources_dir / f"{self.name}_github" / "SKILL.md"
- if github_skill_path.exists():
- try:
- skill_mds['github'] = github_skill_path.read_text(encoding='utf-8')
- logger.debug(f"Loaded GitHub SKILL.md ({len(skill_mds['github'])} chars)")
- except IOError as e:
- logger.warning(f"Failed to read GitHub SKILL.md: {e}")
+ # Load ALL GitHub sources (multi-source support)
+ github_sources = []
+ for github_dir in sources_dir.glob(f"{self.name}_github_*"):
+ github_skill_path = github_dir / "SKILL.md"
+ if github_skill_path.exists():
+ try:
+ content = github_skill_path.read_text(encoding='utf-8')
+ github_sources.append(content)
+ logger.debug(f"Loaded GitHub SKILL.md from {github_dir.name} ({len(content)} chars)")
+ except IOError as e:
+ logger.warning(f"Failed to read GitHub SKILL.md from {github_dir.name}: {e}")
- # Load PDF SKILL.md
- pdf_skill_path = sources_dir / f"{self.name}_pdf" / "SKILL.md"
- if pdf_skill_path.exists():
- try:
- skill_mds['pdf'] = pdf_skill_path.read_text(encoding='utf-8')
- logger.debug(f"Loaded PDF SKILL.md ({len(skill_mds['pdf'])} chars)")
- except IOError as e:
- logger.warning(f"Failed to read PDF SKILL.md: {e}")
+ if github_sources:
+ # Concatenate all GitHub sources with separator
+ skill_mds['github'] = '\n\n---\n\n'.join(github_sources)
+ logger.debug(f"Combined {len(github_sources)} GitHub SKILL.md files")
+
+ # Load ALL PDF sources (multi-source support)
+ pdf_sources = []
+ for pdf_dir in sources_dir.glob(f"{self.name}_pdf_*"):
+ pdf_skill_path = pdf_dir / "SKILL.md"
+ if pdf_skill_path.exists():
+ try:
+ content = pdf_skill_path.read_text(encoding='utf-8')
+ pdf_sources.append(content)
+ logger.debug(f"Loaded PDF SKILL.md from {pdf_dir.name} ({len(content)} chars)")
+ except IOError as e:
+ logger.warning(f"Failed to read PDF SKILL.md from {pdf_dir.name}: {e}")
+
+ if pdf_sources:
+ # Concatenate all PDF sources with separator
+ skill_mds['pdf'] = '\n\n---\n\n'.join(pdf_sources)
+ logger.debug(f"Combined {len(pdf_sources)} PDF SKILL.md files")
logger.info(f"Loaded {len(skill_mds)} source SKILL.md files")
return skill_mds
@@ -731,123 +747,197 @@ This skill combines knowledge from multiple sources:
"""Generate reference files organized by source."""
logger.info("Generating reference files...")
- # Generate references for each source type
- if 'documentation' in self.scraped_data:
- self._generate_docs_references()
+ # Generate references for each source type (now lists)
+ docs_list = self.scraped_data.get('documentation', [])
+ if docs_list:
+ self._generate_docs_references(docs_list)
- if 'github' in self.scraped_data:
- self._generate_github_references()
+ github_list = self.scraped_data.get('github', [])
+ if github_list:
+ self._generate_github_references(github_list)
- if 'pdf' in self.scraped_data:
- self._generate_pdf_references()
+ pdf_list = self.scraped_data.get('pdf', [])
+ if pdf_list:
+ self._generate_pdf_references(pdf_list)
# Generate merged API reference if available
if self.merged_data:
self._generate_merged_api_reference()
- # Generate C3.x codebase analysis references if available
- github_data = self.scraped_data.get('github', {}).get('data', {})
- if github_data.get('c3_analysis'):
- self._generate_c3_analysis_references()
+ # Generate C3.x codebase analysis references if available (multi-source)
+ github_list = self.scraped_data.get('github', [])
+ for github_source in github_list:
+ github_data = github_source.get('data', {})
+ if github_data.get('c3_analysis'):
+ repo_id = github_source.get('repo_id', 'unknown')
+ self._generate_c3_analysis_references(repo_id=repo_id)
+
+ def _generate_docs_references(self, docs_list: List[Dict]):
+ """Generate references from multiple documentation sources."""
+ # Skip if no documentation sources
+ if not docs_list:
+ return
- def _generate_docs_references(self):
- """Generate references from documentation source."""
docs_dir = os.path.join(self.skill_dir, 'references', 'documentation')
os.makedirs(docs_dir, exist_ok=True)
- # Best-effort: copy docs-only reference files into unified docs references.
- # UnifiedScraper runs doc_scraper using name "{name}_docs", which creates
- # output/{name}_docs/references/*.md. Those are the most useful documentation
- # references for the unified skill.
- source_refs_dir = os.path.join('output', f"{self.name}_docs", 'references')
- copied_files: List[str] = []
+ all_copied_files: List[str] = []
- if os.path.isdir(source_refs_dir):
- for entry in sorted(os.listdir(source_refs_dir)):
- src_path = os.path.join(source_refs_dir, entry)
- dst_path = os.path.join(docs_dir, entry)
- if not os.path.isfile(src_path):
- continue
- shutil.copy2(src_path, dst_path)
- copied_files.append(entry)
+ # Process each documentation source
+ for i, doc_source in enumerate(docs_list):
+ source_id = doc_source.get('source_id', f'source_{i}')
+ base_url = doc_source.get('base_url', 'Unknown')
+ refs_dir = doc_source.get('refs_dir', '')
- # Create index
+ # Create subdirectory for this source
+ source_dir = os.path.join(docs_dir, source_id)
+ os.makedirs(source_dir, exist_ok=True)
+
+ copied_files: List[str] = []
+
+ if refs_dir and os.path.isdir(refs_dir):
+ for entry in sorted(os.listdir(refs_dir)):
+ src_path = os.path.join(refs_dir, entry)
+ dst_path = os.path.join(source_dir, entry)
+ if not os.path.isfile(src_path):
+ continue
+ shutil.copy2(src_path, dst_path)
+ copied_files.append(entry)
+
+ # Create index for this source
+ source_index_path = os.path.join(source_dir, 'index.md')
+ with open(source_index_path, 'w', encoding='utf-8') as f:
+ f.write(f"# Documentation: {source_id}\n\n")
+ f.write(f"**Source**: {base_url}\n\n")
+ f.write(f"**Pages**: {doc_source.get('total_pages', 'N/A')}\n\n")
+
+ if copied_files:
+ files_no_index = [p for p in copied_files if p.lower() != 'index.md']
+ f.write("## Files\n\n")
+ for filename in files_no_index:
+ f.write(f"- [{filename}]({filename})\n")
+ else:
+ f.write("No reference files available.\n")
+
+ all_copied_files.extend(copied_files)
+
+ # Create main index
index_path = os.path.join(docs_dir, 'index.md')
with open(index_path, 'w', encoding='utf-8') as f:
- f.write("# Documentation\n\n")
- f.write("Reference from official documentation.\n\n")
+ f.write("# Documentation References\n\n")
+ f.write(f"Combined from {len(docs_list)} documentation sources.\n\n")
- if copied_files:
- files_no_index = [p for p in copied_files if p.lower() != 'index.md']
- files_index = [p for p in copied_files if p.lower() == 'index.md']
+ f.write("## Sources\n\n")
+ for doc_source in docs_list:
+ source_id = doc_source.get('source_id', 'unknown')
+ base_url = doc_source.get('base_url', 'Unknown')
+ total_pages = doc_source.get('total_pages', 'N/A')
+ f.write(f"- [{source_id}]({source_id}/index.md) - {base_url} ({total_pages} pages)\n")
- f.write("## Files\n\n")
- for filename in files_no_index + files_index:
- f.write(f"- [{filename}]({filename})\n")
- else:
- f.write("## Notes\n\n")
- f.write(
- "No documentation reference files were copied into this unified skill. "
- "This usually means the docs-only build did not produce reference files.\n"
- )
+ logger.info(f"Created documentation references ({len(docs_list)} sources)")
- logger.info("Created documentation references")
+ def _generate_github_references(self, github_list: List[Dict]):
+ """Generate references from multiple GitHub sources."""
+ # Skip if no GitHub sources
+ if not github_list:
+ return
- def _generate_github_references(self):
- """Generate references from GitHub source."""
github_dir = os.path.join(self.skill_dir, 'references', 'github')
os.makedirs(github_dir, exist_ok=True)
- github_data = self.scraped_data['github']['data']
+ # Process each GitHub source
+ for i, github_source in enumerate(github_list):
+ repo = github_source.get('repo', f'repo_{i}')
+ repo_id = github_source.get('repo_id', repo.replace('/', '_'))
+ github_data = github_source.get('data', {})
- # Create README reference
- if github_data.get('readme'):
- readme_path = os.path.join(github_dir, 'README.md')
- with open(readme_path, 'w') as f:
- f.write("# Repository README\n\n")
- f.write(github_data['readme'])
+ # Create subdirectory for this repo
+ repo_dir = os.path.join(github_dir, repo_id)
+ os.makedirs(repo_dir, exist_ok=True)
- # Create issues reference
- if github_data.get('issues'):
- issues_path = os.path.join(github_dir, 'issues.md')
- with open(issues_path, 'w') as f:
- f.write("# GitHub Issues\n\n")
- f.write(f"{len(github_data['issues'])} recent issues.\n\n")
+ # Create README reference
+ if github_data.get('readme'):
+ readme_path = os.path.join(repo_dir, 'README.md')
+ with open(readme_path, 'w', encoding='utf-8') as f:
+ f.write(f"# Repository README: {repo}\n\n")
+ f.write(github_data['readme'])
- for issue in github_data['issues'][:20]:
- f.write(f"## #{issue['number']}: {issue['title']}\n\n")
- f.write(f"**State**: {issue['state']}\n")
- if issue.get('labels'):
- f.write(f"**Labels**: {', '.join(issue['labels'])}\n")
- f.write(f"**URL**: {issue.get('url', 'N/A')}\n\n")
+ # Create issues reference
+ if github_data.get('issues'):
+ issues_path = os.path.join(repo_dir, 'issues.md')
+ with open(issues_path, 'w', encoding='utf-8') as f:
+ f.write(f"# GitHub Issues: {repo}\n\n")
+ f.write(f"{len(github_data['issues'])} recent issues.\n\n")
- # Create releases reference
- if github_data.get('releases'):
- releases_path = os.path.join(github_dir, 'releases.md')
- with open(releases_path, 'w') as f:
- f.write("# Releases\n\n")
+ for issue in github_data['issues'][:20]:
+ f.write(f"## #{issue['number']}: {issue['title']}\n\n")
+ f.write(f"**State**: {issue['state']}\n")
+ if issue.get('labels'):
+ f.write(f"**Labels**: {', '.join(issue['labels'])}\n")
+ f.write(f"**URL**: {issue.get('url', 'N/A')}\n\n")
- for release in github_data['releases'][:10]:
- f.write(f"## {release['tag_name']}: {release.get('name', 'N/A')}\n\n")
- f.write(f"**Published**: {release.get('published_at', 'N/A')[:10]}\n\n")
- if release.get('body'):
- f.write(release['body'][:500])
- f.write("\n\n")
+ # Create releases reference
+ if github_data.get('releases'):
+ releases_path = os.path.join(repo_dir, 'releases.md')
+ with open(releases_path, 'w', encoding='utf-8') as f:
+ f.write(f"# Releases: {repo}\n\n")
- logger.info("Created GitHub references")
+ for release in github_data['releases'][:10]:
+ f.write(f"## {release['tag_name']}: {release.get('name', 'N/A')}\n\n")
+ f.write(f"**Published**: {release.get('published_at', 'N/A')[:10]}\n\n")
+ if release.get('body'):
+ f.write(release['body'][:500])
+ f.write("\n\n")
+
+ # Create index for this repo
+ repo_index_path = os.path.join(repo_dir, 'index.md')
+ repo_info = github_data.get('repo_info', {})
+ with open(repo_index_path, 'w', encoding='utf-8') as f:
+ f.write(f"# GitHub: {repo}\n\n")
+ f.write(f"**Stars**: {repo_info.get('stars', 'N/A')}\n")
+ f.write(f"**Language**: {repo_info.get('language', 'N/A')}\n")
+ f.write(f"**Issues**: {len(github_data.get('issues', []))}\n")
+ f.write(f"**Releases**: {len(github_data.get('releases', []))}\n\n")
+ f.write("## Files\n\n")
+ f.write("- [README.md](README.md)\n")
+ if github_data.get('issues'):
+ f.write("- [issues.md](issues.md)\n")
+ if github_data.get('releases'):
+ f.write("- [releases.md](releases.md)\n")
+
+ # Create main index
+ index_path = os.path.join(github_dir, 'index.md')
+ with open(index_path, 'w', encoding='utf-8') as f:
+ f.write("# GitHub References\n\n")
+ f.write(f"Combined from {len(github_list)} GitHub repositories.\n\n")
+
+ f.write("## Repositories\n\n")
+ for github_source in github_list:
+ repo = github_source.get('repo', 'unknown')
+ repo_id = github_source.get('repo_id', repo.replace('/', '_'))
+ github_data = github_source.get('data', {})
+ repo_info = github_data.get('repo_info', {})
+ stars = repo_info.get('stars', 'N/A')
+ f.write(f"- [{repo}]({repo_id}/index.md) - {stars} stars\n")
+
+ logger.info(f"Created GitHub references ({len(github_list)} repos)")
+
+ def _generate_pdf_references(self, pdf_list: List[Dict]):
+ """Generate references from PDF sources."""
+ # Skip if no PDF sources
+ if not pdf_list:
+ return
- def _generate_pdf_references(self):
- """Generate references from PDF source."""
pdf_dir = os.path.join(self.skill_dir, 'references', 'pdf')
os.makedirs(pdf_dir, exist_ok=True)
# Create index
index_path = os.path.join(pdf_dir, 'index.md')
- with open(index_path, 'w') as f:
+ with open(index_path, 'w', encoding='utf-8') as f:
f.write("# PDF Documentation\n\n")
- f.write("Reference from PDF document.\n\n")
+ f.write(f"Reference from {len(pdf_list)} PDF document(s).\n\n")
- logger.info("Created PDF references")
+ logger.info(f"Created PDF references ({len(pdf_list)} sources)")
def _generate_merged_api_reference(self):
"""Generate merged API reference file."""
@@ -869,16 +959,32 @@ This skill combines knowledge from multiple sources:
logger.info(f"Created merged API reference ({len(apis)} APIs)")
- def _generate_c3_analysis_references(self):
- """Generate codebase analysis references (C3.5)."""
- github_data = self.scraped_data.get('github', {}).get('data', {})
+ def _generate_c3_analysis_references(self, repo_id: str = 'github'):
+ """Generate codebase analysis references (C3.5) for a specific GitHub source.
+
+ Args:
+ repo_id: Repository identifier (e.g., 'encode_httpx') for multi-source support
+ """
+ # Find the correct github_source from the list
+ github_list = self.scraped_data.get('github', [])
+ github_source = None
+ for source in github_list:
+ if source.get('repo_id') == repo_id:
+ github_source = source
+ break
+
+ if not github_source:
+ logger.warning(f"GitHub source with repo_id '{repo_id}' not found")
+ return
+
+ github_data = github_source.get('data', {})
c3_data = github_data.get('c3_analysis')
if not c3_data:
return
- # Create main directory
- c3_dir = os.path.join(self.skill_dir, 'references', 'codebase_analysis')
+ # Create unique directory per repo for multi-source support
+ c3_dir = os.path.join(self.skill_dir, 'references', 'codebase_analysis', repo_id)
os.makedirs(c3_dir, exist_ok=True)
logger.info("Generating C3.x codebase analysis references...")
@@ -933,7 +1039,7 @@ This skill combines knowledge from multiple sources:
# If no languages from C3.7, try to get from GitHub data
if not languages:
- github_data = self.scraped_data.get('github', {}).get('data', {})
+ # github_data already available from method scope
if github_data.get('languages'):
# GitHub data has languages as list, convert to dict with count 1
languages = {lang: 1 for lang in github_data['languages']}
diff --git a/tests/test_llms_txt_downloader.py b/tests/test_llms_txt_downloader.py
index 3b945fc..bcdc4dc 100644
--- a/tests/test_llms_txt_downloader.py
+++ b/tests/test_llms_txt_downloader.py
@@ -168,3 +168,95 @@ def test_get_proper_filename_small():
filename = downloader.get_proper_filename()
assert filename == "llms-small.md"
+
+def test_is_markdown_rejects_html_doctype():
+    """A document that opens with an HTML DOCTYPE must not be accepted as markdown (redirect trap)."""
+    downloader = LlmsTxtDownloader("https://example.com/llms.txt")
+
+    html = '<!DOCTYPE html><html><head><title>Product Page</title></head><body>Content</body></html>'
+    assert not downloader._is_markdown(html)
+
+    # The check is expected to ignore case, so an all-caps document is rejected too
+    html_uppercase = '<!DOCTYPE HTML><HTML><BODY>Content</BODY></HTML>'
+    assert not downloader._is_markdown(html_uppercase)
+
+def test_is_markdown_rejects_html_tag():
+    """Content containing an <html> tag must not be accepted as markdown (redirect trap)."""
+    downloader = LlmsTxtDownloader("https://example.com/llms.txt")
+
+    html = '<html><body>Content</body></html>'
+    assert not downloader._is_markdown(html)
+
+    # An opening tag on its own, never closed, is enough to reject the content
+    html_partial = '<html>Some content'
+    assert not downloader._is_markdown(html_partial)
+
+def test_is_markdown_rejects_html_meta():
+    """Content containing <head> or <meta> tags must not be accepted as markdown."""
+    downloader = LlmsTxtDownloader("https://example.com/llms.txt")
+
+    html_with_head = '<head><title>Page</title></head><body>Content</body>'
+    assert not downloader._is_markdown(html_with_head)
+
+    html_with_meta = '<meta charset="utf-8">'
+    assert not downloader._is_markdown(html_with_meta)
+
+def test_is_markdown_accepts_markdown_with_html_words():
+    """Markdown that merely mentions the word 'html' must still be recognised as markdown."""
+    checker = LlmsTxtDownloader("https://example.com/llms.txt")
+
+    prose_sample = "# Guide\n\nLearn about html tags in markdown. You can write HTML inside markdown."
+    assert checker._is_markdown(prose_sample)
+
+    # A sample with real markdown structure: headings plus a fenced html block
+    fenced_sample = "# HTML Tutorial\n\n```html\nexample\n```\n\n## More content"
+    assert checker._is_markdown(fenced_sample)
+
+def test_html_detection_only_scans_first_500_chars():
+    """An HTML tag located past the first 500 characters must not cause rejection."""
+    downloader = LlmsTxtDownloader("https://example.com/llms.txt")
+
+    # Header plus 1200 characters of filler pushes the trailing <html> tag past position 500
+    safe_markdown = '# Header\n\n' + ('Valid markdown content. ' * 50) + '\n\n<html>'
+    # This should pass because <html> is beyond first 500 chars
+    if '<html>' in safe_markdown[:500]:
+        # If the HTML is within 500 chars, adjust test
+        assert not downloader._is_markdown(safe_markdown)
+    else:
+        # HTML beyond 500 chars should not trigger rejection
+        assert downloader._is_markdown(safe_markdown)
+
+def test_html_redirect_trap_scenario():
+    """Real-world scenario: llms.txt redirects to a full HTML product page, which must be rejected."""
+    downloader = LlmsTxtDownloader("https://example.com/llms.txt")
+
+    # Simulate Claude Code redirect scenario (302 to HTML page)
+    html_product_page = '''<!DOCTYPE html>
+<html>
+<head>
+    <meta charset="utf-8">
+    <title>Claude Code - Product Page</title>
+</head>
+<body>
+    <h1>Claude Code</h1>
+    <p>Product information...</p>
+</body>
+</html>'''
+
+    # Should reject this HTML even though it has an <h1> tag (looks like markdown "# ")
+    assert not downloader._is_markdown(html_product_page)
+
+def test_download_rejects_html_redirect():
+    """download() must return None when the server answers with HTML instead of markdown."""
+    downloader = LlmsTxtDownloader("https://example.com/llms.txt")
+
+    mock_response = Mock()
+    # Simulate server returning HTML instead of markdown
+    mock_response.text = ('<!DOCTYPE html><html><body>'
+                          '<h1>Product Page</h1></body></html>')
+    mock_response.raise_for_status = Mock()
+
+    with patch('requests.get', return_value=mock_response):
+        content = downloader.download()
+
+    # Should return None (rejected as non-markdown)
+    assert content is None
diff --git a/tests/test_markdown_parsing.py b/tests/test_markdown_parsing.py
new file mode 100644
index 0000000..9917225
--- /dev/null
+++ b/tests/test_markdown_parsing.py
@@ -0,0 +1,359 @@
+"""
+Tests for Markdown parsing and BFS URL crawling features.
+
+Tests the following functionality:
+1. Markdown file content extraction (_extract_markdown_content)
+2. HTML fallback when .md URL returns HTML (_extract_html_as_markdown)
+3. URL extraction from llms.txt (extract_urls, _clean_url)
+4. Empty/short content filtering in save_page
+"""
+
+import unittest
+import tempfile
+import os
+import shutil
+
+
+class TestMarkdownContentExtraction(unittest.TestCase):
+    """Exercise DocToSkillConverter._extract_markdown_content on small markdown samples."""
+
+    def setUp(self):
+        """Create a converter from a minimal config (empty selectors, URL patterns, categories)."""
+        from skill_seekers.cli.doc_scraper import DocToSkillConverter
+
+        self.config = {
+            'name': 'test_md_parsing',
+            'base_url': 'https://example.com',
+            'selectors': {},
+            'url_patterns': {'include': [], 'exclude': []},
+            'categories': {}
+        }
+        self.converter = DocToSkillConverter(self.config)
+
+    def tearDown(self):
+        """Remove the output/<name>_data directory for this config if it exists."""
+        output_dir = f"output/{self.config['name']}_data"
+        if os.path.exists(output_dir):
+            shutil.rmtree(output_dir)
+
+    def test_extract_title_from_h1(self):
+        """The first '# ' heading becomes the page title."""
+        content = "# My Documentation Title\n\nSome content here."
+        result = self.converter._extract_markdown_content(content, "https://example.com/test.md")
+        self.assertEqual(result['title'], "My Documentation Title")
+
+    def test_extract_headings_h2_to_h6(self):
+        """Headings h2-h6 are collected; the h1 is not counted among them."""
+        content = """# Title
+
+## Section One
+### Subsection A
+#### Deep Section
+##### Deeper
+###### Deepest
+
+Content here.
+"""
+        result = self.converter._extract_markdown_content(content, "https://example.com/test.md")
+        # Should have 5 headings (h2-h6), not h1
+        self.assertEqual(len(result['headings']), 5)
+        self.assertEqual(result['headings'][0]['level'], 'h2')
+        self.assertEqual(result['headings'][0]['text'], 'Section One')
+
+    def test_extract_code_blocks_with_language(self):
+        """Fenced blocks keep their language tag; an untagged fence is reported as 'unknown'."""
+        content = """# API Guide
+
+```python
+def hello():
+    return "Hello, World!"
+```
+
+Some explanation.
+
+```javascript
+const greet = () => console.log("Hi");
+```
+
+```
+plain code without language
+```
+"""
+        result = self.converter._extract_markdown_content(content, "https://example.com/test.md")
+        self.assertEqual(len(result['code_samples']), 3)
+        self.assertEqual(result['code_samples'][0]['language'], 'python')
+        self.assertEqual(result['code_samples'][1]['language'], 'javascript')
+        self.assertEqual(result['code_samples'][2]['language'], 'unknown')
+
+    def test_extract_markdown_links_only_md_files(self):
+        """Every extracted link must point at a .md resource."""
+        content = """# Links
+
+- [Markdown Doc](./guide.md)
+- [Another MD](https://example.com/api.md)
+- [HTML Page](./page.html)
+- [External](https://google.com)
+"""
+        result = self.converter._extract_markdown_content(content, "https://example.com/docs/test.md")
+        # Should only include .md links
+        md_links = [link for link in result['links'] if '.md' in link]
+        self.assertEqual(len(md_links), len(result['links']))
+
+    def test_extract_content_paragraphs(self):
+        """Long paragraphs are kept in 'content'; the very short one is filtered out."""
+        content = """# Title
+
+This is a paragraph with enough content to pass the minimum length filter.
+
+Short.
+
+Another paragraph that should be included in the final content output.
+"""
+        result = self.converter._extract_markdown_content(content, "https://example.com/test.md")
+        self.assertIn("paragraph with enough content", result['content'])
+        self.assertNotIn("Short.", result['content'])
+
+    def test_detect_html_in_md_url(self):
+        """An HTML payload served from a .md URL is still parsed: the title comes from <title>."""
+        html_content = ("<!DOCTYPE html><html><head><title>Page</title></head>"
+                        "<body><h1>Hello</h1></body></html>")
+        result = self.converter._extract_markdown_content(html_content, "https://example.com/test.md")
+        self.assertEqual(result['title'], "Page")
+
+
+class TestHtmlAsMarkdownExtraction(unittest.TestCase):
+    """Exercise DocToSkillConverter._extract_html_as_markdown on small HTML documents."""
+
+    def setUp(self):
+        """Create a converter from a minimal config (empty selectors, URL patterns, categories)."""
+        from skill_seekers.cli.doc_scraper import DocToSkillConverter
+
+        self.config = {
+            'name': 'test_html_fallback',
+            'base_url': 'https://example.com',
+            'selectors': {},
+            'url_patterns': {'include': [], 'exclude': []},
+            'categories': {}
+        }
+        self.converter = DocToSkillConverter(self.config)
+
+    def tearDown(self):
+        """Remove the output/<name>_data directory for this config if it exists."""
+        output_dir = f"output/{self.config['name']}_data"
+        if os.path.exists(output_dir):
+            shutil.rmtree(output_dir)
+
+    def test_extract_title_from_html(self):
+        """The page title is read from the HTML <title> tag."""
+        html = "<html><head><title>My Page Title</title></head><body></body></html>"
+        result = self.converter._extract_html_as_markdown(html, "https://example.com/test.md")
+        self.assertEqual(result['title'], "My Page Title")
+
+    def test_find_main_content_area(self):
+        """Paragraph text inside a <main> element ends up in the extracted content."""
+        html = """
+        <html>
+        <body>
+        <main>
+            <h1>Main Content</h1>
+            <p>This is the main content area with enough text to pass filters.</p>
+        </main>
+        </body>
+        </html>
+        """
+        result = self.converter._extract_html_as_markdown(html, "https://example.com/test.md")
+        self.assertIn("main content area", result['content'].lower())
+
+    def test_extract_code_blocks_from_html(self):
+        """Code inside <pre>/<code> produces at least one code sample."""
+        html = """
+        <html>
+        <body>
+            <pre><code class="language-python">print("hello")</code></pre>
+        </body>
+        </html>
+        """
+        result = self.converter._extract_html_as_markdown(html, "https://example.com/test.md")
+        self.assertTrue(len(result['code_samples']) > 0)
+
+    def test_fallback_to_body_when_no_main(self):
+        """Without a <main>/<article> container, headings or content are still taken from <body>."""
+        html = """
+        <html>
+        <body>
+            <h2>Section</h2>
+            <p>Content in body without main element, long enough to pass filter.</p>
+        </body>
+        </html>
+        """
+        result = self.converter._extract_html_as_markdown(html, "https://example.com/test.md")
+        self.assertTrue(len(result['headings']) > 0 or len(result['content']) > 0)
+
+
+class TestLlmsTxtUrlExtraction(unittest.TestCase):
+    """URL discovery (extract_urls) and URL clean-up (_clean_url) of LlmsTxtParser."""
+
+    def test_extract_markdown_style_links(self):
+        """Links written as [text](url) are returned by extract_urls()."""
+        from skill_seekers.cli.llms_txt_parser import LlmsTxtParser
+
+        llms_text = """
+# Documentation Index
+
+- [Getting Started](https://docs.example.com/start.md)
+- [API Reference](https://docs.example.com/api/index.md)
+- [Advanced Guide](https://docs.example.com/advanced.md)
+"""
+        llms_parser = LlmsTxtParser(llms_text, base_url="https://docs.example.com")
+        found = llms_parser.extract_urls()
+
+        self.assertIn("https://docs.example.com/start.md", found)
+        self.assertIn("https://docs.example.com/api/index.md", found)
+        self.assertIn("https://docs.example.com/advanced.md", found)
+
+    def test_extract_bare_urls(self):
+        """URLs that appear as plain text, outside markdown link syntax, are returned too."""
+        from skill_seekers.cli.llms_txt_parser import LlmsTxtParser
+
+        llms_text = """
+Documentation: https://example.com/docs/guide.md
+API: https://example.com/api/reference.md
+"""
+        llms_parser = LlmsTxtParser(llms_text)
+        found = llms_parser.extract_urls()
+
+        self.assertIn("https://example.com/docs/guide.md", found)
+        self.assertIn("https://example.com/api/reference.md", found)
+
+    def test_resolve_relative_urls(self):
+        """Relative link targets still yield a URL containing their path when base_url is given."""
+        from skill_seekers.cli.llms_txt_parser import LlmsTxtParser
+
+        llms_text = """
+- [Local Doc](./docs/guide.md)
+- [Parent](../api/ref.md)
+"""
+        llms_parser = LlmsTxtParser(llms_text, base_url="https://example.com/learn/")
+        found = llms_parser.extract_urls()
+
+        # Only the ./docs/guide.md target is checked here
+        self.assertTrue(any("docs/guide.md" in candidate for candidate in found))
+
+    def test_clean_url_invalid_anchor_pattern(self):
+        """A fragment that is followed by a path is cut off together with that path."""
+        from skill_seekers.cli.llms_txt_parser import LlmsTxtParser
+
+        llms_parser = LlmsTxtParser("", base_url="https://example.com")
+
+        # Invalid: path after anchor
+        cleaned = llms_parser._clean_url("https://example.com/page#section/index.html.md")
+        self.assertEqual(cleaned, "https://example.com/page")
+
+    def test_clean_url_valid_anchor(self):
+        """A plain '#section' fragment is left intact."""
+        from skill_seekers.cli.llms_txt_parser import LlmsTxtParser
+
+        llms_parser = LlmsTxtParser("", base_url="https://example.com")
+
+        # Valid anchor should be unchanged
+        cleaned = llms_parser._clean_url("https://example.com/page.md#section")
+        self.assertEqual(cleaned, "https://example.com/page.md#section")
+
+    def test_clean_url_no_anchor(self):
+        """A URL with no fragment comes back exactly as given."""
+        from skill_seekers.cli.llms_txt_parser import LlmsTxtParser
+
+        llms_parser = LlmsTxtParser("", base_url="https://example.com")
+
+        cleaned = llms_parser._clean_url("https://example.com/docs/guide.md")
+        self.assertEqual(cleaned, "https://example.com/docs/guide.md")
+
+    def test_deduplicate_urls(self):
+        """The same URL listed three times is returned exactly once."""
+        from skill_seekers.cli.llms_txt_parser import LlmsTxtParser
+
+        llms_text = """
+- [Doc 1](https://example.com/doc.md)
+- [Doc 2](https://example.com/doc.md)
+https://example.com/doc.md
+"""
+        llms_parser = LlmsTxtParser(llms_text)
+        found = llms_parser.extract_urls()
+
+        # Should only have one instance
+        occurrences = sum(1 for candidate in found if candidate == "https://example.com/doc.md")
+        self.assertEqual(occurrences, 1)
+
+
+class TestSavePageContentFiltering(unittest.TestCase):
+    """Checks which pages save_page() writes out, depending on content length."""
+
+    def setUp(self):
+        """Create a DocToSkillConverter from a minimal configuration."""
+        from skill_seekers.cli.doc_scraper import DocToSkillConverter
+
+        self.config = {
+            "name": "test_save_filter",
+            "base_url": "https://example.com",
+            "selectors": {},
+            "url_patterns": {"include": [], "exclude": []},
+            "categories": {}
+        }
+        self.converter = DocToSkillConverter(self.config)
+
+    def tearDown(self):
+        """Delete the output/<name>_data directory for this config when it is present."""
+        leftover_dir = f"output/{self.config['name']}_data"
+        if os.path.exists(leftover_dir):
+            shutil.rmtree(leftover_dir)
+
+    def test_skip_empty_content(self):
+        """A page whose content is the empty string leaves the pages directory empty."""
+        empty_page = {
+            "url": "https://example.com/empty",
+            "title": "Empty Page",
+            "content": "",
+            "headings": [],
+            "code_samples": []
+        }
+
+        self.converter.save_page(empty_page)
+
+        saved_dir = os.path.join(self.converter.data_dir, "pages")
+        if os.path.exists(saved_dir):
+            self.assertEqual(len(os.listdir(saved_dir)), 0)
+
+    def test_skip_short_content_under_50_chars(self):
+        """A page with fewer than 50 characters of content leaves the pages directory empty."""
+        short_page = {
+            "url": "https://example.com/short",
+            "title": "Short",
+            "content": "This is too short.",  # 18 chars
+            "headings": [],
+            "code_samples": []
+        }
+
+        self.converter.save_page(short_page)
+
+        saved_dir = os.path.join(self.converter.data_dir, "pages")
+        if os.path.exists(saved_dir):
+            self.assertEqual(len(os.listdir(saved_dir)), 0)
+
+    def test_save_content_over_50_chars(self):
+        """A page with 60 characters of content results in exactly one saved file."""
+        valid_page = {
+            "url": "https://example.com/valid",
+            "title": "Valid Page",
+            "content": "A" * 60,  # 60 chars, should pass
+            "headings": [],
+            "code_samples": []
+        }
+
+        self.converter.save_page(valid_page)
+
+        saved_dir = os.path.join(self.converter.data_dir, "pages")
+        self.assertTrue(os.path.exists(saved_dir))
+        self.assertEqual(len(os.listdir(saved_dir)), 1)
+
+
+if __name__ == "__main__":  # run the suite when executed as a script
+    unittest.main()
diff --git a/tests/test_multi_source.py b/tests/test_multi_source.py
new file mode 100644
index 0000000..80644d2
--- /dev/null
+++ b/tests/test_multi_source.py
@@ -0,0 +1,433 @@
+"""
+Tests for multi-source support in unified scraper and skill builder.
+
+Tests the following functionality:
+1. Multiple sources of same type in unified_scraper (list structure)
+2. Source counters and unique naming
+3. Per-source reference directory generation in unified_skill_builder
+4. Multiple documentation sources handling
+5. Multiple GitHub repositories handling
+"""
+
+import unittest
+import tempfile
+import os
+import shutil
+
+
+class TestUnifiedScraperDataStructure(unittest.TestCase):
+    """Initial shape of UnifiedScraper.scraped_data and of its per-type source counters."""
+
+    def test_scraped_data_uses_list_structure(self):
+        """Every source type in scraped_data is stored as a list."""
+        from skill_seekers.cli.unified_scraper import UnifiedScraper
+
+        scraper_config = {
+            "name": "test_multi",
+            "description": "Test skill",
+            "sources": [
+                {"type": "documentation", "base_url": "https://example.com"}
+            ]
+        }
+
+        with tempfile.TemporaryDirectory() as workdir:
+            previous_cwd = os.getcwd()
+            try:
+                os.chdir(workdir)
+                unified = UnifiedScraper(scraper_config)
+
+                self.assertIsInstance(unified.scraped_data["documentation"], list)
+                self.assertIsInstance(unified.scraped_data["github"], list)
+                self.assertIsInstance(unified.scraped_data["pdf"], list)
+            finally:
+                os.chdir(previous_cwd)
+
+    def test_source_counters_initialized_to_zero(self):
+        """Each per-type entry of _source_counters is 0 right after construction."""
+        from skill_seekers.cli.unified_scraper import UnifiedScraper
+
+        scraper_config = {
+            "name": "test_counters",
+            "description": "Test skill",
+            "sources": [
+                {"type": "documentation", "base_url": "https://example.com"}
+            ]
+        }
+
+        with tempfile.TemporaryDirectory() as workdir:
+            previous_cwd = os.getcwd()
+            try:
+                os.chdir(workdir)
+                unified = UnifiedScraper(scraper_config)
+
+                self.assertEqual(unified._source_counters["documentation"], 0)
+                self.assertEqual(unified._source_counters["github"], 0)
+                self.assertEqual(unified._source_counters["pdf"], 0)
+            finally:
+                os.chdir(previous_cwd)
+
+    def test_empty_lists_initially(self):
+        """Each per-type list in scraped_data has length 0 right after construction."""
+        from skill_seekers.cli.unified_scraper import UnifiedScraper
+
+        scraper_config = {
+            "name": "test_empty",
+            "description": "Test skill",
+            "sources": [
+                {"type": "documentation", "base_url": "https://example.com"}
+            ]
+        }
+
+        with tempfile.TemporaryDirectory() as workdir:
+            previous_cwd = os.getcwd()
+            try:
+                os.chdir(workdir)
+                unified = UnifiedScraper(scraper_config)
+
+                self.assertEqual(len(unified.scraped_data["documentation"]), 0)
+                self.assertEqual(len(unified.scraped_data["github"]), 0)
+                self.assertEqual(len(unified.scraped_data["pdf"]), 0)
+            finally:
+                os.chdir(previous_cwd)
+
+
+class TestUnifiedSkillBuilderDocsReferences(unittest.TestCase):
+    """Output of UnifiedSkillBuilder._generate_docs_references for documentation sources."""
+
+    def setUp(self):
+        """Create a scratch directory and make it the working directory."""
+        self.temp_dir = tempfile.mkdtemp()
+        self.original_dir = os.getcwd()
+        os.chdir(self.temp_dir)
+
+    def tearDown(self):
+        """Go back to the original working directory and delete the scratch directory."""
+        os.chdir(self.original_dir)
+        if os.path.exists(self.temp_dir):
+            shutil.rmtree(self.temp_dir)
+
+    def test_creates_subdirectory_per_source(self):
+        """Two documentation sources produce two subdirectories named after their source_id."""
+        from skill_seekers.cli.unified_skill_builder import UnifiedSkillBuilder
+
+        # Empty refs directories standing in for scraped reference output
+        first_refs = os.path.join(self.temp_dir, "refs1")
+        second_refs = os.path.join(self.temp_dir, "refs2")
+        os.makedirs(first_refs)
+        os.makedirs(second_refs)
+
+        builder_config = {
+            "name": "test_docs_refs",
+            "description": "Test",
+            "sources": []
+        }
+
+        scraped = {
+            "documentation": [
+                {"source_id": "source_a", "base_url": "https://a.com", "total_pages": 5, "refs_dir": first_refs},
+                {"source_id": "source_b", "base_url": "https://b.com", "total_pages": 3, "refs_dir": second_refs}
+            ],
+            "github": [],
+            "pdf": []
+        }
+
+        skill_builder = UnifiedSkillBuilder(builder_config, scraped)
+        skill_builder._generate_docs_references(scraped["documentation"])
+
+        docs_root = os.path.join(skill_builder.skill_dir, "references", "documentation")
+        self.assertTrue(os.path.exists(os.path.join(docs_root, "source_a")))
+        self.assertTrue(os.path.exists(os.path.join(docs_root, "source_b")))
+
+    def test_creates_index_per_source(self):
+        """The source subdirectory gets an index.md naming the source id and its base URL."""
+        from skill_seekers.cli.unified_skill_builder import UnifiedSkillBuilder
+
+        only_refs = os.path.join(self.temp_dir, "refs")
+        os.makedirs(only_refs)
+
+        builder_config = {
+            "name": "test_source_index",
+            "description": "Test",
+            "sources": []
+        }
+
+        scraped = {
+            "documentation": [
+                {"source_id": "my_source", "base_url": "https://example.com", "total_pages": 10, "refs_dir": only_refs}
+            ],
+            "github": [],
+            "pdf": []
+        }
+
+        skill_builder = UnifiedSkillBuilder(builder_config, scraped)
+        skill_builder._generate_docs_references(scraped["documentation"])
+
+        index_path = os.path.join(skill_builder.skill_dir, "references", "documentation", "my_source", "index.md")
+        self.assertTrue(os.path.exists(index_path))
+
+        with open(index_path, "r") as handle:
+            index_text = handle.read()
+        self.assertIn("my_source", index_text)
+        self.assertIn("https://example.com", index_text)
+
+    def test_creates_main_index_listing_all_sources(self):
+        """The top-level documentation index.md names every source and states the source count."""
+        from skill_seekers.cli.unified_skill_builder import UnifiedSkillBuilder
+
+        first_refs = os.path.join(self.temp_dir, "refs1")
+        second_refs = os.path.join(self.temp_dir, "refs2")
+        os.makedirs(first_refs)
+        os.makedirs(second_refs)
+
+        builder_config = {
+            "name": "test_main_index",
+            "description": "Test",
+            "sources": []
+        }
+
+        scraped = {
+            "documentation": [
+                {"source_id": "docs_one", "base_url": "https://one.com", "total_pages": 10, "refs_dir": first_refs},
+                {"source_id": "docs_two", "base_url": "https://two.com", "total_pages": 20, "refs_dir": second_refs}
+            ],
+            "github": [],
+            "pdf": []
+        }
+
+        skill_builder = UnifiedSkillBuilder(builder_config, scraped)
+        skill_builder._generate_docs_references(scraped["documentation"])
+
+        index_path = os.path.join(skill_builder.skill_dir, "references", "documentation", "index.md")
+        self.assertTrue(os.path.exists(index_path))
+
+        with open(index_path, "r") as handle:
+            index_text = handle.read()
+        self.assertIn("docs_one", index_text)
+        self.assertIn("docs_two", index_text)
+        self.assertIn("2 documentation sources", index_text)
+
+    def test_copies_reference_files_to_source_dir(self):
+        """Files found in a source's refs_dir show up in that source's subdirectory."""
+        from skill_seekers.cli.unified_skill_builder import UnifiedSkillBuilder
+
+        only_refs = os.path.join(self.temp_dir, "refs")
+        os.makedirs(only_refs)
+
+        # Two small reference files for the builder to pick up
+        with open(os.path.join(only_refs, "api.md"), "w") as handle:
+            handle.write("# API Reference")
+        with open(os.path.join(only_refs, "guide.md"), "w") as handle:
+            handle.write("# User Guide")
+
+        builder_config = {
+            "name": "test_copy_refs",
+            "description": "Test",
+            "sources": []
+        }
+
+        scraped = {
+            "documentation": [
+                {"source_id": "test_source", "base_url": "https://test.com", "total_pages": 5, "refs_dir": only_refs}
+            ],
+            "github": [],
+            "pdf": []
+        }
+
+        skill_builder = UnifiedSkillBuilder(builder_config, scraped)
+        skill_builder._generate_docs_references(scraped["documentation"])
+
+        copied_dir = os.path.join(skill_builder.skill_dir, "references", "documentation", "test_source")
+        self.assertTrue(os.path.exists(os.path.join(copied_dir, "api.md")))
+        self.assertTrue(os.path.exists(os.path.join(copied_dir, "guide.md")))
+
+
+class TestUnifiedSkillBuilderGitHubReferences(unittest.TestCase):
+    """Output of UnifiedSkillBuilder._generate_github_references for GitHub repositories."""
+
+    def setUp(self):
+        """Create a scratch directory and make it the working directory."""
+        self.temp_dir = tempfile.mkdtemp()
+        self.original_dir = os.getcwd()
+        os.chdir(self.temp_dir)
+
+    def tearDown(self):
+        """Go back to the original working directory and delete the scratch directory."""
+        os.chdir(self.original_dir)
+        if os.path.exists(self.temp_dir):
+            shutil.rmtree(self.temp_dir)
+
+    def test_creates_subdirectory_per_repo(self):
+        """Two repositories produce two subdirectories named after their repo_id."""
+        from skill_seekers.cli.unified_skill_builder import UnifiedSkillBuilder
+
+        builder_config = {
+            "name": "test_github_refs",
+            "description": "Test",
+            "sources": []
+        }
+
+        scraped = {
+            "documentation": [],
+            "github": [
+                {"repo": "org/repo1", "repo_id": "org_repo1", "data": {"readme": "# Repo 1", "issues": [], "releases": [], "repo_info": {}}},
+                {"repo": "org/repo2", "repo_id": "org_repo2", "data": {"readme": "# Repo 2", "issues": [], "releases": [], "repo_info": {}}}
+            ],
+            "pdf": []
+        }
+
+        skill_builder = UnifiedSkillBuilder(builder_config, scraped)
+        skill_builder._generate_github_references(scraped["github"])
+
+        github_root = os.path.join(skill_builder.skill_dir, "references", "github")
+        self.assertTrue(os.path.exists(os.path.join(github_root, "org_repo1")))
+        self.assertTrue(os.path.exists(os.path.join(github_root, "org_repo2")))
+
+    def test_creates_readme_per_repo(self):
+        """The repo subdirectory gets a README.md that mentions the repository name."""
+        from skill_seekers.cli.unified_skill_builder import UnifiedSkillBuilder
+
+        builder_config = {
+            "name": "test_readme",
+            "description": "Test",
+            "sources": []
+        }
+
+        scraped = {
+            "documentation": [],
+            "github": [
+                {"repo": "test/myrepo", "repo_id": "test_myrepo", "data": {"readme": "# My Repository\n\nDescription here.", "issues": [], "releases": [], "repo_info": {}}}
+            ],
+            "pdf": []
+        }
+
+        skill_builder = UnifiedSkillBuilder(builder_config, scraped)
+        skill_builder._generate_github_references(scraped["github"])
+
+        readme_path = os.path.join(skill_builder.skill_dir, "references", "github", "test_myrepo", "README.md")
+        self.assertTrue(os.path.exists(readme_path))
+
+        with open(readme_path, "r") as handle:
+            readme_text = handle.read()
+        self.assertIn("test/myrepo", readme_text)
+
+    def test_creates_issues_file_when_issues_exist(self):
+        """A repository with issues gets an issues.md containing each issue title."""
+        from skill_seekers.cli.unified_skill_builder import UnifiedSkillBuilder
+
+        builder_config = {
+            "name": "test_issues",
+            "description": "Test",
+            "sources": []
+        }
+
+        scraped = {
+            "documentation": [],
+            "github": [
+                {
+                    "repo": "test/repo",
+                    "repo_id": "test_repo",
+                    "data": {
+                        "readme": "# Repo",
+                        "issues": [
+                            {"number": 1, "title": "Bug report", "state": "open", "labels": ["bug"], "url": "https://github.com/test/repo/issues/1"},
+                            {"number": 2, "title": "Feature request", "state": "closed", "labels": ["enhancement"], "url": "https://github.com/test/repo/issues/2"}
+                        ],
+                        "releases": [],
+                        "repo_info": {}
+                    }
+                }
+            ],
+            "pdf": []
+        }
+
+        skill_builder = UnifiedSkillBuilder(builder_config, scraped)
+        skill_builder._generate_github_references(scraped["github"])
+
+        issues_path = os.path.join(skill_builder.skill_dir, "references", "github", "test_repo", "issues.md")
+        self.assertTrue(os.path.exists(issues_path))
+
+        with open(issues_path, "r") as handle:
+            issues_text = handle.read()
+        self.assertIn("Bug report", issues_text)
+        self.assertIn("Feature request", issues_text)
+
+    def test_creates_main_index_listing_all_repos(self):
+        """The top-level github index.md names every repository and states the repository count."""
+        from skill_seekers.cli.unified_skill_builder import UnifiedSkillBuilder
+
+        builder_config = {
+            "name": "test_github_index",
+            "description": "Test",
+            "sources": []
+        }
+
+        scraped = {
+            "documentation": [],
+            "github": [
+                {"repo": "org/first", "repo_id": "org_first", "data": {"readme": "#", "issues": [], "releases": [], "repo_info": {"stars": 100}}},
+                {"repo": "org/second", "repo_id": "org_second", "data": {"readme": "#", "issues": [], "releases": [], "repo_info": {"stars": 50}}}
+            ],
+            "pdf": []
+        }
+
+        skill_builder = UnifiedSkillBuilder(builder_config, scraped)
+        skill_builder._generate_github_references(scraped["github"])
+
+        index_path = os.path.join(skill_builder.skill_dir, "references", "github", "index.md")
+        self.assertTrue(os.path.exists(index_path))
+
+        with open(index_path, "r") as handle:
+            index_text = handle.read()
+        self.assertIn("org/first", index_text)
+        self.assertIn("org/second", index_text)
+        self.assertIn("2 GitHub repositories", index_text)
+
+
+class TestUnifiedSkillBuilderPdfReferences(unittest.TestCase):
+    """Output of UnifiedSkillBuilder._generate_pdf_references for PDF sources."""
+
+    def setUp(self):
+        """Create a scratch directory and make it the working directory."""
+        self.temp_dir = tempfile.mkdtemp()
+        self.original_dir = os.getcwd()
+        os.chdir(self.temp_dir)
+
+    def tearDown(self):
+        """Go back to the original working directory and delete the scratch directory."""
+        os.chdir(self.original_dir)
+        if os.path.exists(self.temp_dir):
+            shutil.rmtree(self.temp_dir)
+
+    def test_creates_pdf_index_with_count(self):
+        """Three PDF entries yield a pdf/index.md that reports three documents."""
+        from skill_seekers.cli.unified_skill_builder import UnifiedSkillBuilder
+
+        builder_config = {
+            "name": "test_pdf",
+            "description": "Test",
+            "sources": []
+        }
+
+        scraped = {
+            "documentation": [],
+            "github": [],
+            "pdf": [
+                {"path": "/path/to/doc1.pdf"},
+                {"path": "/path/to/doc2.pdf"},
+                {"path": "/path/to/doc3.pdf"}
+            ]
+        }
+
+        skill_builder = UnifiedSkillBuilder(builder_config, scraped)
+        skill_builder._generate_pdf_references(scraped["pdf"])
+
+        index_path = os.path.join(skill_builder.skill_dir, "references", "pdf", "index.md")
+        self.assertTrue(os.path.exists(index_path))
+
+        with open(index_path, "r") as handle:
+            index_text = handle.read()
+        self.assertIn("3 PDF document", index_text)
+
+
+if __name__ == "__main__":  # run the suite when executed as a script
+    unittest.main()