diff --git a/src/skill_seekers/mcp/tools/workflow_tools.py b/src/skill_seekers/mcp/tools/workflow_tools.py index 56d31b4..0c5e7a0 100644 --- a/src/skill_seekers/mcp/tools/workflow_tools.py +++ b/src/skill_seekers/mcp/tools/workflow_tools.py @@ -11,6 +11,7 @@ MCP Tool Implementations for Workflow Management from __future__ import annotations +import os from pathlib import Path import yaml @@ -28,6 +29,13 @@ except ImportError: USER_WORKFLOWS_DIR = Path.home() / ".config" / "skill-seekers" / "workflows" +def _validate_name(name: str) -> str: + """Validate workflow name to prevent path traversal (CWE-22).""" + if not name or ".." in name or "/" in name or "\\" in name or os.path.isabs(name): + raise ValueError(f"Invalid workflow name: {name!r}") + return name + + def _ensure_user_dir() -> Path: USER_WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True) return USER_WORKFLOWS_DIR @@ -55,6 +63,7 @@ def _user_names() -> list[str]: def _read_bundled(name: str) -> str | None: + _validate_name(name) from importlib.resources import files as importlib_files for suffix in (".yaml", ".yml"): @@ -68,6 +77,7 @@ def _read_bundled(name: str) -> str | None: def _read_workflow(name: str) -> str | None: """Read YAML text: user dir first, then bundled.""" + _validate_name(name) for suffix in (".yaml", ".yml"): p = USER_WORKFLOWS_DIR / (name + suffix) if p.exists(): @@ -147,6 +157,10 @@ def create_workflow_tool(args: dict) -> list: if not name: return [TextContent(type="text", text="Error: 'name' parameter is required.")] + try: + _validate_name(name) + except ValueError as exc: + return [TextContent(type="text", text=f"Error: {exc}")] if not content: return [TextContent(type="text", text="Error: 'content' parameter is required.")] @@ -176,6 +190,10 @@ def update_workflow_tool(args: dict) -> list: if not name: return [TextContent(type="text", text="Error: 'name' parameter is required.")] + try: + _validate_name(name) + except ValueError as exc: + return [TextContent(type="text", text=f"Error: {exc}")] if not content: return [TextContent(type="text", text="Error: 'content' parameter is required.")] @@ -207,6 +225,10 @@ def delete_workflow_tool(args: dict) -> list: name = args.get("name", "").strip() if not name: return [TextContent(type="text", text="Error: 'name' parameter is required.")] + try: + _validate_name(name) + except ValueError as exc: + return [TextContent(type="text", text=f"Error: {exc}")] if name in _bundled_names(): return [