Files
skill-seekers-reference/src/skill_seekers/mcp/git_repo.py
yusyus 81dd5bbfbc fix: Fix remaining 61 ruff linting errors (SIM102, SIM117)
Fixed all remaining linting errors from the 310 total:
- SIM102: Combined nested if statements (31 errors)
  - adaptors/openai.py
  - config_extractor.py
  - codebase_scraper.py
  - doc_scraper.py
  - github_fetcher.py
  - pattern_recognizer.py
  - pdf_scraper.py
  - test_example_extractor.py

- SIM117: Combined multiple with statements (24 errors)
  - tests/test_async_scraping.py (2 errors)
  - tests/test_github_scraper.py (2 errors)
  - tests/test_guide_enhancer.py (20 errors)

- Fixed test fixture parameter (mock_config in test_c3_integration.py)

All 700+ tests passing.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-17 23:25:12 +03:00

274 lines
8.6 KiB
Python

#!/usr/bin/env python3
"""
Git Config Repository Manager
Handles git clone/pull operations for custom config sources
"""
import json
import os
import shutil
from pathlib import Path
from urllib.parse import urlparse
import git
from git.exc import GitCommandError, InvalidGitRepositoryError
class GitConfigRepo:
"""Manages git operations for config repositories."""
def __init__(self, cache_dir: str | None = None):
"""
Initialize git repository manager.
Args:
cache_dir: Base cache directory. Defaults to $SKILL_SEEKERS_CACHE_DIR
or ~/.skill-seekers/cache/
"""
if cache_dir:
self.cache_dir = Path(cache_dir)
else:
# Use environment variable or default
env_cache = os.environ.get("SKILL_SEEKERS_CACHE_DIR")
if env_cache:
self.cache_dir = Path(env_cache).expanduser()
else:
self.cache_dir = Path.home() / ".skill-seekers" / "cache"
# Ensure cache directory exists
self.cache_dir.mkdir(parents=True, exist_ok=True)
def clone_or_pull(
self,
source_name: str,
git_url: str,
branch: str = "main",
token: str | None = None,
force_refresh: bool = False,
) -> Path:
"""
Clone repository if not cached, else pull latest changes.
Args:
source_name: Source identifier (used for cache path)
git_url: Git repository URL
branch: Branch to clone/pull (default: main)
token: Optional authentication token
force_refresh: If True, delete cache and re-clone
Returns:
Path to cloned repository
Raises:
GitCommandError: If clone/pull fails
ValueError: If git_url is invalid
"""
# Validate URL
if not self.validate_git_url(git_url):
raise ValueError(f"Invalid git URL: {git_url}")
# Determine cache path
repo_path = self.cache_dir / source_name
# Force refresh: delete existing cache
if force_refresh and repo_path.exists():
shutil.rmtree(repo_path)
# Inject token if provided
clone_url = git_url
if token:
clone_url = self.inject_token(git_url, token)
try:
if repo_path.exists() and (repo_path / ".git").exists():
# Repository exists - pull latest
try:
repo = git.Repo(repo_path)
origin = repo.remotes.origin
# Update remote URL if token provided
if token:
origin.set_url(clone_url)
# Pull latest changes
origin.pull(branch)
return repo_path
except (InvalidGitRepositoryError, GitCommandError):
# Corrupted repo - delete and re-clone
shutil.rmtree(repo_path)
raise # Re-raise to trigger clone below
# Repository doesn't exist - clone
git.Repo.clone_from(
clone_url,
repo_path,
branch=branch,
depth=1, # Shallow clone
single_branch=True, # Only clone one branch
)
return repo_path
except GitCommandError as e:
error_msg = str(e)
# Provide helpful error messages
if "authentication failed" in error_msg.lower() or "403" in error_msg:
raise GitCommandError(
f"Authentication failed for {git_url}. Check your token or permissions.", 128
) from e
elif "not found" in error_msg.lower() or "404" in error_msg:
raise GitCommandError(
f"Repository not found: {git_url}. Verify the URL is correct and you have access.",
128,
) from e
else:
raise GitCommandError(f"Failed to clone repository: {error_msg}", 128) from e
def find_configs(self, repo_path: Path) -> list[Path]:
"""
Find all config files (*.json) in repository.
Args:
repo_path: Path to cloned repo
Returns:
List of paths to *.json files (sorted by name)
"""
if not repo_path.exists():
return []
# Find all .json files, excluding .git directory
configs = []
for json_file in repo_path.rglob("*.json"):
# Skip files in .git directory
if ".git" in json_file.parts:
continue
configs.append(json_file)
# Sort by filename
return sorted(configs, key=lambda p: p.name)
def get_config(self, repo_path: Path, config_name: str) -> dict:
"""
Load specific config by name from repository.
Args:
repo_path: Path to cloned repo
config_name: Config name (without .json extension)
Returns:
Config dictionary
Raises:
FileNotFoundError: If config not found
ValueError: If config is invalid JSON
"""
# Ensure .json extension
if not config_name.endswith(".json"):
config_name = f"{config_name}.json"
# Search for config file
all_configs = self.find_configs(repo_path)
# Try exact filename match first
for config_path in all_configs:
if config_path.name == config_name:
return self._load_config_file(config_path)
# Try case-insensitive match
config_name_lower = config_name.lower()
for config_path in all_configs:
if config_path.name.lower() == config_name_lower:
return self._load_config_file(config_path)
# Config not found - provide helpful error
available = [p.stem for p in all_configs] # Just filenames without .json
raise FileNotFoundError(
f"Config '{config_name}' not found in repository. "
f"Available configs: {', '.join(available) if available else 'none'}"
)
def _load_config_file(self, config_path: Path) -> dict:
"""
Load and validate config JSON file.
Args:
config_path: Path to config file
Returns:
Config dictionary
Raises:
ValueError: If JSON is invalid
"""
try:
with open(config_path, encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in config file {config_path.name}: {e}") from e
@staticmethod
def inject_token(git_url: str, token: str) -> str:
"""
Inject authentication token into git URL.
Converts SSH URLs to HTTPS and adds token for authentication.
Args:
git_url: Original git URL
token: Authentication token
Returns:
URL with token injected
Examples:
https://github.com/org/repo.git → https://TOKEN@github.com/org/repo.git
git@github.com:org/repo.git → https://TOKEN@github.com/org/repo.git
"""
# Convert SSH to HTTPS
if git_url.startswith("git@"):
# git@github.com:org/repo.git → github.com/org/repo.git
parts = git_url.replace("git@", "").replace(":", "/", 1)
git_url = f"https://{parts}"
# Parse URL
parsed = urlparse(git_url)
# Inject token
if parsed.hostname:
# https://github.com/org/repo.git → https://TOKEN@github.com/org/repo.git
netloc = f"{token}@{parsed.hostname}"
if parsed.port:
netloc = f"{netloc}:{parsed.port}"
return f"{parsed.scheme}://{netloc}{parsed.path}"
return git_url
@staticmethod
def validate_git_url(git_url: str) -> bool:
"""
Validate git URL format.
Args:
git_url: Git repository URL
Returns:
True if valid, False otherwise
"""
if not git_url:
return False
# Accept HTTPS URLs
if git_url.startswith("https://") or git_url.startswith("http://"):
parsed = urlparse(git_url)
return bool(parsed.hostname and parsed.path)
# Accept SSH URLs
if git_url.startswith("git@"):
# git@github.com:org/repo.git
return ":" in git_url and len(git_url.split(":")) == 2
# Accept file:// URLs (for local testing)
return bool(git_url.startswith("file://"))