Files
skill-seekers-reference/src/skill_seekers/mcp/source_manager.py
yusyus c910703913 feat(A1.9): Add multi-source git repository support for config fetching
This major feature enables fetching configs from private/team git repositories
in addition to the public API, unlocking team collaboration and custom config
collections.

**New Components:**
- git_repo.py (283 lines): GitConfigRepo class for git operations
  - Shallow clone/pull with GitPython
  - Config discovery (recursive *.json search)
  - Token injection for private repos
  - Comprehensive error handling

- source_manager.py (260 lines): SourceManager class for registry
  - Add/list/remove config sources
  - Priority-based resolution
  - Atomic file I/O
  - Auto-detect token env vars

**MCP Integration:**
- Enhanced fetch_config: 3 modes (API, Git URL, Named Source)
- New tools: add_config_source, list_config_sources, remove_config_source
- Backward compatible: existing API mode unchanged

**Testing:**
- 83 tests (100% passing)
  - 35 tests for GitConfigRepo
  - 48 tests for SourceManager
  - Integration tests for MCP tools
- Comprehensive error scenarios covered

**Dependencies:**
- Added GitPython>=3.1.40

**Architecture:**
- Storage: ~/.skill-seekers/sources.json (registry)
- Cache: $SKILL_SEEKERS_CACHE_DIR (default: ~/.skill-seekers/cache/)
- Auth: Environment variables only (GITHUB_TOKEN, GITLAB_TOKEN, etc.)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-21 19:28:22 +03:00

