Files
skill-seekers-reference/src/skill_seekers/cli/utils.py
yusyus b636a0a292 fix: resolve issue #299 and Phase 1 cleanup
- Fix #299: rename --chunk-size/--chunk-overlap to --streaming-chunk-size/
  --streaming-overlap in arguments/package.py to avoid collision with the
  RAG --chunk-size flag from arguments/common.py
- Phase 1a: make package_skill.py import args via add_package_arguments()
  instead of a 105-line inline duplicate argparse block; fixes the root
  cause of _reconstruct_argv() passing unrecognised flag names
- Phase 1b: centralise setup_logging() into utils.py and remove 4
  duplicate module-level logging.basicConfig() calls from doc_scraper.py,
  github_scraper.py, codebase_scraper.py, and unified_scraper.py
- Fix test_package_structure.py / test_cli_paths.py version strings
  (3.1.1 → 3.1.2)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 21:22:05 +03:00

449 lines
14 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Utility functions for Skill Seeker CLI tools
"""
import logging
import os
import platform
import subprocess
import time
from collections.abc import Callable
from pathlib import Path
from typing import TypeVar
logger = logging.getLogger(__name__)
T = TypeVar("T")
def setup_logging(verbose: bool = False, quiet: bool = False) -> None:
    """Configure the root logger's level from CLI verbosity flags.

    Args:
        verbose: Enable DEBUG level logging
        quiet: Enable WARNING level logging only (suppress INFO)
    """
    # quiet takes precedence over verbose when both flags are set.
    if quiet:
        chosen_level = logging.WARNING
    else:
        chosen_level = logging.DEBUG if verbose else logging.INFO
    # force=True replaces any handlers installed by earlier basicConfig calls.
    logging.basicConfig(level=chosen_level, format="%(message)s", force=True)
def open_folder(folder_path: str | Path) -> bool:
"""
Open a folder in the system file browser
Args:
folder_path: Path to folder to open
Returns:
bool: True if successful, False otherwise
"""
folder_path = Path(folder_path).resolve()
if not folder_path.exists():
print(f"⚠️ Folder not found: {folder_path}")
return False
system = platform.system()
try:
if system == "Linux":
# Try xdg-open first (standard)
subprocess.run(["xdg-open", str(folder_path)], check=True)
elif system == "Darwin": # macOS
subprocess.run(["open", str(folder_path)], check=True)
elif system == "Windows":
subprocess.run(["explorer", str(folder_path)], check=True)
else:
print(f"⚠️ Unknown operating system: {system}")
return False
return True
except subprocess.CalledProcessError:
print("⚠️ Could not open folder automatically")
return False
except FileNotFoundError:
print("⚠️ File browser not found on system")
return False
def has_api_key() -> bool:
    """
    Check whether ANTHROPIC_API_KEY is present (non-blank) in the environment

    Returns:
        bool: True if API key is set, False otherwise
    """
    # Whitespace-only values count as unset.
    return bool(os.environ.get("ANTHROPIC_API_KEY", "").strip())
def get_api_key() -> str | None:
"""
Get ANTHROPIC_API_KEY from environment
Returns:
str: API key or None if not set
"""
api_key = os.environ.get("ANTHROPIC_API_KEY", "").strip()
return api_key if api_key else None
def get_upload_url() -> str:
    """
    Get the Claude skills upload URL

    Returns:
        str: Claude skills upload URL
    """
    # Single source of truth for the upload destination used across the CLI.
    upload_url = "https://claude.ai/skills"
    return upload_url
def print_upload_instructions(zip_path: str | Path) -> None:
    """
    Print clear upload instructions for manual upload

    Args:
        zip_path: Path to the .zip file to upload
    """
    zip_file = Path(zip_path)
    url = get_upload_url()
    # Emit the banner and numbered steps line by line, exactly as before.
    lines = [
        "",
        "╔══════════════════════════════════════════════════════════╗",
        "║ NEXT STEP ║",
        "╚══════════════════════════════════════════════════════════╝",
        "",
        f"📤 Upload to Claude: {url}",
        "",
        f"1. Go to {url}",
        '2. Click "Upload Skill"',
        f"3. Select: {zip_file}",
        "4. Done! ✅",
        "",
    ]
    for line in lines:
        print(line)
def format_file_size(size_bytes: int) -> str:
    """
    Format file size in human-readable format

    Args:
        size_bytes: Size in bytes

    Returns:
        str: Formatted size (e.g., "45.3 KB")
    """
    kib = 1024
    mib = kib * 1024
    # Check the largest unit first; fall through to raw bytes.
    if size_bytes >= mib:
        return f"{size_bytes / mib:.1f} MB"
    if size_bytes >= kib:
        return f"{size_bytes / kib:.1f} KB"
    return f"{size_bytes} bytes"
def validate_skill_directory(skill_dir: str | Path) -> tuple[bool, str | None]:
"""
Validate that a directory is a valid skill directory
Args:
skill_dir: Path to skill directory
Returns:
tuple: (is_valid, error_message)
"""
skill_path = Path(skill_dir)
if not skill_path.exists():
return False, f"Directory not found: {skill_dir}"
if not skill_path.is_dir():
return False, f"Not a directory: {skill_dir}"
skill_md = skill_path / "SKILL.md"
if not skill_md.exists():
return False, f"SKILL.md not found in {skill_dir}"
return True, None
def validate_zip_file(zip_path: str | Path) -> tuple[bool, str | None]:
"""
Validate that a file is a valid skill .zip file
Args:
zip_path: Path to .zip file
Returns:
tuple: (is_valid, error_message)
"""
zip_path = Path(zip_path)
if not zip_path.exists():
return False, f"File not found: {zip_path}"
if not zip_path.is_file():
return False, f"Not a file: {zip_path}"
if zip_path.suffix != ".zip":
return False, f"Not a .zip file: {zip_path}"
return True, None
def read_reference_files(
skill_dir: str | Path, max_chars: int = 100000, preview_limit: int = 40000
) -> dict[str, dict]:
"""Read reference files from a skill directory with enriched metadata.
This function reads markdown files from the references/ subdirectory
of a skill, applying both per-file and total content limits.
Returns enriched metadata including source type, confidence, and path.
Args:
skill_dir (str or Path): Path to skill directory
max_chars (int): Maximum total characters to read (default: 100000)
preview_limit (int): Maximum characters per file (default: 40000)
Returns:
dict: Dictionary mapping filename to metadata dict with keys:
- 'content': File content
- 'source': Source type (documentation/github/pdf/api/codebase_analysis)
- 'confidence': Confidence level (high/medium/low)
- 'path': Relative path from references directory
- 'repo_id': Repository identifier for multi-source (e.g., 'encode_httpx'), None for single-source
Example:
>>> refs = read_reference_files('output/react/', max_chars=50000)
>>> refs['documentation/api.md']['source']
'documentation'
>>> refs['documentation/api.md']['confidence']
'high'
"""
from pathlib import Path
skill_path = Path(skill_dir)
references_dir = skill_path / "references"
references: dict[str, dict] = {}
if not references_dir.exists():
print(f"⚠ No references directory found at {references_dir}")
return references
def _determine_source_metadata(relative_path: Path) -> tuple[str, str, str | None]:
"""Determine source type, confidence level, and repo_id from path.
For multi-source support, extracts repo_id from paths like:
- codebase_analysis/encode_httpx/ARCHITECTURE.md -> repo_id='encode_httpx'
- github/README.md -> repo_id=None (single source)
Returns:
tuple: (source_type, confidence_level, repo_id)
"""
path_str = str(relative_path)
repo_id = None # Default: no repo identity
# Documentation sources (official docs)
if path_str.startswith("documentation/"):
return "documentation", "high", None
# GitHub sources
elif path_str.startswith("github/"):
# README and releases are medium confidence
if "README" in path_str or "releases" in path_str:
return "github", "medium", None
# Issues are low confidence (user reports)
elif "issues" in path_str:
return "github", "low", None
else:
return "github", "medium", None
# PDF sources (books, manuals)
elif path_str.startswith("pdf/"):
return "pdf", "high", None
# Merged API (synthesized from multiple sources)
elif path_str.startswith("api/"):
return "api", "high", None
# Codebase analysis (C3.x automated analysis)
elif path_str.startswith("codebase_analysis/"):
# Extract repo_id from path: codebase_analysis/{repo_id}/...
parts = Path(path_str).parts
if len(parts) >= 2:
repo_id = parts[1] # e.g., 'encode_httpx', 'encode_httpcore'
# ARCHITECTURE.md is high confidence (comprehensive)
if "ARCHITECTURE" in path_str:
return "codebase_analysis", "high", repo_id
# Patterns and examples are medium (heuristic-based)
elif "patterns" in path_str or "examples" in path_str:
return "codebase_analysis", "medium", repo_id
# Configuration is high (direct extraction)
elif "configuration" in path_str:
return "codebase_analysis", "high", repo_id
else:
return "codebase_analysis", "medium", repo_id
# Conflicts report (discrepancy detection)
elif "conflicts" in path_str:
return "conflicts", "medium", None
# Fallback
else:
return "unknown", "medium", None
total_chars = 0
# Search recursively for all .md files (including subdirectories like github/README.md)
for ref_file in sorted(references_dir.rglob("*.md")):
# Note: We now include index.md files as they contain important content
# (patterns, examples, configuration analysis)
content = ref_file.read_text(encoding="utf-8")
# Limit size per file
truncated = False
if len(content) > preview_limit:
content = content[:preview_limit] + "\n\n[Content truncated...]"
truncated = True
# Use relative path from references_dir as key for nested files
relative_path = ref_file.relative_to(references_dir)
source_type, confidence, repo_id = _determine_source_metadata(relative_path)
# Build enriched metadata (with repo_id for multi-source support)
references[str(relative_path)] = {
"content": content,
"source": source_type,
"confidence": confidence,
"path": str(relative_path),
"truncated": truncated,
"size": len(content),
"repo_id": repo_id, # None for single-source, repo identifier for multi-source
}
total_chars += len(content)
# Stop if we've read enough
if total_chars > max_chars:
print(f" Limiting input to {max_chars:,} characters")
break
return references
def retry_with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    operation_name: str = "operation",
) -> T:
    """Run *operation*, retrying with exponential backoff on any exception.

    Useful for network operations that may fail due to transient errors.
    The wait doubles after each failed attempt (base_delay, 2x, 4x, ...).

    Args:
        operation: Function to retry (takes no arguments, returns result)
        max_attempts: Maximum number of attempts (default: 3)
        base_delay: Base delay in seconds, doubles each retry (default: 1.0)
        operation_name: Name for logging purposes (default: "operation")

    Returns:
        Result of successful operation

    Raises:
        Exception: Last exception if all retries fail

    Example:
        >>> def fetch_page():
        ...     response = requests.get(url, timeout=30)
        ...     response.raise_for_status()
        ...     return response.text
        >>> content = retry_with_backoff(fetch_page, max_attempts=3, operation_name=f"fetch {url}")
    """
    captured: Exception | None = None

    for attempt_no in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            captured = exc
            if attempt_no == max_attempts:
                # Out of attempts — record the final failure and fall through.
                logger.error("%s failed after %d attempts: %s", operation_name, max_attempts, exc)
            else:
                wait = base_delay * (2 ** (attempt_no - 1))
                logger.warning(
                    "%s failed (attempt %d/%d), retrying in %.1fs: %s",
                    operation_name,
                    attempt_no,
                    max_attempts,
                    wait,
                    exc,
                )
                time.sleep(wait)

    # Every attempt raised, so captured is always set here; the RuntimeError
    # is an unreachable safety net that keeps the type checker satisfied.
    if captured is not None:
        raise captured
    raise RuntimeError(f"{operation_name} failed with no exception captured")
async def retry_with_backoff_async(
    operation: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    operation_name: str = "operation",
) -> T:
    """Async version of retry_with_backoff for async operations.

    Args:
        operation: Async function to retry (takes no arguments, returns awaitable)
        max_attempts: Maximum number of attempts (default: 3)
        base_delay: Base delay in seconds, doubles each retry (default: 1.0)
        operation_name: Name for logging purposes (default: "operation")

    Returns:
        Result of successful operation

    Raises:
        Exception: Last exception if all retries fail

    Example:
        >>> async def fetch_page():
        ...     response = await client.get(url, timeout=30.0)
        ...     response.raise_for_status()
        ...     return response.text
        >>> content = await retry_with_backoff_async(fetch_page, operation_name=f"fetch {url}")
    """
    import asyncio

    captured: Exception | None = None

    for attempt_no in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception as exc:
            captured = exc
            if attempt_no == max_attempts:
                # Out of attempts — record the final failure and fall through.
                logger.error("%s failed after %d attempts: %s", operation_name, max_attempts, exc)
            else:
                wait = base_delay * (2 ** (attempt_no - 1))
                logger.warning(
                    "%s failed (attempt %d/%d), retrying in %.1fs: %s",
                    operation_name,
                    attempt_no,
                    max_attempts,
                    wait,
                    exc,
                )
                # Non-blocking sleep so other tasks keep running during backoff.
                await asyncio.sleep(wait)

    # Every attempt raised, so captured is always set here; the RuntimeError
    # is an unreachable safety net that keeps the type checker satisfied.
    if captured is not None:
        raise captured
    raise RuntimeError(f"{operation_name} failed with no exception captured")