fix: add path traversal protection to get_workflow_tool + tests (#325)
PR #326 added _validate_name() to create/update/delete but missed get_workflow_tool, which would raise an unhandled ValueError instead of returning a user-friendly error. Added try/except handling and 6 tests covering all 4 tool functions with malicious names. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -135,6 +135,10 @@ def get_workflow_tool(args: dict) -> list:
|
|||||||
name = args.get("name", "").strip()
|
name = args.get("name", "").strip()
|
||||||
if not name:
|
if not name:
|
||||||
return [TextContent(type="text", text="Error: 'name' parameter is required.")]
|
return [TextContent(type="text", text="Error: 'name' parameter is required.")]
|
||||||
|
try:
|
||||||
|
_validate_name(name)
|
||||||
|
except ValueError as exc:
|
||||||
|
return [TextContent(type="text", text=f"Error: {exc}")]
|
||||||
|
|
||||||
text = _read_workflow(name)
|
text = _read_workflow(name)
|
||||||
if text is None:
|
if text is None:
|
||||||
|
|||||||
@@ -527,3 +527,78 @@ class TestWorkflowRoundTrip:
|
|||||||
# 6. Get after delete — error
|
# 6. Get after delete — error
|
||||||
r = get_workflow_tool({"name": "lifecycle"})
|
r = get_workflow_tool({"name": "lifecycle"})
|
||||||
assert "not found" in _text(r).lower()
|
assert "not found" in _text(r).lower()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Path traversal protection (CWE-22, #325)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestPathTraversalProtection:
|
||||||
|
"""Verify all tools reject path traversal attempts in workflow names."""
|
||||||
|
|
||||||
|
MALICIOUS_NAMES = [
|
||||||
|
"../../../etc/passwd",
|
||||||
|
"..\\..\\windows\\system32\\config",
|
||||||
|
"../../../../tmp/evil",
|
||||||
|
"/etc/passwd",
|
||||||
|
"foo/../../../bar",
|
||||||
|
"..%2F..%2Fetc%2Fpasswd", # contains ..
|
||||||
|
]
|
||||||
|
|
||||||
|
def test_get_workflow_rejects_traversal(self, user_dir, bundled_names_empty):
|
||||||
|
from skill_seekers.mcp.tools.workflow_tools import get_workflow_tool
|
||||||
|
|
||||||
|
for name in self.MALICIOUS_NAMES:
|
||||||
|
result = get_workflow_tool({"name": name})
|
||||||
|
assert "Error" in _text(result) or "Invalid" in _text(result), (
|
||||||
|
f"get_workflow_tool should reject name={name!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_create_workflow_rejects_traversal(self, user_dir, bundled_names_empty):
|
||||||
|
from skill_seekers.mcp.tools.workflow_tools import create_workflow_tool
|
||||||
|
|
||||||
|
for name in self.MALICIOUS_NAMES:
|
||||||
|
result = create_workflow_tool({"name": name, "content": VALID_WORKFLOW_YAML})
|
||||||
|
assert "Error" in _text(result) or "Invalid" in _text(result), (
|
||||||
|
f"create_workflow_tool should reject name={name!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_update_workflow_rejects_traversal(self, user_dir, bundled_names_empty):
|
||||||
|
from skill_seekers.mcp.tools.workflow_tools import update_workflow_tool
|
||||||
|
|
||||||
|
for name in self.MALICIOUS_NAMES:
|
||||||
|
result = update_workflow_tool({"name": name, "content": VALID_WORKFLOW_YAML})
|
||||||
|
assert "Error" in _text(result) or "Invalid" in _text(result), (
|
||||||
|
f"update_workflow_tool should reject name={name!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_delete_workflow_rejects_traversal(self, user_dir, bundled_names_empty):
|
||||||
|
from skill_seekers.mcp.tools.workflow_tools import delete_workflow_tool
|
||||||
|
|
||||||
|
for name in self.MALICIOUS_NAMES:
|
||||||
|
result = delete_workflow_tool({"name": name})
|
||||||
|
assert "Error" in _text(result) or "Invalid" in _text(result), (
|
||||||
|
f"delete_workflow_tool should reject name={name!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_valid_names_still_work(self, user_dir, bundled_names_empty):
|
||||||
|
from skill_seekers.mcp.tools.workflow_tools import create_workflow_tool, get_workflow_tool
|
||||||
|
|
||||||
|
result = create_workflow_tool({"name": "my-workflow", "content": VALID_WORKFLOW_YAML})
|
||||||
|
assert "Error" not in _text(result)
|
||||||
|
|
||||||
|
result = get_workflow_tool({"name": "my-workflow"})
|
||||||
|
assert "Error" not in _text(result)
|
||||||
|
|
||||||
|
def test_validate_name_rejects_empty(self):
|
||||||
|
from skill_seekers.mcp.tools.workflow_tools import _validate_name
|
||||||
|
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_name("")
|
||||||
|
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_name("..")
|
||||||
|
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_name("foo/bar")
|
||||||
|
|||||||
Reference in New Issue
Block a user