diff --git a/src/skill_seekers/mcp/tools/workflow_tools.py b/src/skill_seekers/mcp/tools/workflow_tools.py index 0c5e7a0..1c86d98 100644 --- a/src/skill_seekers/mcp/tools/workflow_tools.py +++ b/src/skill_seekers/mcp/tools/workflow_tools.py @@ -135,6 +135,10 @@ def get_workflow_tool(args: dict) -> list: name = args.get("name", "").strip() if not name: return [TextContent(type="text", text="Error: 'name' parameter is required.")] + try: + _validate_name(name) + except ValueError as exc: + return [TextContent(type="text", text=f"Error: {exc}")] text = _read_workflow(name) if text is None: diff --git a/tests/test_mcp_workflow_tools.py b/tests/test_mcp_workflow_tools.py index 20ad80f..8c7a4f6 100644 --- a/tests/test_mcp_workflow_tools.py +++ b/tests/test_mcp_workflow_tools.py @@ -527,3 +527,78 @@ class TestWorkflowRoundTrip: # 6. Get after delete — error r = get_workflow_tool({"name": "lifecycle"}) assert "not found" in _text(r).lower() + + +# --------------------------------------------------------------------------- +# Path traversal protection (CWE-22, #325) +# --------------------------------------------------------------------------- + + +class TestPathTraversalProtection: + """Verify all tools reject path traversal attempts in workflow names.""" + + MALICIOUS_NAMES = [ + "../../../etc/passwd", + "..\\..\\windows\\system32\\config", + "../../../../tmp/evil", + "/etc/passwd", + "foo/../../../bar", + "..%2F..%2Fetc%2Fpasswd", # contains .. + ] + + def test_get_workflow_rejects_traversal(self, user_dir, bundled_names_empty): + from skill_seekers.mcp.tools.workflow_tools import get_workflow_tool + + for name in self.MALICIOUS_NAMES: + result = get_workflow_tool({"name": name}) + assert "Error" in _text(result) or "Invalid" in _text(result), ( + f"get_workflow_tool should reject name={name!r}" + ) + + def test_create_workflow_rejects_traversal(self, user_dir, bundled_names_empty): + from skill_seekers.mcp.tools.workflow_tools import create_workflow_tool + + for name in self.MALICIOUS_NAMES: + result = create_workflow_tool({"name": name, "content": VALID_WORKFLOW_YAML}) + assert "Error" in _text(result) or "Invalid" in _text(result), ( + f"create_workflow_tool should reject name={name!r}" + ) + + def test_update_workflow_rejects_traversal(self, user_dir, bundled_names_empty): + from skill_seekers.mcp.tools.workflow_tools import update_workflow_tool + + for name in self.MALICIOUS_NAMES: + result = update_workflow_tool({"name": name, "content": VALID_WORKFLOW_YAML}) + assert "Error" in _text(result) or "Invalid" in _text(result), ( + f"update_workflow_tool should reject name={name!r}" + ) + + def test_delete_workflow_rejects_traversal(self, user_dir, bundled_names_empty): + from skill_seekers.mcp.tools.workflow_tools import delete_workflow_tool + + for name in self.MALICIOUS_NAMES: + result = delete_workflow_tool({"name": name}) + assert "Error" in _text(result) or "Invalid" in _text(result), ( + f"delete_workflow_tool should reject name={name!r}" + ) + + def test_valid_names_still_work(self, user_dir, bundled_names_empty): + from skill_seekers.mcp.tools.workflow_tools import create_workflow_tool, get_workflow_tool + + result = create_workflow_tool({"name": "my-workflow", "content": VALID_WORKFLOW_YAML}) + assert "Error" not in _text(result) + + result = get_workflow_tool({"name": "my-workflow"}) + assert "Error" not in _text(result) + + def test_validate_name_rejects_empty(self): + from skill_seekers.mcp.tools.workflow_tools import _validate_name + + with pytest.raises(ValueError): + _validate_name("") + + with pytest.raises(ValueError): + _validate_name("..") + + with pytest.raises(ValueError): + _validate_name("foo/bar")