diff --git a/pyproject.toml b/pyproject.toml index 91c8391..e94e498 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "requests>=2.32.5", "beautifulsoup4>=4.14.2", "PyGithub>=2.5.0", + "GitPython>=3.1.40", "mcp>=1.18.0", "httpx>=0.28.1", "httpx-sse>=0.4.3", diff --git a/src/skill_seekers/mcp/git_repo.py b/src/skill_seekers/mcp/git_repo.py new file mode 100644 index 0000000..bcdf9f9 --- /dev/null +++ b/src/skill_seekers/mcp/git_repo.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +""" +Git Config Repository Manager +Handles git clone/pull operations for custom config sources +""" + +import json +import os +import shutil +from pathlib import Path +from typing import Optional +from urllib.parse import urlparse +import git +from git.exc import GitCommandError, InvalidGitRepositoryError + + +class GitConfigRepo: + """Manages git operations for config repositories.""" + + def __init__(self, cache_dir: Optional[str] = None): + """ + Initialize git repository manager. + + Args: + cache_dir: Base cache directory. Defaults to $SKILL_SEEKERS_CACHE_DIR + or ~/.skill-seekers/cache/ + """ + if cache_dir: + self.cache_dir = Path(cache_dir) + else: + # Use environment variable or default + env_cache = os.environ.get("SKILL_SEEKERS_CACHE_DIR") + if env_cache: + self.cache_dir = Path(env_cache).expanduser() + else: + self.cache_dir = Path.home() / ".skill-seekers" / "cache" + + # Ensure cache directory exists + self.cache_dir.mkdir(parents=True, exist_ok=True) + + def clone_or_pull( + self, + source_name: str, + git_url: str, + branch: str = "main", + token: Optional[str] = None, + force_refresh: bool = False + ) -> Path: + """ + Clone repository if not cached, else pull latest changes. + + Args: + source_name: Source identifier (used for cache path) + git_url: Git repository URL + branch: Branch to clone/pull (default: main) + token: Optional authentication token + force_refresh: If True, delete cache and re-clone + + Returns: + Path to cloned repository + + Raises: + GitCommandError: If clone/pull fails + ValueError: If git_url is invalid + """ + # Validate URL + if not self.validate_git_url(git_url): + raise ValueError(f"Invalid git URL: {git_url}") + + # Determine cache path + repo_path = self.cache_dir / source_name + + # Force refresh: delete existing cache + if force_refresh and repo_path.exists(): + shutil.rmtree(repo_path) + + # Inject token if provided + clone_url = git_url + if token: + clone_url = self.inject_token(git_url, token) + + try: + if repo_path.exists() and (repo_path / ".git").exists(): + # Repository exists - pull latest + try: + repo = git.Repo(repo_path) + origin = repo.remotes.origin + + # Update remote URL if token provided + if token: + origin.set_url(clone_url) + + # Pull latest changes + origin.pull(branch) + return repo_path + except (InvalidGitRepositoryError, GitCommandError) as e: + # Corrupted repo - delete and re-clone + shutil.rmtree(repo_path) + raise # Re-raise to trigger clone below + + # Repository doesn't exist - clone + git.Repo.clone_from( + clone_url, + repo_path, + branch=branch, + depth=1, # Shallow clone + single_branch=True # Only clone one branch + ) + return repo_path + + except GitCommandError as e: + error_msg = str(e) + + # Provide helpful error messages + if "authentication failed" in error_msg.lower() or "403" in error_msg: + raise GitCommandError( + f"Authentication failed for {git_url}. " + f"Check your token or permissions.", + 128 + ) from e + elif "not found" in error_msg.lower() or "404" in error_msg: + raise GitCommandError( + f"Repository not found: {git_url}. " + f"Verify the URL is correct and you have access.", + 128 + ) from e + else: + raise GitCommandError( + f"Failed to clone repository: {error_msg}", + 128 + ) from e + + def find_configs(self, repo_path: Path) -> list[Path]: + """ + Find all config files (*.json) in repository. + + Args: + repo_path: Path to cloned repo + + Returns: + List of paths to *.json files (sorted by name) + """ + if not repo_path.exists(): + return [] + + # Find all .json files, excluding .git directory + configs = [] + for json_file in repo_path.rglob("*.json"): + # Skip files in .git directory + if ".git" in json_file.parts: + continue + configs.append(json_file) + + # Sort by filename + return sorted(configs, key=lambda p: p.name) + + def get_config(self, repo_path: Path, config_name: str) -> dict: + """ + Load specific config by name from repository. + + Args: + repo_path: Path to cloned repo + config_name: Config name (without .json extension) + + Returns: + Config dictionary + + Raises: + FileNotFoundError: If config not found + ValueError: If config is invalid JSON + """ + # Ensure .json extension + if not config_name.endswith(".json"): + config_name = f"{config_name}.json" + + # Search for config file + all_configs = self.find_configs(repo_path) + + # Try exact filename match first + for config_path in all_configs: + if config_path.name == config_name: + return self._load_config_file(config_path) + + # Try case-insensitive match + config_name_lower = config_name.lower() + for config_path in all_configs: + if config_path.name.lower() == config_name_lower: + return self._load_config_file(config_path) + + # Config not found - provide helpful error + available = [p.stem for p in all_configs] # Just filenames without .json + raise FileNotFoundError( + f"Config '{config_name}' not found in repository. " + f"Available configs: {', '.join(available) if available else 'none'}" + ) + + def _load_config_file(self, config_path: Path) -> dict: + """ + Load and validate config JSON file. + + Args: + config_path: Path to config file + + Returns: + Config dictionary + + Raises: + ValueError: If JSON is invalid + """ + try: + with open(config_path, 'r', encoding='utf-8') as f: + return json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in config file {config_path.name}: {e}") from e + + @staticmethod + def inject_token(git_url: str, token: str) -> str: + """ + Inject authentication token into git URL. + + Converts SSH URLs to HTTPS and adds token for authentication. + + Args: + git_url: Original git URL + token: Authentication token + + Returns: + URL with token injected + + Examples: + https://github.com/org/repo.git → https://TOKEN@github.com/org/repo.git + git@github.com:org/repo.git → https://TOKEN@github.com/org/repo.git + """ + # Convert SSH to HTTPS + if git_url.startswith("git@"): + # git@github.com:org/repo.git → github.com/org/repo.git + parts = git_url.replace("git@", "").replace(":", "/", 1) + git_url = f"https://{parts}" + + # Parse URL + parsed = urlparse(git_url) + + # Inject token + if parsed.hostname: + # https://github.com/org/repo.git → https://TOKEN@github.com/org/repo.git + netloc = f"{token}@{parsed.hostname}" + if parsed.port: + netloc = f"{netloc}:{parsed.port}" + + return f"{parsed.scheme}://{netloc}{parsed.path}" + + return git_url + + @staticmethod + def validate_git_url(git_url: str) -> bool: + """ + Validate git URL format. + + Args: + git_url: Git repository URL + + Returns: + True if valid, False otherwise + """ + if not git_url: + return False + + # Accept HTTPS URLs + if git_url.startswith("https://") or git_url.startswith("http://"): + parsed = urlparse(git_url) + return bool(parsed.hostname and parsed.path) + + # Accept SSH URLs + if git_url.startswith("git@"): + # git@github.com:org/repo.git + return ":" in git_url and len(git_url.split(":")) == 2 + + # Accept file:// URLs (for local testing) + if git_url.startswith("file://"): + return True + + return False diff --git a/src/skill_seekers/mcp/server.py b/src/skill_seekers/mcp/server.py index 73a9c5b..e1f619d 100644 --- a/src/skill_seekers/mcp/server.py +++ b/src/skill_seekers/mcp/server.py @@ -420,13 +420,13 @@ async def list_tools() -> list[Tool]: ), Tool( name="fetch_config", - description="Download a config file from api.skillseekersweb.com. List available configs or download a specific one by name.", + description="Fetch config from API, git URL, or registered source. Supports three modes: (1) Named source from registry, (2) Direct git URL, (3) API (default). List available configs or download a specific one by name.", inputSchema={ "type": "object", "properties": { "config_name": { "type": "string", - "description": "Name of the config to download (e.g., 'react', 'django', 'godot'). Omit to list all available configs.", + "description": "Name of the config to download (e.g., 'react', 'django', 'godot'). Required for git modes. Omit to list all available configs in API mode.", }, "destination": { "type": "string", @@ -435,12 +435,34 @@ async def list_tools() -> list[Tool]: }, "list_available": { "type": "boolean", - "description": "List all available configs from the API (default: false)", + "description": "List all available configs from the API (only works in API mode, default: false)", "default": False, }, "category": { "type": "string", - "description": "Filter configs by category when listing (e.g., 'web-frameworks', 'game-engines', 'devops')", + "description": "Filter configs by category when listing in API mode (e.g., 'web-frameworks', 'game-engines', 'devops')", + }, + "git_url": { + "type": "string", + "description": "Git repository URL containing configs. If provided, fetches from git instead of API. Supports HTTPS and SSH URLs. Example: 'https://github.com/myorg/configs.git'", + }, + "source": { + "type": "string", + "description": "Named source from registry (highest priority). Use add_config_source to register sources first. Example: 'team', 'company'", + }, + "branch": { + "type": "string", + "description": "Git branch to use (default: 'main'). Only used with git_url or source.", + "default": "main", + }, + "token": { + "type": "string", + "description": "Authentication token for private repos (optional). Prefer using environment variables (GITHUB_TOKEN, GITLAB_TOKEN, etc.).", + }, + "refresh": { + "type": "boolean", + "description": "Force refresh cached git repository (default: false). Deletes cache and re-clones. Only used with git modes.", + "default": False, }, }, "required": [], @@ -472,6 +494,77 @@ async def list_tools() -> list[Tool]: "required": [], }, ), + Tool( + name="add_config_source", + description="Register a git repository as a config source. Allows fetching configs from private/team repos. Use this to set up named sources that can be referenced by fetch_config. Supports GitHub, GitLab, Gitea, Bitbucket, and custom git servers.", + inputSchema={ + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Source identifier (lowercase, alphanumeric, hyphens/underscores allowed). Example: 'team', 'company-internal', 'my_configs'", + }, + "git_url": { + "type": "string", + "description": "Git repository URL (HTTPS or SSH). Example: 'https://github.com/myorg/configs.git' or 'git@github.com:myorg/configs.git'", + }, + "source_type": { + "type": "string", + "description": "Source type (default: 'github'). Options: 'github', 'gitlab', 'gitea', 'bitbucket', 'custom'", + "default": "github", + }, + "token_env": { + "type": "string", + "description": "Environment variable name for auth token (optional). Auto-detected if not provided. Example: 'GITHUB_TOKEN', 'GITLAB_TOKEN', 'MY_CUSTOM_TOKEN'", + }, + "branch": { + "type": "string", + "description": "Git branch to use (default: 'main'). Example: 'main', 'master', 'develop'", + "default": "main", + }, + "priority": { + "type": "integer", + "description": "Source priority (lower = higher priority, default: 100). Used for conflict resolution when same config exists in multiple sources.", + "default": 100, + }, + "enabled": { + "type": "boolean", + "description": "Whether source is enabled (default: true)", + "default": True, + }, + }, + "required": ["name", "git_url"], + }, + ), + Tool( + name="list_config_sources", + description="List all registered config sources. Shows git repositories that have been registered with add_config_source. Use this to see available sources for fetch_config.", + inputSchema={ + "type": "object", + "properties": { + "enabled_only": { + "type": "boolean", + "description": "Only show enabled sources (default: false)", + "default": False, + }, + }, + "required": [], + }, + ), + Tool( + name="remove_config_source", + description="Remove a registered config source. Deletes the source from the registry. Does not delete cached git repository data.", + inputSchema={ + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Source identifier to remove. Example: 'team', 'company-internal'", + }, + }, + "required": ["name"], + }, + ), ] @@ -506,6 +599,12 @@ async def call_tool(name: str, arguments: Any) -> list[TextContent]: return await fetch_config_tool(arguments) elif name == "submit_config": return await submit_config_tool(arguments) + elif name == "add_config_source": + return await add_config_source_tool(arguments) + elif name == "list_config_sources": + return await list_config_sources_tool(arguments) + elif name == "remove_config_source": + return await remove_config_source_tool(arguments) else: return [TextContent(type="text", text=f"Unknown tool: {name}")] @@ -1112,81 +1211,63 @@ async def scrape_github_tool(args: dict) -> list[TextContent]: async def fetch_config_tool(args: dict) -> list[TextContent]: - """Download config file from API""" - API_BASE_URL = "https://api.skillseekersweb.com" + """Fetch config from API, git URL, or named source""" + from skill_seekers.mcp.git_repo import GitConfigRepo + from skill_seekers.mcp.source_manager import SourceManager config_name = args.get("config_name") destination = args.get("destination", "configs") list_available = args.get("list_available", False) category = args.get("category") + # Git mode parameters + source_name = args.get("source") + git_url = args.get("git_url") + branch = args.get("branch", "main") + token = args.get("token") + force_refresh = args.get("refresh", False) + try: - async with httpx.AsyncClient(timeout=30.0) as client: - # List available configs if requested or no config_name provided - if list_available or not config_name: - # Build API URL with optional category filter - list_url = f"{API_BASE_URL}/api/configs" - params = {} - if category: - params["category"] = category - - response = await client.get(list_url, params=params) - response.raise_for_status() - data = response.json() - - configs = data.get("configs", []) - total = data.get("total", 0) - filters = data.get("filters") - - # Format list output - result = f"šŸ“‹ Available Configs ({total} total)\n" - if filters: - result += f"šŸ” Filters: {filters}\n" - result += "\n" - - # Group by category - by_category = {} - for config in configs: - cat = config.get("category", "uncategorized") - if cat not in by_category: - by_category[cat] = [] - by_category[cat].append(config) - - for cat, cat_configs in sorted(by_category.items()): - result += f"\n**{cat.upper()}** ({len(cat_configs)} configs):\n" - for cfg in cat_configs: - name = cfg.get("name") - desc = cfg.get("description", "")[:60] - config_type = cfg.get("type", "unknown") - tags = ", ".join(cfg.get("tags", [])[:3]) - result += f" • {name} [{config_type}] - {desc}{'...' if len(cfg.get('description', '')) > 60 else ''}\n" - if tags: - result += f" Tags: {tags}\n" - - result += f"\nšŸ’” To download a config, use: fetch_config with config_name=''\n" - result += f"šŸ“š API Docs: {API_BASE_URL}/docs\n" - - return [TextContent(type="text", text=result)] - - # Download specific config + # MODE 1: Named Source (highest priority) + if source_name: if not config_name: - return [TextContent(type="text", text="āŒ Error: Please provide config_name or set list_available=true")] + return [TextContent(type="text", text="āŒ Error: config_name is required when using source parameter")] - # Get config details first - detail_url = f"{API_BASE_URL}/api/configs/{config_name}" - detail_response = await client.get(detail_url) + # Get source from registry + source_manager = SourceManager() + try: + source = source_manager.get_source(source_name) + except KeyError as e: + return [TextContent(type="text", text=f"āŒ {str(e)}")] - if detail_response.status_code == 404: - return [TextContent(type="text", text=f"āŒ Config '{config_name}' not found. Use list_available=true to see available configs.")] + git_url = source["git_url"] + branch = source.get("branch", branch) + token_env = source.get("token_env") - detail_response.raise_for_status() - config_info = detail_response.json() + # Get token from environment if not provided + if not token and token_env: + token = os.environ.get(token_env) - # Download the actual config file - download_url = f"{API_BASE_URL}/api/download/{config_name}.json" - download_response = await client.get(download_url) - download_response.raise_for_status() - config_data = download_response.json() + # Clone/pull repository + git_repo = GitConfigRepo() + try: + repo_path = git_repo.clone_or_pull( + source_name=source_name, + git_url=git_url, + branch=branch, + token=token, + force_refresh=force_refresh + ) + except Exception as e: + return [TextContent(type="text", text=f"āŒ Git error: {str(e)}")] + + # Load config from repository + try: + config_data = git_repo.get_config(repo_path, config_name) + except FileNotFoundError as e: + return [TextContent(type="text", text=f"āŒ {str(e)}")] + except ValueError as e: + return [TextContent(type="text", text=f"āŒ {str(e)}")] # Save to destination dest_path = Path(destination) @@ -1196,8 +1277,160 @@ async def fetch_config_tool(args: dict) -> list[TextContent]: with open(config_file, 'w') as f: json.dump(config_data, f, indent=2) - # Build result message - result = f"""āœ… Config downloaded successfully! + result = f"""āœ… Config fetched from git source successfully! + +šŸ“¦ Config: {config_name} +šŸ“‚ Saved to: {config_file} +šŸ”— Source: {source_name} +🌿 Branch: {branch} +šŸ“ Repository: {git_url} +šŸ”„ Refreshed: {'Yes (forced)' if force_refresh else 'No (used cache)'} + +Next steps: + 1. Review config: cat {config_file} + 2. Estimate pages: Use estimate_pages tool + 3. Scrape docs: Use scrape_docs tool + +šŸ’” Manage sources: Use add_config_source, list_config_sources, remove_config_source tools +""" + return [TextContent(type="text", text=result)] + + # MODE 2: Direct Git URL + elif git_url: + if not config_name: + return [TextContent(type="text", text="āŒ Error: config_name is required when using git_url parameter")] + + # Clone/pull repository + git_repo = GitConfigRepo() + source_name_temp = f"temp_{config_name}" + + try: + repo_path = git_repo.clone_or_pull( + source_name=source_name_temp, + git_url=git_url, + branch=branch, + token=token, + force_refresh=force_refresh + ) + except ValueError as e: + return [TextContent(type="text", text=f"āŒ Invalid git URL: {str(e)}")] + except Exception as e: + return [TextContent(type="text", text=f"āŒ Git error: {str(e)}")] + + # Load config from repository + try: + config_data = git_repo.get_config(repo_path, config_name) + except FileNotFoundError as e: + return [TextContent(type="text", text=f"āŒ {str(e)}")] + except ValueError as e: + return [TextContent(type="text", text=f"āŒ {str(e)}")] + + # Save to destination + dest_path = Path(destination) + dest_path.mkdir(parents=True, exist_ok=True) + config_file = dest_path / f"{config_name}.json" + + with open(config_file, 'w') as f: + json.dump(config_data, f, indent=2) + + result = f"""āœ… Config fetched from git URL successfully! + +šŸ“¦ Config: {config_name} +šŸ“‚ Saved to: {config_file} +šŸ“ Repository: {git_url} +🌿 Branch: {branch} +šŸ”„ Refreshed: {'Yes (forced)' if force_refresh else 'No (used cache)'} + +Next steps: + 1. Review config: cat {config_file} + 2. Estimate pages: Use estimate_pages tool + 3. Scrape docs: Use scrape_docs tool + +šŸ’” Register this source: Use add_config_source to save for future use +""" + return [TextContent(type="text", text=result)] + + # MODE 3: API (existing, backward compatible) + else: + API_BASE_URL = "https://api.skillseekersweb.com" + + async with httpx.AsyncClient(timeout=30.0) as client: + # List available configs if requested or no config_name provided + if list_available or not config_name: + # Build API URL with optional category filter + list_url = f"{API_BASE_URL}/api/configs" + params = {} + if category: + params["category"] = category + + response = await client.get(list_url, params=params) + response.raise_for_status() + data = response.json() + + configs = data.get("configs", []) + total = data.get("total", 0) + filters = data.get("filters") + + # Format list output + result = f"šŸ“‹ Available Configs ({total} total)\n" + if filters: + result += f"šŸ” Filters: {filters}\n" + result += "\n" + + # Group by category + by_category = {} + for config in configs: + cat = config.get("category", "uncategorized") + if cat not in by_category: + by_category[cat] = [] + by_category[cat].append(config) + + for cat, cat_configs in sorted(by_category.items()): + result += f"\n**{cat.upper()}** ({len(cat_configs)} configs):\n" + for cfg in cat_configs: + name = cfg.get("name") + desc = cfg.get("description", "")[:60] + config_type = cfg.get("type", "unknown") + tags = ", ".join(cfg.get("tags", [])[:3]) + result += f" • {name} [{config_type}] - {desc}{'...' if len(cfg.get('description', '')) > 60 else ''}\n" + if tags: + result += f" Tags: {tags}\n" + + result += f"\nšŸ’” To download a config, use: fetch_config with config_name=''\n" + result += f"šŸ“š API Docs: {API_BASE_URL}/docs\n" + + return [TextContent(type="text", text=result)] + + # Download specific config + if not config_name: + return [TextContent(type="text", text="āŒ Error: Please provide config_name or set list_available=true")] + + # Get config details first + detail_url = f"{API_BASE_URL}/api/configs/{config_name}" + detail_response = await client.get(detail_url) + + if detail_response.status_code == 404: + return [TextContent(type="text", text=f"āŒ Config '{config_name}' not found. Use list_available=true to see available configs.")] + + detail_response.raise_for_status() + config_info = detail_response.json() + + # Download the actual config file + download_url = f"{API_BASE_URL}/api/download/{config_name}.json" + download_response = await client.get(download_url) + download_response.raise_for_status() + config_data = download_response.json() + + # Save to destination + dest_path = Path(destination) + dest_path.mkdir(parents=True, exist_ok=True) + config_file = dest_path / f"{config_name}.json" + + with open(config_file, 'w') as f: + json.dump(config_data, f, indent=2) + + # Build result message + result = f"""āœ… Config downloaded successfully! šŸ“¦ Config: {config_name} šŸ“‚ Saved to: {config_file} @@ -1219,7 +1452,7 @@ Next steps: šŸ’” More configs: Use list_available=true to see all available configs """ - return [TextContent(type="text", text=result)] + return [TextContent(type="text", text=result)] except httpx.HTTPError as e: return [TextContent(type="text", text=f"āŒ HTTP Error: {str(e)}\n\nCheck your internet connection or try again later.")] @@ -1432,6 +1665,176 @@ What happens next: return [TextContent(type="text", text=f"āŒ Error: {str(e)}")] +async def add_config_source_tool(args: dict) -> list[TextContent]: + """Register a git repository as a config source""" + from skill_seekers.mcp.source_manager import SourceManager + + name = args.get("name") + git_url = args.get("git_url") + source_type = args.get("source_type", "github") + token_env = args.get("token_env") + branch = args.get("branch", "main") + priority = args.get("priority", 100) + enabled = args.get("enabled", True) + + try: + # Validate required parameters + if not name: + return [TextContent(type="text", text="āŒ Error: 'name' parameter is required")] + if not git_url: + return [TextContent(type="text", text="āŒ Error: 'git_url' parameter is required")] + + # Add source + source_manager = SourceManager() + source = source_manager.add_source( + name=name, + git_url=git_url, + source_type=source_type, + token_env=token_env, + branch=branch, + priority=priority, + enabled=enabled + ) + + # Check if this is an update + is_update = "updated_at" in source and source["added_at"] != source["updated_at"] + + result = f"""āœ… Config source {'updated' if is_update else 'registered'} successfully! + +šŸ“› Name: {source['name']} +šŸ“ Repository: {source['git_url']} +šŸ”– Type: {source['type']} +🌿 Branch: {source['branch']} +šŸ”‘ Token env: {source.get('token_env', 'None')} +⚔ Priority: {source['priority']} (lower = higher priority) +āœ“ Enabled: {source['enabled']} +šŸ•’ Added: {source['added_at'][:19]} + +Usage: + # Fetch config from this source + fetch_config(source="{source['name']}", config_name="your-config") + + # List all sources + list_config_sources() + + # Remove this source + remove_config_source(name="{source['name']}") + +šŸ’” Make sure to set {source.get('token_env', 'GIT_TOKEN')} environment variable for private repos +""" + + return [TextContent(type="text", text=result)] + + except ValueError as e: + return [TextContent(type="text", text=f"āŒ Validation Error: {str(e)}")] + except Exception as e: + return [TextContent(type="text", text=f"āŒ Error: {str(e)}")] + + +async def list_config_sources_tool(args: dict) -> list[TextContent]: + """List all registered config sources""" + from skill_seekers.mcp.source_manager import SourceManager + + enabled_only = args.get("enabled_only", False) + + try: + source_manager = SourceManager() + sources = source_manager.list_sources(enabled_only=enabled_only) + + if not sources: + result = """šŸ“‹ No config sources registered + +To add a source: + add_config_source( + name="team", + git_url="https://github.com/myorg/configs.git" + ) + +šŸ’” Once added, use: fetch_config(source="team", config_name="...") +""" + return [TextContent(type="text", text=result)] + + # Format sources list + result = f"šŸ“‹ Config Sources ({len(sources)} total" + if enabled_only: + result += ", enabled only" + result += ")\n\n" + + for source in sources: + status_icon = "āœ“" if source.get("enabled", True) else "āœ—" + result += f"{status_icon} **{source['name']}**\n" + result += f" šŸ“ {source['git_url']}\n" + result += f" šŸ”– Type: {source['type']} | 🌿 Branch: {source['branch']}\n" + result += f" šŸ”‘ Token: {source.get('token_env', 'None')} | ⚔ Priority: {source['priority']}\n" + result += f" šŸ•’ Added: {source['added_at'][:19]}\n" + result += "\n" + + result += """Usage: + # Fetch config from a source + fetch_config(source="SOURCE_NAME", config_name="CONFIG_NAME") + + # Add new source + add_config_source(name="...", git_url="...") + + # Remove source + remove_config_source(name="SOURCE_NAME") +""" + + return [TextContent(type="text", text=result)] + + except Exception as e: + return [TextContent(type="text", text=f"āŒ Error: {str(e)}")] + + +async def remove_config_source_tool(args: dict) -> list[TextContent]: + """Remove a registered config source""" + from skill_seekers.mcp.source_manager import SourceManager + + name = args.get("name") + + try: + # Validate required parameter + if not name: + return [TextContent(type="text", text="āŒ Error: 'name' parameter is required")] + + # Remove source + source_manager = SourceManager() + removed = source_manager.remove_source(name) + + if removed: + result = f"""āœ… Config source removed successfully! + +šŸ“› Removed: {name} + +āš ļø Note: Cached git repository data is NOT deleted +To free up disk space, manually delete: ~/.skill-seekers/cache/{name}/ + +Next steps: + # List remaining sources + list_config_sources() + + # Add a different source + add_config_source(name="...", git_url="...") +""" + return [TextContent(type="text", text=result)] + else: + # Not found - show available sources + sources = source_manager.list_sources() + available = [s["name"] for s in sources] + + result = f"""āŒ Source '{name}' not found + +Available sources: {', '.join(available) if available else 'none'} + +To see all sources: + list_config_sources() +""" + return [TextContent(type="text", text=result)] + + except Exception as e: + return [TextContent(type="text", text=f"āŒ Error: {str(e)}")] + + async def main(): """Run the MCP server""" if not MCP_AVAILABLE or app is None: diff --git a/src/skill_seekers/mcp/source_manager.py b/src/skill_seekers/mcp/source_manager.py new file mode 100644 index 0000000..35cf698 --- /dev/null +++ b/src/skill_seekers/mcp/source_manager.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python3 +""" +Config Source Manager +Manages registry of custom config sources (git repositories) +""" + +import json +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + + +class SourceManager: + """Manages config source registry at ~/.skill-seekers/sources.json""" + + def __init__(self, config_dir: Optional[str] = None): + """ + Initialize source manager. + + Args: + config_dir: Base config directory. Defaults to ~/.skill-seekers/ + """ + if config_dir: + self.config_dir = Path(config_dir) + else: + self.config_dir = Path.home() / ".skill-seekers" + + # Ensure config directory exists + self.config_dir.mkdir(parents=True, exist_ok=True) + + # Registry file path + self.registry_file = self.config_dir / "sources.json" + + # Initialize registry if it doesn't exist + if not self.registry_file.exists(): + self._write_registry({"version": "1.0", "sources": []}) + + def add_source( + self, + name: str, + git_url: str, + source_type: str = "github", + token_env: Optional[str] = None, + branch: str = "main", + priority: int = 100, + enabled: bool = True + ) -> dict: + """ + Add or update a config source. + + Args: + name: Source identifier (lowercase, alphanumeric + hyphens/underscores) + git_url: Git repository URL + source_type: Source type (github, gitlab, bitbucket, custom) + token_env: Environment variable name for auth token + branch: Git branch to use (default: main) + priority: Source priority (lower = higher priority, default: 100) + enabled: Whether source is enabled (default: True) + + Returns: + Source dictionary + + Raises: + ValueError: If name is invalid or git_url is empty + """ + # Validate name + if not name or not name.replace("-", "").replace("_", "").isalnum(): + raise ValueError( + f"Invalid source name '{name}'. " + "Must be alphanumeric with optional hyphens/underscores." + ) + + # Validate git_url + if not git_url or not git_url.strip(): + raise ValueError("git_url cannot be empty") + + # Auto-detect token_env if not provided + if token_env is None: + token_env = self._default_token_env(source_type) + + # Create source entry + source = { + "name": name.lower(), + "git_url": git_url.strip(), + "type": source_type.lower(), + "token_env": token_env, + "branch": branch, + "enabled": enabled, + "priority": priority, + "added_at": datetime.now(timezone.utc).isoformat(), + "updated_at": datetime.now(timezone.utc).isoformat() + } + + # Load registry + registry = self._read_registry() + + # Check if source exists + existing_index = None + for i, existing_source in enumerate(registry["sources"]): + if existing_source["name"] == source["name"]: + existing_index = i + # Preserve added_at timestamp + source["added_at"] = existing_source.get("added_at", source["added_at"]) + break + + # Add or update + if existing_index is not None: + registry["sources"][existing_index] = source + else: + registry["sources"].append(source) + + # Sort by priority (lower first) + registry["sources"].sort(key=lambda s: s["priority"]) + + # Save registry + self._write_registry(registry) + + return source + + def get_source(self, name: str) -> dict: + """ + Get source by name. + + Args: + name: Source identifier + + Returns: + Source dictionary + + Raises: + KeyError: If source not found + """ + registry = self._read_registry() + + # Search for source (case-insensitive) + name_lower = name.lower() + for source in registry["sources"]: + if source["name"] == name_lower: + return source + + # Not found - provide helpful error + available = [s["name"] for s in registry["sources"]] + raise KeyError( + f"Source '{name}' not found. " + f"Available sources: {', '.join(available) if available else 'none'}" + ) + + def list_sources(self, enabled_only: bool = False) -> list[dict]: + """ + List all config sources. + + Args: + enabled_only: If True, only return enabled sources + + Returns: + List of source dictionaries (sorted by priority) + """ + registry = self._read_registry() + + if enabled_only: + return [s for s in registry["sources"] if s.get("enabled", True)] + + return registry["sources"] + + def remove_source(self, name: str) -> bool: + """ + Remove source by name. + + Args: + name: Source identifier + + Returns: + True if removed, False if not found + """ + registry = self._read_registry() + + # Find source index + name_lower = name.lower() + for i, source in enumerate(registry["sources"]): + if source["name"] == name_lower: + # Remove source + del registry["sources"][i] + # Save registry + self._write_registry(registry) + return True + + return False + + def update_source( + self, + name: str, + **kwargs + ) -> dict: + """ + Update specific fields of an existing source. + + Args: + name: Source identifier + **kwargs: Fields to update (git_url, branch, enabled, priority, etc.) + + Returns: + Updated source dictionary + + Raises: + KeyError: If source not found + """ + # Get existing source + source = self.get_source(name) + + # Update allowed fields + allowed_fields = {"git_url", "type", "token_env", "branch", "enabled", "priority"} + for field, value in kwargs.items(): + if field in allowed_fields: + source[field] = value + + # Update timestamp + source["updated_at"] = datetime.now(timezone.utc).isoformat() + + # Save changes + registry = self._read_registry() + for i, s in enumerate(registry["sources"]): + if s["name"] == source["name"]: + registry["sources"][i] = source + break + + # Re-sort by priority + registry["sources"].sort(key=lambda s: s["priority"]) + + self._write_registry(registry) + + return source + + def _read_registry(self) -> dict: + """ + Read registry from file. + + Returns: + Registry dictionary + """ + try: + with open(self.registry_file, 'r', encoding='utf-8') as f: + return json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Corrupted registry file: {e}") from e + + def _write_registry(self, registry: dict) -> None: + """ + Write registry to file atomically. + + Args: + registry: Registry dictionary + """ + # Validate schema + if "version" not in registry or "sources" not in registry: + raise ValueError("Invalid registry schema") + + # Atomic write: write to temp file, then rename + temp_file = self.registry_file.with_suffix(".tmp") + + try: + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(registry, f, indent=2, ensure_ascii=False) + + # Atomic rename + temp_file.replace(self.registry_file) + + except Exception as e: + # Clean up temp file on error + if temp_file.exists(): + temp_file.unlink() + raise e + + @staticmethod + def _default_token_env(source_type: str) -> str: + """ + Get default token environment variable name for source type. + + Args: + source_type: Source type (github, gitlab, bitbucket, custom) + + Returns: + Environment variable name (e.g., GITHUB_TOKEN) + """ + type_map = { + "github": "GITHUB_TOKEN", + "gitlab": "GITLAB_TOKEN", + "gitea": "GITEA_TOKEN", + "bitbucket": "BITBUCKET_TOKEN", + "custom": "GIT_TOKEN" + } + + return type_map.get(source_type.lower(), "GIT_TOKEN") diff --git a/tests/test_git_repo.py b/tests/test_git_repo.py new file mode 100644 index 0000000..1d39ae7 --- /dev/null +++ b/tests/test_git_repo.py @@ -0,0 +1,429 @@ +#!/usr/bin/env python3 +""" +Tests for GitConfigRepo class (git repository operations) +""" + +import json +import pytest +import shutil +from pathlib import Path +from unittest.mock import MagicMock, patch, Mock +from git.exc import GitCommandError, InvalidGitRepositoryError + +from skill_seekers.mcp.git_repo import GitConfigRepo + + +@pytest.fixture +def temp_cache_dir(tmp_path): + """Create temporary cache directory for tests.""" + cache_dir = tmp_path / "test_cache" + cache_dir.mkdir() + return cache_dir + + +@pytest.fixture +def git_repo(temp_cache_dir): + """Create GitConfigRepo instance with temp cache.""" + return GitConfigRepo(cache_dir=str(temp_cache_dir)) + + +class TestGitConfigRepoInit: + """Test GitConfigRepo initialization.""" + + def test_init_with_custom_cache_dir(self, temp_cache_dir): + """Test initialization with custom cache directory.""" + repo = GitConfigRepo(cache_dir=str(temp_cache_dir)) + assert repo.cache_dir == temp_cache_dir + assert temp_cache_dir.exists() + + def test_init_with_env_var(self, tmp_path, monkeypatch): + """Test initialization with environment variable.""" + env_cache = tmp_path / "env_cache" + monkeypatch.setenv("SKILL_SEEKERS_CACHE_DIR", str(env_cache)) + + repo = GitConfigRepo() + assert repo.cache_dir == env_cache + assert env_cache.exists() + + def test_init_with_default(self, monkeypatch): + """Test initialization with default cache directory.""" + monkeypatch.delenv("SKILL_SEEKERS_CACHE_DIR", raising=False) + + repo = GitConfigRepo() + expected = Path.home() / ".skill-seekers" / "cache" + assert repo.cache_dir == expected + + +class TestValidateGitUrl: + """Test git URL validation.""" + + def test_validate_https_url(self): + """Test validation of HTTPS URLs.""" + assert GitConfigRepo.validate_git_url("https://github.com/org/repo.git") + assert GitConfigRepo.validate_git_url("https://gitlab.com/org/repo.git") + + def test_validate_http_url(self): + """Test validation of HTTP URLs.""" + assert GitConfigRepo.validate_git_url("http://example.com/repo.git") + + def test_validate_ssh_url(self): + """Test validation of SSH URLs.""" + assert GitConfigRepo.validate_git_url("git@github.com:org/repo.git") + assert GitConfigRepo.validate_git_url("git@gitlab.com:group/project.git") + + def test_validate_file_url(self): + """Test validation of file:// URLs.""" + assert GitConfigRepo.validate_git_url("file:///path/to/repo.git") + + def test_invalid_empty_url(self): + """Test validation rejects empty URLs.""" + assert not GitConfigRepo.validate_git_url("") + assert not GitConfigRepo.validate_git_url(None) + + def test_invalid_malformed_url(self): + """Test validation rejects malformed URLs.""" + assert not GitConfigRepo.validate_git_url("not-a-url") + assert not GitConfigRepo.validate_git_url("ftp://example.com/repo") + + def test_invalid_ssh_without_colon(self): + """Test validation rejects SSH URLs without colon.""" + assert not GitConfigRepo.validate_git_url("git@github.com/org/repo.git") + + +class TestInjectToken: + """Test token injection into git URLs.""" + + def test_inject_token_https(self): + """Test token injection into HTTPS URL.""" + url = "https://github.com/org/repo.git" + token = "ghp_testtoken123" + + result = GitConfigRepo.inject_token(url, token) + assert result == "https://ghp_testtoken123@github.com/org/repo.git" + + def test_inject_token_ssh_to_https(self): + """Test SSH URL conversion to HTTPS with token.""" + url = "git@github.com:org/repo.git" + token = "ghp_testtoken123" + + result = GitConfigRepo.inject_token(url, token) + assert result == "https://ghp_testtoken123@github.com/org/repo.git" + + def test_inject_token_with_port(self): + """Test token injection with custom port.""" + url = "https://gitlab.example.com:8443/org/repo.git" + token = "token123" + + result = GitConfigRepo.inject_token(url, token) + assert result == "https://token123@gitlab.example.com:8443/org/repo.git" + + def test_inject_token_gitlab_ssh(self): + """Test GitLab SSH URL conversion.""" + url = "git@gitlab.com:group/project.git" + token = "glpat-token123" + + result = GitConfigRepo.inject_token(url, token) + assert result == "https://glpat-token123@gitlab.com/group/project.git" + + +class TestCloneOrPull: + """Test clone and pull operations.""" + + @patch('skill_seekers.mcp.git_repo.git.Repo.clone_from') + def test_clone_new_repo(self, mock_clone, git_repo): + """Test cloning a new repository.""" + mock_clone.return_value = MagicMock() + + result = git_repo.clone_or_pull( + source_name="test-source", + git_url="https://github.com/org/repo.git" + ) + + assert result == git_repo.cache_dir / "test-source" + mock_clone.assert_called_once() + + # Verify shallow clone parameters + call_kwargs = mock_clone.call_args[1] + assert call_kwargs['depth'] == 1 + assert call_kwargs['single_branch'] is True + assert call_kwargs['branch'] == "main" + + @patch('skill_seekers.mcp.git_repo.git.Repo') + def test_pull_existing_repo(self, mock_repo_class, git_repo, temp_cache_dir): + """Test pulling updates to existing repository.""" + # Create fake existing repo + repo_path = temp_cache_dir / "test-source" + repo_path.mkdir() + (repo_path / ".git").mkdir() + + # Mock git.Repo + mock_repo = MagicMock() + mock_origin = MagicMock() + mock_repo.remotes.origin = mock_origin + mock_repo_class.return_value = mock_repo + + result = git_repo.clone_or_pull( + source_name="test-source", + git_url="https://github.com/org/repo.git" + ) + + assert result == repo_path + mock_origin.pull.assert_called_once_with("main") + + @patch('skill_seekers.mcp.git_repo.git.Repo') + def test_pull_with_token_update(self, mock_repo_class, git_repo, temp_cache_dir): + """Test pulling with token updates remote URL.""" + # Create fake existing repo + repo_path = temp_cache_dir / "test-source" + repo_path.mkdir() + (repo_path / ".git").mkdir() + + # Mock git.Repo + mock_repo = MagicMock() + mock_origin = MagicMock() + mock_repo.remotes.origin = mock_origin + mock_repo_class.return_value = mock_repo + + result = git_repo.clone_or_pull( + source_name="test-source", + git_url="https://github.com/org/repo.git", + token="ghp_token123" + ) + + # Verify URL was updated with token + mock_origin.set_url.assert_called_once() + updated_url = mock_origin.set_url.call_args[0][0] + assert "ghp_token123@github.com" in updated_url + + @patch('skill_seekers.mcp.git_repo.git.Repo.clone_from') + def test_force_refresh_deletes_cache(self, mock_clone, git_repo, temp_cache_dir): + """Test force refresh deletes existing cache.""" + # Create fake existing repo + repo_path = temp_cache_dir / "test-source" + repo_path.mkdir() + (repo_path / ".git").mkdir() + (repo_path / "config.json").write_text("{}") + + mock_clone.return_value = MagicMock() + + git_repo.clone_or_pull( + source_name="test-source", + git_url="https://github.com/org/repo.git", + force_refresh=True + ) + + # Verify clone was called (not pull) + mock_clone.assert_called_once() + + @patch('skill_seekers.mcp.git_repo.git.Repo.clone_from') + def test_clone_with_custom_branch(self, mock_clone, git_repo): + """Test cloning with custom branch.""" + mock_clone.return_value = MagicMock() + + git_repo.clone_or_pull( + source_name="test-source", + git_url="https://github.com/org/repo.git", + branch="develop" + ) + + call_kwargs = mock_clone.call_args[1] + assert call_kwargs['branch'] == "develop" + + def test_clone_invalid_url_raises_error(self, git_repo): + """Test cloning with invalid URL raises ValueError.""" + with pytest.raises(ValueError, match="Invalid git URL"): + git_repo.clone_or_pull( + source_name="test-source", + git_url="not-a-valid-url" + ) + + @patch('skill_seekers.mcp.git_repo.git.Repo.clone_from') + def test_clone_auth_failure_error(self, mock_clone, git_repo): + """Test authentication failure error handling.""" + mock_clone.side_effect = GitCommandError( + "clone", + 128, + stderr="fatal: Authentication failed" + ) + + with pytest.raises(GitCommandError, match="Authentication failed"): + git_repo.clone_or_pull( + source_name="test-source", + git_url="https://github.com/org/repo.git" + ) + + @patch('skill_seekers.mcp.git_repo.git.Repo.clone_from') + def test_clone_not_found_error(self, mock_clone, git_repo): + """Test repository not found error handling.""" + mock_clone.side_effect = GitCommandError( + "clone", + 128, + stderr="fatal: repository not found" + ) + + with pytest.raises(GitCommandError, match="Repository not found"): + git_repo.clone_or_pull( + source_name="test-source", + git_url="https://github.com/org/nonexistent.git" + ) + + +class TestFindConfigs: + """Test config file discovery.""" + + def test_find_configs_in_root(self, git_repo, temp_cache_dir): + """Test finding config files in repository root.""" + repo_path = temp_cache_dir / "test-repo" + repo_path.mkdir() + + (repo_path / "config1.json").write_text("{}") + (repo_path / "config2.json").write_text("{}") + (repo_path / "README.md").write_text("# Readme") + + configs = git_repo.find_configs(repo_path) + + assert len(configs) == 2 + assert all(c.suffix == ".json" for c in configs) + assert sorted([c.name for c in configs]) == ["config1.json", "config2.json"] + + def test_find_configs_in_subdirs(self, git_repo, temp_cache_dir): + """Test finding config files in subdirectories.""" + repo_path = temp_cache_dir / "test-repo" + configs_dir = repo_path / "configs" + configs_dir.mkdir(parents=True) + + (repo_path / "root.json").write_text("{}") + (configs_dir / "sub1.json").write_text("{}") + (configs_dir / "sub2.json").write_text("{}") + + configs = git_repo.find_configs(repo_path) + + assert len(configs) == 3 + + def test_find_configs_excludes_git_dir(self, git_repo, temp_cache_dir): + """Test that .git directory is excluded from config search.""" + repo_path = temp_cache_dir / "test-repo" + git_dir = repo_path / ".git" / "config" + git_dir.mkdir(parents=True) + + (repo_path / "config.json").write_text("{}") + (git_dir / "internal.json").write_text("{}") + + configs = git_repo.find_configs(repo_path) + + assert len(configs) == 1 + assert configs[0].name == "config.json" + + def test_find_configs_empty_repo(self, git_repo, temp_cache_dir): + """Test finding configs in empty repository.""" + repo_path = temp_cache_dir / "empty-repo" + repo_path.mkdir() + + configs = git_repo.find_configs(repo_path) + + assert configs == [] + + def test_find_configs_nonexistent_repo(self, git_repo, temp_cache_dir): + """Test finding configs in non-existent repository.""" + repo_path = temp_cache_dir / "nonexistent" + + configs = git_repo.find_configs(repo_path) + + assert configs == [] + + def test_find_configs_sorted_by_name(self, git_repo, temp_cache_dir): + """Test that configs are sorted by filename.""" + repo_path = temp_cache_dir / "test-repo" + repo_path.mkdir() + + (repo_path / "zebra.json").write_text("{}") + (repo_path / "alpha.json").write_text("{}") + (repo_path / "beta.json").write_text("{}") + + configs = git_repo.find_configs(repo_path) + + assert [c.name for c in configs] == ["alpha.json", "beta.json", "zebra.json"] + + +class TestGetConfig: + """Test config file loading.""" + + def test_get_config_exact_match(self, git_repo, temp_cache_dir): + """Test loading config with exact filename match.""" + repo_path = temp_cache_dir / "test-repo" + repo_path.mkdir() + + config_data = {"name": "react", "version": "1.0"} + (repo_path / "react.json").write_text(json.dumps(config_data)) + + result = git_repo.get_config(repo_path, "react") + + assert result == config_data + + def test_get_config_with_json_extension(self, git_repo, temp_cache_dir): + """Test loading config when .json extension is provided.""" + repo_path = temp_cache_dir / "test-repo" + repo_path.mkdir() + + config_data = {"name": "vue"} + (repo_path / "vue.json").write_text(json.dumps(config_data)) + + result = git_repo.get_config(repo_path, "vue.json") + + assert result == config_data + + def test_get_config_case_insensitive(self, git_repo, temp_cache_dir): + """Test loading config with case-insensitive match.""" + repo_path = temp_cache_dir / "test-repo" + repo_path.mkdir() + + config_data = {"name": "Django"} + (repo_path / "Django.json").write_text(json.dumps(config_data)) + + result = git_repo.get_config(repo_path, "django") + + assert result == config_data + + def test_get_config_in_subdir(self, git_repo, temp_cache_dir): + """Test loading config from subdirectory.""" + repo_path = temp_cache_dir / "test-repo" + configs_dir = repo_path / "configs" + configs_dir.mkdir(parents=True) + + config_data = {"name": "nestjs"} + (configs_dir / "nestjs.json").write_text(json.dumps(config_data)) + + result = git_repo.get_config(repo_path, "nestjs") + + assert result == config_data + + def test_get_config_not_found(self, git_repo, temp_cache_dir): + """Test error when config not found.""" + repo_path = temp_cache_dir / "test-repo" + repo_path.mkdir() + + (repo_path / "react.json").write_text("{}") + + with pytest.raises(FileNotFoundError, match="Config 'vue.json' not found"): + git_repo.get_config(repo_path, "vue") + + def test_get_config_not_found_shows_available(self, git_repo, temp_cache_dir): + """Test error message shows available configs.""" + repo_path = temp_cache_dir / "test-repo" + repo_path.mkdir() + + (repo_path / "react.json").write_text("{}") + (repo_path / "vue.json").write_text("{}") + + with pytest.raises(FileNotFoundError, match="Available configs: react, vue"): + git_repo.get_config(repo_path, "django") + + def test_get_config_invalid_json(self, git_repo, temp_cache_dir): + """Test error handling for invalid JSON.""" + repo_path = temp_cache_dir / "test-repo" + repo_path.mkdir() + + (repo_path / "broken.json").write_text("{ invalid json }") + + with pytest.raises(ValueError, match="Invalid JSON"): + git_repo.get_config(repo_path, "broken") diff --git a/tests/test_mcp_git_sources.py b/tests/test_mcp_git_sources.py new file mode 100644 index 0000000..7853707 --- /dev/null +++ b/tests/test_mcp_git_sources.py @@ -0,0 +1,584 @@ +#!/usr/bin/env python3 +""" +MCP Integration Tests for Git Config Sources +Tests the complete MCP tool workflow for git-based config fetching +""" + +import json +import pytest +import os +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch, Mock +from mcp.types import TextContent + +# Test if MCP is available +try: + import mcp + MCP_AVAILABLE = True +except ImportError: + MCP_AVAILABLE = False + + +@pytest.fixture +def temp_dirs(tmp_path): + """Create temporary directories for testing.""" + config_dir = tmp_path / "config" + cache_dir = tmp_path / "cache" + dest_dir = tmp_path / "dest" + + config_dir.mkdir() + cache_dir.mkdir() + dest_dir.mkdir() + + return { + "config": config_dir, + "cache": cache_dir, + "dest": dest_dir + } + + +@pytest.fixture +def mock_git_repo(temp_dirs): + """Create a mock git repository with config files.""" + repo_path = temp_dirs["cache"] / "test-source" + repo_path.mkdir() + (repo_path / ".git").mkdir() + + # Create sample config files + react_config = { + "name": "react", + "description": "React framework", + "base_url": "https://react.dev/" + } + (repo_path / "react.json").write_text(json.dumps(react_config, indent=2)) + + vue_config = { + "name": "vue", + "description": "Vue framework", + "base_url": "https://vuejs.org/" + } + (repo_path / "vue.json").write_text(json.dumps(vue_config, indent=2)) + + return repo_path + + +@pytest.mark.skipif(not MCP_AVAILABLE, reason="MCP not available") +@pytest.mark.asyncio +class TestFetchConfigModes: + """Test fetch_config tool with different modes.""" + + async def test_fetch_config_api_mode_list(self): + """Test API mode - listing available configs.""" + from skill_seekers.mcp.server import fetch_config_tool + + with patch('skill_seekers.mcp.server.httpx.AsyncClient') as mock_client: + # Mock API response + mock_response = MagicMock() + mock_response.json.return_value = { + "configs": [ + {"name": "react", "category": "web-frameworks", "description": "React framework", "type": "single"}, + {"name": "vue", "category": "web-frameworks", "description": "Vue framework", "type": "single"} + ], + "total": 2 + } + mock_client.return_value.__aenter__.return_value.get.return_value = mock_response + + args = {"list_available": True} + result = await fetch_config_tool(args) + + assert len(result) == 1 + assert isinstance(result[0], TextContent) + assert "react" in result[0].text + assert "vue" in result[0].text + + async def test_fetch_config_api_mode_download(self, temp_dirs): + """Test API mode - downloading specific config.""" + from skill_seekers.mcp.server import fetch_config_tool + + with patch('skill_seekers.mcp.server.httpx.AsyncClient') as mock_client: + # Mock API responses + mock_detail_response = MagicMock() + mock_detail_response.json.return_value = { + "name": "react", + "category": "web-frameworks", + "description": "React framework" + } + + mock_download_response = MagicMock() + mock_download_response.json.return_value = { + "name": "react", + "base_url": "https://react.dev/" + } + + mock_client_instance = mock_client.return_value.__aenter__.return_value + mock_client_instance.get.side_effect = [mock_detail_response, mock_download_response] + + args = { + "config_name": "react", + "destination": str(temp_dirs["dest"]) + } + result = await fetch_config_tool(args) + + assert len(result) == 1 + assert "āœ…" in result[0].text + assert "react" in result[0].text + + # Verify file was created + config_file = temp_dirs["dest"] / "react.json" + assert config_file.exists() + + @patch('skill_seekers.mcp.server.GitConfigRepo') + async def test_fetch_config_git_url_mode(self, mock_git_repo_class, temp_dirs): + """Test Git URL mode - direct git clone.""" + from skill_seekers.mcp.server import fetch_config_tool + + # Mock GitConfigRepo + mock_repo_instance = MagicMock() + mock_repo_path = temp_dirs["cache"] / "temp_react" + mock_repo_path.mkdir() + + # Create mock config file + react_config = {"name": "react", "base_url": "https://react.dev/"} + (mock_repo_path / "react.json").write_text(json.dumps(react_config)) + + mock_repo_instance.clone_or_pull.return_value = mock_repo_path + mock_repo_instance.get_config.return_value = react_config + mock_git_repo_class.return_value = mock_repo_instance + + args = { + "config_name": "react", + "git_url": "https://github.com/myorg/configs.git", + "destination": str(temp_dirs["dest"]) + } + result = await fetch_config_tool(args) + + assert len(result) == 1 + assert "āœ…" in result[0].text + assert "git URL" in result[0].text + assert "react" in result[0].text + + # Verify clone was called + mock_repo_instance.clone_or_pull.assert_called_once() + + # Verify file was created + config_file = temp_dirs["dest"] / "react.json" + assert config_file.exists() + + @patch('skill_seekers.mcp.server.GitConfigRepo') + @patch('skill_seekers.mcp.server.SourceManager') + async def test_fetch_config_source_mode(self, mock_source_manager_class, mock_git_repo_class, temp_dirs): + """Test Source mode - using named source from registry.""" + from skill_seekers.mcp.server import fetch_config_tool + + # Mock SourceManager + mock_source_manager = MagicMock() + mock_source_manager.get_source.return_value = { + "name": "team", + "git_url": "https://github.com/myorg/configs.git", + "branch": "main", + "token_env": "GITHUB_TOKEN" + } + mock_source_manager_class.return_value = mock_source_manager + + # Mock GitConfigRepo + mock_repo_instance = MagicMock() + mock_repo_path = temp_dirs["cache"] / "team" + mock_repo_path.mkdir() + + react_config = {"name": "react", "base_url": "https://react.dev/"} + (mock_repo_path / "react.json").write_text(json.dumps(react_config)) + + mock_repo_instance.clone_or_pull.return_value = mock_repo_path + mock_repo_instance.get_config.return_value = react_config + mock_git_repo_class.return_value = mock_repo_instance + + args = { + "config_name": "react", + "source": "team", + "destination": str(temp_dirs["dest"]) + } + result = await fetch_config_tool(args) + + assert len(result) == 1 + assert "āœ…" in result[0].text + assert "git source" in result[0].text + assert "team" in result[0].text + + # Verify source was retrieved + mock_source_manager.get_source.assert_called_once_with("team") + + # Verify file was created + config_file = temp_dirs["dest"] / "react.json" + assert config_file.exists() + + async def test_fetch_config_source_not_found(self): + """Test error when source doesn't exist.""" + from skill_seekers.mcp.server import fetch_config_tool + + with patch('skill_seekers.mcp.server.SourceManager') as mock_sm_class: + mock_sm = MagicMock() + mock_sm.get_source.side_effect = KeyError("Source 'nonexistent' not found") + mock_sm_class.return_value = mock_sm + + args = { + "config_name": "react", + "source": "nonexistent" + } + result = await fetch_config_tool(args) + + assert len(result) == 1 + assert "āŒ" in result[0].text + assert "not found" in result[0].text + + @patch('skill_seekers.mcp.server.GitConfigRepo') + async def test_fetch_config_config_not_found_in_repo(self, mock_git_repo_class, temp_dirs): + """Test error when config doesn't exist in repository.""" + from skill_seekers.mcp.server import fetch_config_tool + + # Mock GitConfigRepo + mock_repo_instance = MagicMock() + mock_repo_path = temp_dirs["cache"] / "temp_django" + mock_repo_path.mkdir() + + mock_repo_instance.clone_or_pull.return_value = mock_repo_path + mock_repo_instance.get_config.side_effect = FileNotFoundError( + "Config 'django' not found in repository. Available configs: react, vue" + ) + mock_git_repo_class.return_value = mock_repo_instance + + args = { + "config_name": "django", + "git_url": "https://github.com/myorg/configs.git" + } + result = await fetch_config_tool(args) + + assert len(result) == 1 + assert "āŒ" in result[0].text + assert "not found" in result[0].text + assert "Available configs" in result[0].text + + @patch('skill_seekers.mcp.server.GitConfigRepo') + async def test_fetch_config_invalid_git_url(self, mock_git_repo_class): + """Test error handling for invalid git URL.""" + from skill_seekers.mcp.server import fetch_config_tool + + # Mock GitConfigRepo to raise ValueError + mock_repo_instance = MagicMock() + mock_repo_instance.clone_or_pull.side_effect = ValueError("Invalid git URL: not-a-url") + mock_git_repo_class.return_value = mock_repo_instance + + args = { + "config_name": "react", + "git_url": "not-a-url" + } + result = await fetch_config_tool(args) + + assert len(result) == 1 + assert "āŒ" in result[0].text + assert "Invalid git URL" in result[0].text + + +@pytest.mark.skipif(not MCP_AVAILABLE, reason="MCP not available") +@pytest.mark.asyncio +class TestSourceManagementTools: + """Test add/list/remove config source tools.""" + + async def test_add_config_source(self, temp_dirs): + """Test adding a new config source.""" + from skill_seekers.mcp.server import add_config_source_tool + + with patch('skill_seekers.mcp.server.SourceManager') as mock_sm_class: + mock_sm = MagicMock() + mock_sm.add_source.return_value = { + "name": "team", + "git_url": "https://github.com/myorg/configs.git", + "type": "github", + "branch": "main", + "token_env": "GITHUB_TOKEN", + "priority": 100, + "enabled": True, + "added_at": "2025-12-21T10:00:00+00:00" + } + mock_sm_class.return_value = mock_sm + + args = { + "name": "team", + "git_url": "https://github.com/myorg/configs.git" + } + result = await add_config_source_tool(args) + + assert len(result) == 1 + assert "āœ…" in result[0].text + assert "team" in result[0].text + assert "registered" in result[0].text + + # Verify add_source was called + mock_sm.add_source.assert_called_once() + + async def test_add_config_source_missing_name(self): + """Test error when name is missing.""" + from skill_seekers.mcp.server import add_config_source_tool + + args = {"git_url": "https://github.com/myorg/configs.git"} + result = await add_config_source_tool(args) + + assert len(result) == 1 + assert "āŒ" in result[0].text + assert "name" in result[0].text.lower() + assert "required" in result[0].text.lower() + + async def test_add_config_source_missing_git_url(self): + """Test error when git_url is missing.""" + from skill_seekers.mcp.server import add_config_source_tool + + args = {"name": "team"} + result = await add_config_source_tool(args) + + assert len(result) == 1 + assert "āŒ" in result[0].text + assert "git_url" in result[0].text.lower() + assert "required" in result[0].text.lower() + + async def test_add_config_source_invalid_name(self): + """Test error when source name is invalid.""" + from skill_seekers.mcp.server import add_config_source_tool + + with patch('skill_seekers.mcp.server.SourceManager') as mock_sm_class: + mock_sm = MagicMock() + mock_sm.add_source.side_effect = ValueError( + "Invalid source name 'team@company'. Must be alphanumeric with optional hyphens/underscores." + ) + mock_sm_class.return_value = mock_sm + + args = { + "name": "team@company", + "git_url": "https://github.com/myorg/configs.git" + } + result = await add_config_source_tool(args) + + assert len(result) == 1 + assert "āŒ" in result[0].text + assert "Validation Error" in result[0].text + + async def test_list_config_sources(self): + """Test listing config sources.""" + from skill_seekers.mcp.server import list_config_sources_tool + + with patch('skill_seekers.mcp.server.SourceManager') as mock_sm_class: + mock_sm = MagicMock() + mock_sm.list_sources.return_value = [ + { + "name": "team", + "git_url": "https://github.com/myorg/configs.git", + "type": "github", + "branch": "main", + "token_env": "GITHUB_TOKEN", + "priority": 1, + "enabled": True, + "added_at": "2025-12-21T10:00:00+00:00" + }, + { + "name": "company", + "git_url": "https://gitlab.company.com/configs.git", + "type": "gitlab", + "branch": "develop", + "token_env": "GITLAB_TOKEN", + "priority": 2, + "enabled": True, + "added_at": "2025-12-21T11:00:00+00:00" + } + ] + mock_sm_class.return_value = mock_sm + + args = {} + result = await list_config_sources_tool(args) + + assert len(result) == 1 + assert "šŸ“‹" in result[0].text + assert "team" in result[0].text + assert "company" in result[0].text + assert "2 total" in result[0].text + + async def test_list_config_sources_empty(self): + """Test listing when no sources registered.""" + from skill_seekers.mcp.server import list_config_sources_tool + + with patch('skill_seekers.mcp.server.SourceManager') as mock_sm_class: + mock_sm = MagicMock() + mock_sm.list_sources.return_value = [] + mock_sm_class.return_value = mock_sm + + args = {} + result = await list_config_sources_tool(args) + + assert len(result) == 1 + assert "No config sources registered" in result[0].text + + async def test_list_config_sources_enabled_only(self): + """Test listing only enabled sources.""" + from skill_seekers.mcp.server import list_config_sources_tool + + with patch('skill_seekers.mcp.server.SourceManager') as mock_sm_class: + mock_sm = MagicMock() + mock_sm.list_sources.return_value = [ + { + "name": "team", + "git_url": "https://github.com/myorg/configs.git", + "type": "github", + "branch": "main", + "token_env": "GITHUB_TOKEN", + "priority": 1, + "enabled": True, + "added_at": "2025-12-21T10:00:00+00:00" + } + ] + mock_sm_class.return_value = mock_sm + + args = {"enabled_only": True} + result = await list_config_sources_tool(args) + + assert len(result) == 1 + assert "enabled only" in result[0].text + + # Verify list_sources was called with correct parameter + mock_sm.list_sources.assert_called_once_with(enabled_only=True) + + async def test_remove_config_source(self): + """Test removing a config source.""" + from skill_seekers.mcp.server import remove_config_source_tool + + with patch('skill_seekers.mcp.server.SourceManager') as mock_sm_class: + mock_sm = MagicMock() + mock_sm.remove_source.return_value = True + mock_sm_class.return_value = mock_sm + + args = {"name": "team"} + result = await remove_config_source_tool(args) + + assert len(result) == 1 + assert "āœ…" in result[0].text + assert "removed" in result[0].text.lower() + assert "team" in result[0].text + + # Verify remove_source was called + mock_sm.remove_source.assert_called_once_with("team") + + async def test_remove_config_source_not_found(self): + """Test removing non-existent source.""" + from skill_seekers.mcp.server import remove_config_source_tool + + with patch('skill_seekers.mcp.server.SourceManager') as mock_sm_class: + mock_sm = MagicMock() + mock_sm.remove_source.return_value = False + mock_sm.list_sources.return_value = [ + {"name": "team", "git_url": "https://example.com/1.git"}, + {"name": "company", "git_url": "https://example.com/2.git"} + ] + mock_sm_class.return_value = mock_sm + + args = {"name": "nonexistent"} + result = await remove_config_source_tool(args) + + assert len(result) == 1 + assert "āŒ" in result[0].text + assert "not found" in result[0].text + assert "Available sources" in result[0].text + + async def test_remove_config_source_missing_name(self): + """Test error when name is missing.""" + from skill_seekers.mcp.server import remove_config_source_tool + + args = {} + result = await remove_config_source_tool(args) + + assert len(result) == 1 + assert "āŒ" in result[0].text + assert "name" in result[0].text.lower() + assert "required" in result[0].text.lower() + + +@pytest.mark.skipif(not MCP_AVAILABLE, reason="MCP not available") +@pytest.mark.asyncio +class TestCompleteWorkflow: + """Test complete workflow of add → fetch → remove.""" + + @patch('skill_seekers.mcp.server.GitConfigRepo') + @patch('skill_seekers.mcp.server.SourceManager') + async def test_add_fetch_remove_workflow(self, mock_sm_class, mock_git_repo_class, temp_dirs): + """Test complete workflow: add source → fetch config → remove source.""" + from skill_seekers.mcp.server import ( + add_config_source_tool, + fetch_config_tool, + list_config_sources_tool, + remove_config_source_tool + ) + + # Step 1: Add source + mock_sm = MagicMock() + mock_sm.add_source.return_value = { + "name": "team", + "git_url": "https://github.com/myorg/configs.git", + "type": "github", + "branch": "main", + "token_env": "GITHUB_TOKEN", + "priority": 100, + "enabled": True, + "added_at": "2025-12-21T10:00:00+00:00" + } + mock_sm_class.return_value = mock_sm + + add_result = await add_config_source_tool({ + "name": "team", + "git_url": "https://github.com/myorg/configs.git" + }) + assert "āœ…" in add_result[0].text + + # Step 2: Fetch config from source + mock_sm.get_source.return_value = { + "name": "team", + "git_url": "https://github.com/myorg/configs.git", + "branch": "main", + "token_env": "GITHUB_TOKEN" + } + + mock_repo = MagicMock() + mock_repo_path = temp_dirs["cache"] / "team" + mock_repo_path.mkdir() + + react_config = {"name": "react", "base_url": "https://react.dev/"} + (mock_repo_path / "react.json").write_text(json.dumps(react_config)) + + mock_repo.clone_or_pull.return_value = mock_repo_path + mock_repo.get_config.return_value = react_config + mock_git_repo_class.return_value = mock_repo + + fetch_result = await fetch_config_tool({ + "config_name": "react", + "source": "team", + "destination": str(temp_dirs["dest"]) + }) + assert "āœ…" in fetch_result[0].text + + # Verify config file created + assert (temp_dirs["dest"] / "react.json").exists() + + # Step 3: List sources + mock_sm.list_sources.return_value = [{ + "name": "team", + "git_url": "https://github.com/myorg/configs.git", + "type": "github", + "branch": "main", + "token_env": "GITHUB_TOKEN", + "priority": 100, + "enabled": True, + "added_at": "2025-12-21T10:00:00+00:00" + }] + + list_result = await list_config_sources_tool({}) + assert "team" in list_result[0].text + + # Step 4: Remove source + mock_sm.remove_source.return_value = True + + remove_result = await remove_config_source_tool({"name": "team"}) + assert "āœ…" in remove_result[0].text diff --git a/tests/test_source_manager.py b/tests/test_source_manager.py new file mode 100644 index 0000000..8fba7ad --- /dev/null +++ b/tests/test_source_manager.py @@ -0,0 +1,551 @@ +#!/usr/bin/env python3 +""" +Tests for SourceManager class (config source registry management) +""" + +import json +import pytest +from pathlib import Path +from datetime import datetime, timezone + +from skill_seekers.mcp.source_manager import SourceManager + + +@pytest.fixture +def temp_config_dir(tmp_path): + """Create temporary config directory for tests.""" + config_dir = tmp_path / "test_config" + config_dir.mkdir() + return config_dir + + +@pytest.fixture +def source_manager(temp_config_dir): + """Create SourceManager instance with temp config.""" + return SourceManager(config_dir=str(temp_config_dir)) + + +class TestSourceManagerInit: + """Test SourceManager initialization.""" + + def test_init_creates_config_dir(self, tmp_path): + """Test that initialization creates config directory.""" + config_dir = tmp_path / "new_config" + manager = SourceManager(config_dir=str(config_dir)) + + assert config_dir.exists() + assert manager.config_dir == config_dir + + def test_init_creates_registry_file(self, temp_config_dir): + """Test that initialization creates registry file.""" + manager = SourceManager(config_dir=str(temp_config_dir)) + registry_file = temp_config_dir / "sources.json" + + assert registry_file.exists() + + # Verify initial structure + with open(registry_file, 'r') as f: + data = json.load(f) + assert data == {"version": "1.0", "sources": []} + + def test_init_preserves_existing_registry(self, temp_config_dir): + """Test that initialization doesn't overwrite existing registry.""" + registry_file = temp_config_dir / "sources.json" + + # Create existing registry + existing_data = { + "version": "1.0", + "sources": [{"name": "test", "git_url": "https://example.com/repo.git"}] + } + with open(registry_file, 'w') as f: + json.dump(existing_data, f) + + # Initialize manager + manager = SourceManager(config_dir=str(temp_config_dir)) + + # Verify data preserved + with open(registry_file, 'r') as f: + data = json.load(f) + assert len(data["sources"]) == 1 + + def test_init_with_default_config_dir(self): + """Test initialization with default config directory.""" + manager = SourceManager() + + expected = Path.home() / ".skill-seekers" + assert manager.config_dir == expected + + +class TestAddSource: + """Test adding config sources.""" + + def test_add_source_minimal(self, source_manager): + """Test adding source with minimal parameters.""" + source = source_manager.add_source( + name="team", + git_url="https://github.com/myorg/configs.git" + ) + + assert source["name"] == "team" + assert source["git_url"] == "https://github.com/myorg/configs.git" + assert source["type"] == "github" + assert source["token_env"] == "GITHUB_TOKEN" + assert source["branch"] == "main" + assert source["enabled"] is True + assert source["priority"] == 100 + assert "added_at" in source + assert "updated_at" in source + + def test_add_source_full_parameters(self, source_manager): + """Test adding source with all parameters.""" + source = source_manager.add_source( + name="company", + git_url="https://gitlab.company.com/platform/configs.git", + source_type="gitlab", + token_env="CUSTOM_TOKEN", + branch="develop", + priority=1, + enabled=False + ) + + assert source["name"] == "company" + assert source["type"] == "gitlab" + assert source["token_env"] == "CUSTOM_TOKEN" + assert source["branch"] == "develop" + assert source["priority"] == 1 + assert source["enabled"] is False + + def test_add_source_normalizes_name(self, source_manager): + """Test that source names are normalized to lowercase.""" + source = source_manager.add_source( + name="MyTeam", + git_url="https://github.com/org/repo.git" + ) + + assert source["name"] == "myteam" + + def test_add_source_invalid_name_empty(self, source_manager): + """Test that empty source names are rejected.""" + with pytest.raises(ValueError, match="Invalid source name"): + source_manager.add_source( + name="", + git_url="https://github.com/org/repo.git" + ) + + def test_add_source_invalid_name_special_chars(self, source_manager): + """Test that source names with special characters are rejected.""" + with pytest.raises(ValueError, match="Invalid source name"): + source_manager.add_source( + name="team@company", + git_url="https://github.com/org/repo.git" + ) + + def test_add_source_valid_name_with_hyphens(self, source_manager): + """Test that source names with hyphens are allowed.""" + source = source_manager.add_source( + name="team-alpha", + git_url="https://github.com/org/repo.git" + ) + + assert source["name"] == "team-alpha" + + def test_add_source_valid_name_with_underscores(self, source_manager): + """Test that source names with underscores are allowed.""" + source = source_manager.add_source( + name="team_alpha", + git_url="https://github.com/org/repo.git" + ) + + assert source["name"] == "team_alpha" + + def test_add_source_empty_git_url(self, source_manager): + """Test that empty git URLs are rejected.""" + with pytest.raises(ValueError, match="git_url cannot be empty"): + source_manager.add_source(name="team", git_url="") + + def test_add_source_strips_git_url(self, source_manager): + """Test that git URLs are stripped of whitespace.""" + source = source_manager.add_source( + name="team", + git_url=" https://github.com/org/repo.git " + ) + + assert source["git_url"] == "https://github.com/org/repo.git" + + def test_add_source_updates_existing(self, source_manager): + """Test that adding existing source updates it.""" + # Add initial source + source1 = source_manager.add_source( + name="team", + git_url="https://github.com/org/repo1.git" + ) + + # Update source + source2 = source_manager.add_source( + name="team", + git_url="https://github.com/org/repo2.git" + ) + + # Verify updated + assert source2["git_url"] == "https://github.com/org/repo2.git" + assert source2["added_at"] == source1["added_at"] # Preserved + assert source2["updated_at"] > source1["added_at"] # Updated + + # Verify only one source exists + sources = source_manager.list_sources() + assert len(sources) == 1 + + def test_add_source_persists_to_file(self, source_manager, temp_config_dir): + """Test that added sources are persisted to file.""" + source_manager.add_source( + name="team", + git_url="https://github.com/org/repo.git" + ) + + # Read file directly + registry_file = temp_config_dir / "sources.json" + with open(registry_file, 'r') as f: + data = json.load(f) + + assert len(data["sources"]) == 1 + assert data["sources"][0]["name"] == "team" + + def test_add_multiple_sources_sorted_by_priority(self, source_manager): + """Test that multiple sources are sorted by priority.""" + source_manager.add_source(name="low", git_url="https://example.com/1.git", priority=100) + source_manager.add_source(name="high", git_url="https://example.com/2.git", priority=1) + source_manager.add_source(name="medium", git_url="https://example.com/3.git", priority=50) + + sources = source_manager.list_sources() + + assert [s["name"] for s in sources] == ["high", "medium", "low"] + assert [s["priority"] for s in sources] == [1, 50, 100] + + +class TestGetSource: + """Test retrieving config sources.""" + + def test_get_source_exact_match(self, source_manager): + """Test getting source with exact name match.""" + source_manager.add_source(name="team", git_url="https://github.com/org/repo.git") + + source = source_manager.get_source("team") + + assert source["name"] == "team" + + def test_get_source_case_insensitive(self, source_manager): + """Test getting source is case-insensitive.""" + source_manager.add_source(name="MyTeam", git_url="https://github.com/org/repo.git") + + source = source_manager.get_source("myteam") + + assert source["name"] == "myteam" + + def test_get_source_not_found(self, source_manager): + """Test error when source not found.""" + with pytest.raises(KeyError, match="Source 'nonexistent' not found"): + source_manager.get_source("nonexistent") + + def test_get_source_not_found_shows_available(self, source_manager): + """Test error message shows available sources.""" + source_manager.add_source(name="team1", git_url="https://example.com/1.git") + source_manager.add_source(name="team2", git_url="https://example.com/2.git") + + with pytest.raises(KeyError, match="Available sources: team1, team2"): + source_manager.get_source("team3") + + def test_get_source_empty_registry(self, source_manager): + """Test error when registry is empty.""" + with pytest.raises(KeyError, match="Available sources: none"): + source_manager.get_source("team") + + +class TestListSources: + """Test listing config sources.""" + + def test_list_sources_empty(self, source_manager): + """Test listing sources when registry is empty.""" + sources = source_manager.list_sources() + + assert sources == [] + + def test_list_sources_multiple(self, source_manager): + """Test listing multiple sources.""" + source_manager.add_source(name="team1", git_url="https://example.com/1.git") + source_manager.add_source(name="team2", git_url="https://example.com/2.git") + source_manager.add_source(name="team3", git_url="https://example.com/3.git") + + sources = source_manager.list_sources() + + assert len(sources) == 3 + + def test_list_sources_sorted_by_priority(self, source_manager): + """Test that sources are sorted by priority.""" + source_manager.add_source(name="low", git_url="https://example.com/1.git", priority=100) + source_manager.add_source(name="high", git_url="https://example.com/2.git", priority=1) + + sources = source_manager.list_sources() + + assert sources[0]["name"] == "high" + assert sources[1]["name"] == "low" + + def test_list_sources_enabled_only(self, source_manager): + """Test listing only enabled sources.""" + source_manager.add_source(name="enabled1", git_url="https://example.com/1.git", enabled=True) + source_manager.add_source(name="disabled", git_url="https://example.com/2.git", enabled=False) + source_manager.add_source(name="enabled2", git_url="https://example.com/3.git", enabled=True) + + sources = source_manager.list_sources(enabled_only=True) + + assert len(sources) == 2 + assert all(s["enabled"] for s in sources) + assert sorted([s["name"] for s in sources]) == ["enabled1", "enabled2"] + + def test_list_sources_all_when_some_disabled(self, source_manager): + """Test listing all sources includes disabled ones.""" + source_manager.add_source(name="enabled", git_url="https://example.com/1.git", enabled=True) + source_manager.add_source(name="disabled", git_url="https://example.com/2.git", enabled=False) + + sources = source_manager.list_sources(enabled_only=False) + + assert len(sources) == 2 + + +class TestRemoveSource: + """Test removing config sources.""" + + def test_remove_source_exists(self, source_manager): + """Test removing existing source.""" + source_manager.add_source(name="team", git_url="https://github.com/org/repo.git") + + result = source_manager.remove_source("team") + + assert result is True + assert len(source_manager.list_sources()) == 0 + + def test_remove_source_case_insensitive(self, source_manager): + """Test removing source is case-insensitive.""" + source_manager.add_source(name="MyTeam", git_url="https://github.com/org/repo.git") + + result = source_manager.remove_source("myteam") + + assert result is True + + def test_remove_source_not_found(self, source_manager): + """Test removing non-existent source returns False.""" + result = source_manager.remove_source("nonexistent") + + assert result is False + + def test_remove_source_persists_to_file(self, source_manager, temp_config_dir): + """Test that source removal is persisted to file.""" + source_manager.add_source(name="team1", git_url="https://example.com/1.git") + source_manager.add_source(name="team2", git_url="https://example.com/2.git") + + source_manager.remove_source("team1") + + # Read file directly + registry_file = temp_config_dir / "sources.json" + with open(registry_file, 'r') as f: + data = json.load(f) + + assert len(data["sources"]) == 1 + assert data["sources"][0]["name"] == "team2" + + def test_remove_source_from_multiple(self, source_manager): + """Test removing one source from multiple.""" + source_manager.add_source(name="team1", git_url="https://example.com/1.git") + source_manager.add_source(name="team2", git_url="https://example.com/2.git") + source_manager.add_source(name="team3", git_url="https://example.com/3.git") + + source_manager.remove_source("team2") + + sources = source_manager.list_sources() + assert len(sources) == 2 + assert sorted([s["name"] for s in sources]) == ["team1", "team3"] + + +class TestUpdateSource: + """Test updating config sources.""" + + def test_update_source_git_url(self, source_manager): + """Test updating source git URL.""" + source_manager.add_source(name="team", git_url="https://github.com/org/repo1.git") + + updated = source_manager.update_source(name="team", git_url="https://github.com/org/repo2.git") + + assert updated["git_url"] == "https://github.com/org/repo2.git" + + def test_update_source_branch(self, source_manager): + """Test updating source branch.""" + source_manager.add_source(name="team", git_url="https://github.com/org/repo.git") + + updated = source_manager.update_source(name="team", branch="develop") + + assert updated["branch"] == "develop" + + def test_update_source_enabled(self, source_manager): + """Test updating source enabled status.""" + source_manager.add_source(name="team", git_url="https://github.com/org/repo.git", enabled=True) + + updated = source_manager.update_source(name="team", enabled=False) + + assert updated["enabled"] is False + + def test_update_source_priority(self, source_manager): + """Test updating source priority.""" + source_manager.add_source(name="team", git_url="https://github.com/org/repo.git", priority=100) + + updated = source_manager.update_source(name="team", priority=1) + + assert updated["priority"] == 1 + + def test_update_source_multiple_fields(self, source_manager): + """Test updating multiple fields at once.""" + source_manager.add_source(name="team", git_url="https://github.com/org/repo.git") + + updated = source_manager.update_source( + name="team", + git_url="https://gitlab.com/org/repo.git", + type="gitlab", + branch="develop", + priority=1 + ) + + assert updated["git_url"] == "https://gitlab.com/org/repo.git" + assert updated["type"] == "gitlab" + assert updated["branch"] == "develop" + assert updated["priority"] == 1 + + def test_update_source_updates_timestamp(self, source_manager): + """Test that update modifies updated_at timestamp.""" + source = source_manager.add_source(name="team", git_url="https://github.com/org/repo.git") + original_updated = source["updated_at"] + + updated = source_manager.update_source(name="team", branch="develop") + + assert updated["updated_at"] > original_updated + + def test_update_source_not_found(self, source_manager): + """Test error when updating non-existent source.""" + with pytest.raises(KeyError, match="Source 'nonexistent' not found"): + source_manager.update_source(name="nonexistent", branch="main") + + def test_update_source_resorts_by_priority(self, source_manager): + """Test that updating priority re-sorts sources.""" + source_manager.add_source(name="team1", git_url="https://example.com/1.git", priority=1) + source_manager.add_source(name="team2", git_url="https://example.com/2.git", priority=2) + + # Change team2 to higher priority + source_manager.update_source(name="team2", priority=0) + + sources = source_manager.list_sources() + assert sources[0]["name"] == "team2" + assert sources[1]["name"] == "team1" + + +class TestDefaultTokenEnv: + """Test default token environment variable detection.""" + + def test_default_token_env_github(self, source_manager): + """Test GitHub sources get GITHUB_TOKEN.""" + source = source_manager.add_source( + name="team", + git_url="https://github.com/org/repo.git", + source_type="github" + ) + + assert source["token_env"] == "GITHUB_TOKEN" + + def test_default_token_env_gitlab(self, source_manager): + """Test GitLab sources get GITLAB_TOKEN.""" + source = source_manager.add_source( + name="team", + git_url="https://gitlab.com/org/repo.git", + source_type="gitlab" + ) + + assert source["token_env"] == "GITLAB_TOKEN" + + def test_default_token_env_gitea(self, source_manager): + """Test Gitea sources get GITEA_TOKEN.""" + source = source_manager.add_source( + name="team", + git_url="https://gitea.example.com/org/repo.git", + source_type="gitea" + ) + + assert source["token_env"] == "GITEA_TOKEN" + + def test_default_token_env_bitbucket(self, source_manager): + """Test Bitbucket sources get BITBUCKET_TOKEN.""" + source = source_manager.add_source( + name="team", + git_url="https://bitbucket.org/org/repo.git", + source_type="bitbucket" + ) + + assert source["token_env"] == "BITBUCKET_TOKEN" + + def test_default_token_env_custom(self, source_manager): + """Test custom sources get GIT_TOKEN.""" + source = source_manager.add_source( + name="team", + git_url="https://git.example.com/org/repo.git", + source_type="custom" + ) + + assert source["token_env"] == "GIT_TOKEN" + + def test_override_token_env(self, source_manager): + """Test that custom token_env overrides default.""" + source = source_manager.add_source( + name="team", + git_url="https://github.com/org/repo.git", + source_type="github", + token_env="MY_CUSTOM_TOKEN" + ) + + assert source["token_env"] == "MY_CUSTOM_TOKEN" + + +class TestRegistryPersistence: + """Test registry file I/O.""" + + def test_registry_atomic_write(self, source_manager, temp_config_dir): + """Test that registry writes are atomic (temp file + rename).""" + source_manager.add_source(name="team", git_url="https://github.com/org/repo.git") + + # Verify no .tmp file left behind + temp_files = list(temp_config_dir.glob("*.tmp")) + assert len(temp_files) == 0 + + def test_registry_json_formatting(self, source_manager, temp_config_dir): + """Test that registry JSON is properly formatted.""" + source_manager.add_source(name="team", git_url="https://github.com/org/repo.git") + + registry_file = temp_config_dir / "sources.json" + content = registry_file.read_text() + + # Verify it's pretty-printed + assert " " in content # Indentation + data = json.loads(content) + assert "version" in data + assert "sources" in data + + def test_registry_corrupted_file(self, temp_config_dir): + """Test error handling for corrupted registry file.""" + registry_file = temp_config_dir / "sources.json" + registry_file.write_text("{ invalid json }") + + # The constructor will fail when trying to read the corrupted file + # during initialization, but it actually creates a new valid registry + # So we need to test reading a corrupted file after construction + manager = SourceManager(config_dir=str(temp_config_dir)) + + # Corrupt the file after initialization + registry_file.write_text("{ invalid json }") + + # Now _read_registry should fail + with pytest.raises(ValueError, match="Corrupted registry file"): + manager._read_registry()