294 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""
Config Source Manager
Manages registry of custom config sources (git repositories)
"""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
class SourceManager:
    """Manage the registry of custom config sources (git repositories).

    The registry is a JSON document stored as ``sources.json`` inside the
    config directory (``~/.skill-seekers/`` unless overridden). It has the
    shape ``{"version": "1.0", "sources": [<source dict>, ...]}`` and the
    ``sources`` list is kept sorted by ascending ``priority``.
    """

    # Priority assumed for entries that lack a "priority" key (for example
    # after a manual edit); matches the default used by add_source().
    _DEFAULT_PRIORITY = 100

    # Fields that update_source() is allowed to modify.
    _UPDATABLE_FIELDS = frozenset(
        {"git_url", "type", "token_env", "branch", "enabled", "priority"}
    )

    def __init__(self, config_dir: Optional[str] = None):
        """
        Initialize source manager.

        Creates the config directory and an empty registry file when they
        do not exist yet.

        Args:
            config_dir: Base config directory. Defaults to ~/.skill-seekers/
        """
        self.config_dir = (
            Path(config_dir) if config_dir else Path.home() / ".skill-seekers"
        )

        # Ensure config directory exists
        self.config_dir.mkdir(parents=True, exist_ok=True)

        # Registry file path
        self.registry_file = self.config_dir / "sources.json"

        # Initialize registry if it doesn't exist
        if not self.registry_file.exists():
            self._write_registry(self._empty_registry())

    def add_source(
        self,
        name: str,
        git_url: str,
        source_type: str = "github",
        token_env: Optional[str] = None,
        branch: str = "main",
        priority: int = 100,
        enabled: bool = True
    ) -> dict:
        """
        Add or update a config source.

        If a source with the same (lowercased) name already exists it is
        replaced, keeping its original ``added_at`` timestamp.

        Args:
            name: Source identifier (alphanumeric + hyphens/underscores);
                stored lowercased
            git_url: Git repository URL
            source_type: Source type (github, gitlab, bitbucket, custom)
            token_env: Environment variable name for auth token. When None,
                a default is derived from source_type
            branch: Git branch to use (default: main)
            priority: Source priority (lower = higher priority, default: 100)
            enabled: Whether source is enabled (default: True)

        Returns:
            Source dictionary

        Raises:
            ValueError: If name is invalid or git_url is empty
        """
        # Validate name
        if not name or not name.replace("-", "").replace("_", "").isalnum():
            raise ValueError(
                f"Invalid source name '{name}'. "
                "Must be alphanumeric with optional hyphens/underscores."
            )

        # Validate git_url
        git_url = self._clean_git_url(git_url)

        # Auto-detect token_env if not provided
        if token_env is None:
            token_env = self._default_token_env(source_type)

        # One timestamp so added_at == updated_at for a brand-new source
        now = self._now()
        source = {
            "name": name.lower(),
            "git_url": git_url,
            "type": source_type.lower(),
            "token_env": token_env,
            "branch": branch,
            "enabled": enabled,
            "priority": priority,
            "added_at": now,
            "updated_at": now
        }

        registry = self._read_registry()
        sources = registry["sources"]

        # Add or update
        index = self._find_index(sources, source["name"])
        if index is None:
            sources.append(source)
        else:
            # Preserve added_at timestamp of the entry being replaced
            source["added_at"] = sources[index].get("added_at", now)
            sources[index] = source

        # Sort by priority (lower first)
        sources.sort(key=self._priority_key)

        self._write_registry(registry)
        return source

    def get_source(self, name: str) -> dict:
        """
        Get source by name.

        Args:
            name: Source identifier (matched case-insensitively)

        Returns:
            Source dictionary

        Raises:
            KeyError: If source not found
        """
        sources = self._read_registry()["sources"]
        index = self._find_index(sources, name)
        if index is None:
            raise self._not_found(name, sources)
        return sources[index]

    def list_sources(self, enabled_only: bool = False) -> list[dict]:
        """
        List all config sources.

        Args:
            enabled_only: If True, only return enabled sources (entries
                without an "enabled" key count as enabled)

        Returns:
            List of source dictionaries (sorted by priority)
        """
        sources = self._read_registry()["sources"]
        if enabled_only:
            return [s for s in sources if s.get("enabled", True)]
        return sources

    def remove_source(self, name: str) -> bool:
        """
        Remove source by name.

        Args:
            name: Source identifier (matched case-insensitively)

        Returns:
            True if removed, False if not found
        """
        registry = self._read_registry()
        index = self._find_index(registry["sources"], name)
        if index is None:
            return False

        del registry["sources"][index]
        self._write_registry(registry)
        return True

    def update_source(
        self,
        name: str,
        **kwargs
    ) -> dict:
        """
        Update specific fields of an existing source.

        Only git_url, type, token_env, branch, enabled and priority can be
        changed; any other keyword is ignored. The same normalisation as in
        add_source() applies: git_url is stripped and must not be empty,
        and a string type is lowercased.

        Args:
            name: Source identifier (matched case-insensitively)
            **kwargs: Fields to update (git_url, branch, enabled, priority, etc.)

        Returns:
            Updated source dictionary

        Raises:
            KeyError: If source not found
            ValueError: If git_url is given but empty
        """
        # Single read-modify-write so the entry that is changed is the one
        # that gets saved.
        registry = self._read_registry()
        sources = registry["sources"]
        index = self._find_index(sources, name)
        if index is None:
            raise self._not_found(name, sources)

        # Validate and normalise before touching the stored entry
        changes = {
            field: value
            for field, value in kwargs.items()
            if field in self._UPDATABLE_FIELDS
        }
        if "git_url" in changes:
            changes["git_url"] = self._clean_git_url(changes["git_url"])
        if isinstance(changes.get("type"), str):
            changes["type"] = changes["type"].lower()

        source = sources[index]
        source.update(changes)
        source["updated_at"] = self._now()

        # Re-sort by priority
        sources.sort(key=self._priority_key)

        self._write_registry(registry)
        return source

    def _read_registry(self) -> dict:
        """
        Read registry from file.

        A missing registry file is treated as an empty registry instead of
        an error; it is recreated on the next write.

        Returns:
            Registry dictionary

        Raises:
            ValueError: If the file is not valid JSON or is not an object
                containing a "sources" list
        """
        try:
            with open(self.registry_file, 'r', encoding='utf-8') as f:
                registry = json.load(f)
        except FileNotFoundError:
            return self._empty_registry()
        except json.JSONDecodeError as e:
            raise ValueError(f"Corrupted registry file: {e}") from e

        if not isinstance(registry, dict) or not isinstance(
            registry.get("sources"), list
        ):
            raise ValueError(
                "Corrupted registry file: expected an object with a 'sources' list"
            )

        # _write_registry() requires a version key; tolerate its absence.
        registry.setdefault("version", "1.0")
        return registry

    def _write_registry(self, registry: dict) -> None:
        """
        Write registry to file atomically.

        The data is written to a sibling ``.tmp`` file which then replaces
        the registry file, so readers never see a partially written file.

        Args:
            registry: Registry dictionary

        Raises:
            ValueError: If registry lacks the "version" or "sources" key
        """
        # Validate schema
        if "version" not in registry or "sources" not in registry:
            raise ValueError("Invalid registry schema")

        # The directory may have been removed since construction
        self.registry_file.parent.mkdir(parents=True, exist_ok=True)

        # Atomic write: write to temp file, then rename
        temp_file = self.registry_file.with_suffix(".tmp")
        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(registry, f, indent=2, ensure_ascii=False)
            temp_file.replace(self.registry_file)
        except Exception:
            # Clean up temp file on error, then re-raise with the original
            # traceback intact
            temp_file.unlink(missing_ok=True)
            raise

    @staticmethod
    def _empty_registry() -> dict:
        """Return a fresh registry with no sources."""
        return {"version": "1.0", "sources": []}

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO 8601 string."""
        return datetime.now(timezone.utc).isoformat()

    @staticmethod
    def _clean_git_url(git_url: str) -> str:
        """Return git_url stripped of whitespace; raise ValueError if empty."""
        if not isinstance(git_url, str) or not git_url.strip():
            raise ValueError("git_url cannot be empty")
        return git_url.strip()

    @classmethod
    def _priority_key(cls, source: dict) -> int:
        """Sort key for sources: their priority, or the default if missing."""
        return source.get("priority", cls._DEFAULT_PRIORITY)

    @staticmethod
    def _find_index(sources: list, name: str) -> Optional[int]:
        """Return the index of the source called name (case-insensitive), or None."""
        wanted = name.lower()
        for index, source in enumerate(sources):
            if source.get("name") == wanted:
                return index
        return None

    @staticmethod
    def _not_found(name: str, sources: list) -> KeyError:
        """Build the KeyError raised for an unknown source, listing known names."""
        available = [str(s.get("name", "")) for s in sources]
        return KeyError(
            f"Source '{name}' not found. "
            f"Available sources: {', '.join(available) if available else 'none'}"
        )

    @staticmethod
    def _default_token_env(source_type: str) -> str:
        """
        Get default token environment variable name for source type.

        Args:
            source_type: Source type (github, gitlab, gitea, bitbucket, custom)

        Returns:
            Environment variable name (e.g., GITHUB_TOKEN); GIT_TOKEN for
            unrecognised types
        """
        type_map = {
            "github": "GITHUB_TOKEN",
            "gitlab": "GITLAB_TOKEN",
            "gitea": "GITEA_TOKEN",
            "bitbucket": "BITBUCKET_TOKEN",
            "custom": "GIT_TOKEN"
        }
        return type_map.get(source_type.lower(), "GIT_TOKEN")