feat: enhancement workflow preset system with multi-target CLI
- Add YAML-based enhancement workflow presets shipped inside the package (default, minimal, security-focus, architecture-comprehensive, api-documentation) - Add `skill-seekers workflows` subcommand: list, show, copy, add, remove, validate - copy/add/remove all accept multiple names/files in one invocation with partial-failure behaviour - `add --name` override restricted to single-file operations - Add 5 MCP tools: list_workflows, get_workflow, create_workflow, update_workflow, delete_workflow - Fix: create command _add_common_args() now correctly forwards each --enhance-workflow as a separate flag instead of passing the whole list as a single argument - Update README: reposition as "data layer for AI systems" with AI Skills front and centre - Update CHANGELOG, QUICK_REFERENCE, CLAUDE.md with workflow preset details - 1,880+ tests passing Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -23,19 +23,20 @@ class TestParserRegistry:
|
||||
"""Test parser registry functionality."""
|
||||
|
||||
def test_all_parsers_registered(self):
|
||||
"""Test that all 19 parsers are registered."""
|
||||
assert len(PARSERS) == 20, f"Expected 19 parsers, got {len(PARSERS)}"
|
||||
"""Test that all parsers are registered."""
|
||||
assert len(PARSERS) == 21, f"Expected 21 parsers, got {len(PARSERS)}"
|
||||
|
||||
def test_get_parser_names(self):
|
||||
"""Test getting list of parser names."""
|
||||
names = get_parser_names()
|
||||
assert len(names) == 20
|
||||
assert len(names) == 21
|
||||
assert "scrape" in names
|
||||
assert "github" in names
|
||||
assert "package" in names
|
||||
assert "upload" in names
|
||||
assert "analyze" in names
|
||||
assert "config" in names
|
||||
assert "workflows" in names
|
||||
|
||||
def test_all_parsers_are_subcommand_parsers(self):
|
||||
"""Test that all parsers inherit from SubcommandParser."""
|
||||
@@ -241,9 +242,9 @@ class TestBackwardCompatibility:
|
||||
assert cmd in names, f"Command '{cmd}' not found in parser registry!"
|
||||
|
||||
def test_command_count_matches(self):
|
||||
"""Test that we have exactly 20 commands (includes new create command)."""
|
||||
assert len(PARSERS) == 20
|
||||
assert len(get_parser_names()) == 20
|
||||
"""Test that we have exactly 21 commands (includes new create and workflows commands)."""
|
||||
assert len(PARSERS) == 21
|
||||
assert len(get_parser_names()) == 21
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -117,6 +117,134 @@ class TestCreateCommandBasic:
|
||||
assert "--dry-run" in result.stdout
|
||||
|
||||
|
||||
class TestCreateCommandArgvForwarding:
|
||||
"""Unit tests for _add_common_args argv forwarding."""
|
||||
|
||||
def _make_args(self, **kwargs):
|
||||
import argparse
|
||||
defaults = dict(
|
||||
enhance_workflow=None,
|
||||
enhance_stage=None,
|
||||
var=None,
|
||||
workflow_dry_run=False,
|
||||
enhance_level=0,
|
||||
output=None,
|
||||
name=None,
|
||||
description=None,
|
||||
config=None,
|
||||
api_key=None,
|
||||
dry_run=False,
|
||||
verbose=False,
|
||||
quiet=False,
|
||||
chunk_for_rag=False,
|
||||
chunk_size=512,
|
||||
chunk_overlap=50,
|
||||
preset=None,
|
||||
no_preserve_code_blocks=False,
|
||||
no_preserve_paragraphs=False,
|
||||
interactive_enhancement=False,
|
||||
)
|
||||
defaults.update(kwargs)
|
||||
return argparse.Namespace(**defaults)
|
||||
|
||||
def _collect_argv(self, args):
|
||||
from skill_seekers.cli.create_command import CreateCommand
|
||||
cmd = CreateCommand(args)
|
||||
argv = []
|
||||
cmd._add_common_args(argv)
|
||||
return argv
|
||||
|
||||
def test_single_enhance_workflow_forwarded(self):
|
||||
args = self._make_args(enhance_workflow=["security-focus"])
|
||||
argv = self._collect_argv(args)
|
||||
assert argv.count("--enhance-workflow") == 1
|
||||
assert "security-focus" in argv
|
||||
|
||||
def test_multiple_enhance_workflows_all_forwarded(self):
|
||||
"""Each workflow must appear as a separate --enhance-workflow flag."""
|
||||
args = self._make_args(enhance_workflow=["security-focus", "minimal"])
|
||||
argv = self._collect_argv(args)
|
||||
assert argv.count("--enhance-workflow") == 2
|
||||
idx1 = argv.index("security-focus")
|
||||
idx2 = argv.index("minimal")
|
||||
assert argv[idx1 - 1] == "--enhance-workflow"
|
||||
assert argv[idx2 - 1] == "--enhance-workflow"
|
||||
|
||||
def test_no_enhance_workflow_not_forwarded(self):
|
||||
args = self._make_args(enhance_workflow=None)
|
||||
argv = self._collect_argv(args)
|
||||
assert "--enhance-workflow" not in argv
|
||||
|
||||
# ── enhance_stage ────────────────────────────────────────────────────────
|
||||
|
||||
def test_single_enhance_stage_forwarded(self):
|
||||
args = self._make_args(enhance_stage=["security:Check for vulnerabilities"])
|
||||
argv = self._collect_argv(args)
|
||||
assert "--enhance-stage" in argv
|
||||
assert "security:Check for vulnerabilities" in argv
|
||||
|
||||
def test_multiple_enhance_stages_all_forwarded(self):
|
||||
stages = ["sec:Check security", "cleanup:Remove boilerplate"]
|
||||
args = self._make_args(enhance_stage=stages)
|
||||
argv = self._collect_argv(args)
|
||||
assert argv.count("--enhance-stage") == 2
|
||||
for stage in stages:
|
||||
assert stage in argv
|
||||
|
||||
def test_enhance_stage_none_not_forwarded(self):
|
||||
args = self._make_args(enhance_stage=None)
|
||||
argv = self._collect_argv(args)
|
||||
assert "--enhance-stage" not in argv
|
||||
|
||||
# ── var ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_single_var_forwarded(self):
|
||||
args = self._make_args(var=["depth=comprehensive"])
|
||||
argv = self._collect_argv(args)
|
||||
assert "--var" in argv
|
||||
assert "depth=comprehensive" in argv
|
||||
|
||||
def test_multiple_vars_all_forwarded(self):
|
||||
args = self._make_args(var=["depth=comprehensive", "focus=security"])
|
||||
argv = self._collect_argv(args)
|
||||
assert argv.count("--var") == 2
|
||||
assert "depth=comprehensive" in argv
|
||||
assert "focus=security" in argv
|
||||
|
||||
def test_var_none_not_forwarded(self):
|
||||
args = self._make_args(var=None)
|
||||
argv = self._collect_argv(args)
|
||||
assert "--var" not in argv
|
||||
|
||||
# ── workflow_dry_run ─────────────────────────────────────────────────────
|
||||
|
||||
def test_workflow_dry_run_forwarded(self):
|
||||
args = self._make_args(workflow_dry_run=True)
|
||||
argv = self._collect_argv(args)
|
||||
assert "--workflow-dry-run" in argv
|
||||
|
||||
def test_workflow_dry_run_false_not_forwarded(self):
|
||||
args = self._make_args(workflow_dry_run=False)
|
||||
argv = self._collect_argv(args)
|
||||
assert "--workflow-dry-run" not in argv
|
||||
|
||||
# ── mixed ────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_workflow_and_stage_both_forwarded(self):
|
||||
args = self._make_args(
|
||||
enhance_workflow=["security-focus"],
|
||||
enhance_stage=["cleanup:Remove boilerplate"],
|
||||
var=["depth=basic"],
|
||||
workflow_dry_run=True,
|
||||
)
|
||||
argv = self._collect_argv(args)
|
||||
assert "--enhance-workflow" in argv
|
||||
assert "security-focus" in argv
|
||||
assert "--enhance-stage" in argv
|
||||
assert "--var" in argv
|
||||
assert "--workflow-dry-run" in argv
|
||||
|
||||
|
||||
class TestBackwardCompatibility:
|
||||
"""Test that old commands still work."""
|
||||
|
||||
@@ -165,3 +293,16 @@ class TestBackwardCompatibility:
|
||||
assert "scrape" in result.stdout
|
||||
assert "github" in result.stdout
|
||||
assert "analyze" in result.stdout
|
||||
|
||||
def test_workflows_command_still_works(self):
|
||||
"""The new workflows subcommand is accessible via the main CLI."""
|
||||
import subprocess
|
||||
|
||||
result = subprocess.run(
|
||||
["skill-seekers", "workflows", "--help"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
assert result.returncode == 0
|
||||
assert "workflow" in result.stdout.lower()
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
Covers:
|
||||
- run_workflows() with no workflow flags → (False, [])
|
||||
- run_workflows() with a single named workflow
|
||||
- WorkflowEngine loads bundled presets by name (integration)
|
||||
- run_workflows() with multiple named workflows (chaining)
|
||||
- run_workflows() with inline --enhance-stage flags
|
||||
- run_workflows() with both named and inline workflows
|
||||
@@ -372,3 +373,70 @@ class TestRunWorkflowsDryRun:
|
||||
for engine in engines:
|
||||
engine.preview.assert_called_once()
|
||||
engine.run.assert_not_called()
|
||||
|
||||
|
||||
# ────────────────── bundled preset loading (integration) ─────────────────────
|
||||
|
||||
|
||||
class TestBundledPresetsLoad:
|
||||
"""Verify WorkflowEngine can load each bundled preset by name.
|
||||
|
||||
These are real integration tests – they actually read the YAML files
|
||||
shipped inside the package via importlib.resources.
|
||||
"""
|
||||
|
||||
BUNDLED_NAMES = [
|
||||
"default",
|
||||
"minimal",
|
||||
"security-focus",
|
||||
"architecture-comprehensive",
|
||||
"api-documentation",
|
||||
]
|
||||
|
||||
@pytest.mark.parametrize("preset_name", BUNDLED_NAMES)
|
||||
def test_bundled_preset_loads(self, preset_name):
|
||||
from skill_seekers.cli.enhancement_workflow import WorkflowEngine
|
||||
|
||||
engine = WorkflowEngine(preset_name)
|
||||
wf = engine.workflow
|
||||
assert wf.name, f"Workflow '{preset_name}' has no name"
|
||||
assert isinstance(wf.stages, list), "stages must be a list"
|
||||
assert len(wf.stages) > 0, f"Workflow '{preset_name}' has no stages"
|
||||
|
||||
@pytest.mark.parametrize("preset_name", BUNDLED_NAMES)
|
||||
def test_bundled_preset_stages_have_required_fields(self, preset_name):
|
||||
from skill_seekers.cli.enhancement_workflow import WorkflowEngine
|
||||
|
||||
engine = WorkflowEngine(preset_name)
|
||||
for stage in engine.workflow.stages:
|
||||
assert stage.name, f"Stage in '{preset_name}' has no name"
|
||||
assert stage.type in ("builtin", "custom"), (
|
||||
f"Stage '{stage.name}' in '{preset_name}' has unknown type '{stage.type}'"
|
||||
)
|
||||
|
||||
def test_unknown_preset_raises_file_not_found(self):
|
||||
from skill_seekers.cli.enhancement_workflow import WorkflowEngine
|
||||
|
||||
with pytest.raises(FileNotFoundError):
|
||||
WorkflowEngine("completely-nonexistent-preset-xyz")
|
||||
|
||||
def test_list_bundled_workflows_returns_all(self):
|
||||
from skill_seekers.cli.enhancement_workflow import list_bundled_workflows
|
||||
|
||||
names = list_bundled_workflows()
|
||||
for expected in self.BUNDLED_NAMES:
|
||||
assert expected in names, f"'{expected}' not in bundled workflows: {names}"
|
||||
|
||||
def test_list_user_workflows_empty_when_no_user_dir(self, tmp_path, monkeypatch):
|
||||
"""list_user_workflows returns [] when ~/.config/skill-seekers/workflows/ does not exist."""
|
||||
from skill_seekers.cli import enhancement_workflow as ew_mod
|
||||
import pathlib
|
||||
|
||||
fake_home = tmp_path / "fake_home"
|
||||
fake_home.mkdir()
|
||||
monkeypatch.setenv("HOME", str(fake_home))
|
||||
# Also patch Path.home() used inside the module
|
||||
monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: fake_home))
|
||||
|
||||
names = ew_mod.list_user_workflows()
|
||||
assert names == []
|
||||
|
||||
301
tests/test_workflow_tools_mcp.py
Normal file
301
tests/test_workflow_tools_mcp.py
Normal file
@@ -0,0 +1,301 @@
|
||||
"""Tests for the workflow MCP tools.
|
||||
|
||||
Covers:
|
||||
- list_workflows_tool
|
||||
- get_workflow_tool
|
||||
- create_workflow_tool
|
||||
- update_workflow_tool
|
||||
- delete_workflow_tool
|
||||
"""
|
||||
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
|
||||
MINIMAL_YAML = textwrap.dedent("""\
|
||||
name: test-workflow
|
||||
description: A test workflow
|
||||
version: "1.0"
|
||||
applies_to:
|
||||
- codebase_analysis
|
||||
variables: {}
|
||||
stages:
|
||||
- name: step1
|
||||
type: custom
|
||||
target: all
|
||||
uses_history: false
|
||||
enabled: true
|
||||
prompt: "Do something useful."
|
||||
post_process:
|
||||
reorder_sections: []
|
||||
add_metadata: {}
|
||||
""")
|
||||
|
||||
INVALID_YAML_NO_STAGES = textwrap.dedent("""\
|
||||
name: broken
|
||||
description: Missing stages key
|
||||
version: "1.0"
|
||||
""")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Fixtures & helpers
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_user_dir(tmp_path, monkeypatch):
|
||||
"""Redirect USER_WORKFLOWS_DIR in workflow_tools to a temp dir."""
|
||||
fake_dir = tmp_path / "workflows"
|
||||
fake_dir.mkdir()
|
||||
monkeypatch.setattr(
|
||||
"skill_seekers.mcp.tools.workflow_tools.USER_WORKFLOWS_DIR", fake_dir
|
||||
)
|
||||
return fake_dir
|
||||
|
||||
|
||||
def _mock_bundled_names(names=("default", "security-focus")):
|
||||
return patch(
|
||||
"skill_seekers.mcp.tools.workflow_tools._bundled_names",
|
||||
return_value=list(names),
|
||||
)
|
||||
|
||||
|
||||
def _mock_bundled_text(mapping: dict):
|
||||
def _read(name):
|
||||
return mapping.get(name)
|
||||
return patch(
|
||||
"skill_seekers.mcp.tools.workflow_tools._read_bundled",
|
||||
side_effect=_read,
|
||||
)
|
||||
|
||||
|
||||
def _text(result) -> str:
|
||||
"""Extract text from first TextContent in result."""
|
||||
if isinstance(result, list) and result:
|
||||
item = result[0]
|
||||
return item.text if hasattr(item, "text") else str(item)
|
||||
return str(result)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# list_workflows_tool
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestListWorkflowsTool:
|
||||
def test_lists_bundled_and_user(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import list_workflows_tool
|
||||
|
||||
(tmp_user_dir / "my-workflow.yaml").write_text(MINIMAL_YAML, encoding="utf-8")
|
||||
|
||||
bundled_map = {"default": MINIMAL_YAML}
|
||||
with _mock_bundled_names(["default"]), _mock_bundled_text(bundled_map):
|
||||
result = list_workflows_tool({})
|
||||
|
||||
text = _text(result)
|
||||
assert "default" in text
|
||||
assert "bundled" in text
|
||||
assert "my-workflow" in text
|
||||
assert "user" in text
|
||||
|
||||
def test_empty_lists(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import list_workflows_tool
|
||||
|
||||
with _mock_bundled_names([]):
|
||||
result = list_workflows_tool({})
|
||||
|
||||
text = _text(result)
|
||||
# Should return a valid (possibly empty) YAML list or empty
|
||||
data = yaml.safe_load(text)
|
||||
assert isinstance(data, (list, type(None)))
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# get_workflow_tool
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestGetWorkflowTool:
|
||||
def test_get_bundled(self):
|
||||
from skill_seekers.mcp.tools.workflow_tools import get_workflow_tool
|
||||
|
||||
with patch(
|
||||
"skill_seekers.mcp.tools.workflow_tools._read_workflow",
|
||||
return_value=MINIMAL_YAML,
|
||||
):
|
||||
result = get_workflow_tool({"name": "default"})
|
||||
|
||||
assert "stages" in _text(result)
|
||||
|
||||
def test_get_not_found(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import get_workflow_tool
|
||||
|
||||
with _mock_bundled_names([]):
|
||||
result = get_workflow_tool({"name": "ghost"})
|
||||
|
||||
text = _text(result)
|
||||
assert "not found" in text.lower() or "Error" in text
|
||||
|
||||
def test_missing_name_param(self):
|
||||
from skill_seekers.mcp.tools.workflow_tools import get_workflow_tool
|
||||
|
||||
result = get_workflow_tool({})
|
||||
assert "required" in _text(result).lower()
|
||||
|
||||
def test_get_user_workflow(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import get_workflow_tool
|
||||
|
||||
(tmp_user_dir / "custom.yaml").write_text(MINIMAL_YAML, encoding="utf-8")
|
||||
result = get_workflow_tool({"name": "custom"})
|
||||
assert "stages" in _text(result)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# create_workflow_tool
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCreateWorkflowTool:
|
||||
def test_create_new_workflow(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import create_workflow_tool
|
||||
|
||||
result = create_workflow_tool({"name": "new-wf", "content": MINIMAL_YAML})
|
||||
text = _text(result)
|
||||
assert "Created" in text or "created" in text.lower()
|
||||
assert (tmp_user_dir / "new-wf.yaml").exists()
|
||||
|
||||
def test_create_duplicate_fails(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import create_workflow_tool
|
||||
|
||||
(tmp_user_dir / "existing.yaml").write_text(MINIMAL_YAML, encoding="utf-8")
|
||||
result = create_workflow_tool({"name": "existing", "content": MINIMAL_YAML})
|
||||
assert "already exists" in _text(result).lower()
|
||||
|
||||
def test_create_invalid_yaml(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import create_workflow_tool
|
||||
|
||||
result = create_workflow_tool(
|
||||
{"name": "bad", "content": INVALID_YAML_NO_STAGES}
|
||||
)
|
||||
assert "invalid" in _text(result).lower() or "stages" in _text(result).lower()
|
||||
|
||||
def test_create_missing_name(self):
|
||||
from skill_seekers.mcp.tools.workflow_tools import create_workflow_tool
|
||||
|
||||
result = create_workflow_tool({"content": MINIMAL_YAML})
|
||||
assert "required" in _text(result).lower()
|
||||
|
||||
def test_create_missing_content(self):
|
||||
from skill_seekers.mcp.tools.workflow_tools import create_workflow_tool
|
||||
|
||||
result = create_workflow_tool({"name": "test"})
|
||||
assert "required" in _text(result).lower()
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# update_workflow_tool
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestUpdateWorkflowTool:
|
||||
def test_update_user_workflow(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import update_workflow_tool
|
||||
|
||||
(tmp_user_dir / "my-wf.yaml").write_text("old content", encoding="utf-8")
|
||||
|
||||
with _mock_bundled_names([]):
|
||||
result = update_workflow_tool(
|
||||
{"name": "my-wf", "content": MINIMAL_YAML}
|
||||
)
|
||||
|
||||
text = _text(result)
|
||||
assert "Updated" in text or "updated" in text.lower()
|
||||
assert (tmp_user_dir / "my-wf.yaml").read_text() == MINIMAL_YAML
|
||||
|
||||
def test_update_bundled_refused(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import update_workflow_tool
|
||||
|
||||
with _mock_bundled_names(["default"]):
|
||||
result = update_workflow_tool(
|
||||
{"name": "default", "content": MINIMAL_YAML}
|
||||
)
|
||||
|
||||
assert "bundled" in _text(result).lower()
|
||||
|
||||
def test_update_invalid_yaml(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import update_workflow_tool
|
||||
|
||||
(tmp_user_dir / "my-wf.yaml").write_text(MINIMAL_YAML, encoding="utf-8")
|
||||
|
||||
with _mock_bundled_names([]):
|
||||
result = update_workflow_tool(
|
||||
{"name": "my-wf", "content": INVALID_YAML_NO_STAGES}
|
||||
)
|
||||
|
||||
assert "invalid" in _text(result).lower() or "stages" in _text(result).lower()
|
||||
|
||||
def test_update_user_override_of_bundled_name(self, tmp_user_dir):
|
||||
"""A user workflow with same name as bundled should be updatable."""
|
||||
from skill_seekers.mcp.tools.workflow_tools import update_workflow_tool
|
||||
|
||||
(tmp_user_dir / "default.yaml").write_text("old", encoding="utf-8")
|
||||
|
||||
with _mock_bundled_names(["default"]):
|
||||
result = update_workflow_tool(
|
||||
{"name": "default", "content": MINIMAL_YAML}
|
||||
)
|
||||
|
||||
text = _text(result)
|
||||
# User has a file named 'default', so it should succeed
|
||||
assert "Updated" in text or "updated" in text.lower()
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# delete_workflow_tool
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestDeleteWorkflowTool:
|
||||
def test_delete_user_workflow(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import delete_workflow_tool
|
||||
|
||||
wf = tmp_user_dir / "to-delete.yaml"
|
||||
wf.write_text(MINIMAL_YAML, encoding="utf-8")
|
||||
|
||||
with _mock_bundled_names([]):
|
||||
result = delete_workflow_tool({"name": "to-delete"})
|
||||
|
||||
assert "Deleted" in _text(result) or "deleted" in _text(result).lower()
|
||||
assert not wf.exists()
|
||||
|
||||
def test_delete_bundled_refused(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import delete_workflow_tool
|
||||
|
||||
with _mock_bundled_names(["default"]):
|
||||
result = delete_workflow_tool({"name": "default"})
|
||||
|
||||
assert "bundled" in _text(result).lower()
|
||||
|
||||
def test_delete_nonexistent(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import delete_workflow_tool
|
||||
|
||||
with _mock_bundled_names([]):
|
||||
result = delete_workflow_tool({"name": "ghost"})
|
||||
|
||||
assert "not found" in _text(result).lower()
|
||||
|
||||
def test_delete_yml_extension(self, tmp_user_dir):
|
||||
from skill_seekers.mcp.tools.workflow_tools import delete_workflow_tool
|
||||
|
||||
wf = tmp_user_dir / "my-wf.yml"
|
||||
wf.write_text(MINIMAL_YAML, encoding="utf-8")
|
||||
|
||||
with _mock_bundled_names([]):
|
||||
result = delete_workflow_tool({"name": "my-wf"})
|
||||
|
||||
assert not wf.exists()
|
||||
|
||||
def test_delete_missing_name(self):
|
||||
from skill_seekers.mcp.tools.workflow_tools import delete_workflow_tool
|
||||
|
||||
result = delete_workflow_tool({})
|
||||
assert "required" in _text(result).lower()
|
||||
568
tests/test_workflows_command.py
Normal file
568
tests/test_workflows_command.py
Normal file
@@ -0,0 +1,568 @@
|
||||
"""Tests for the workflows CLI command.
|
||||
|
||||
Covers:
|
||||
- workflows list (bundled + user)
|
||||
- workflows show (found / not-found)
|
||||
- workflows copy (bundled → user dir)
|
||||
- workflows add (install custom YAML)
|
||||
- workflows remove (user dir; refuses bundled)
|
||||
- workflows validate (valid / invalid)
|
||||
"""
|
||||
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
# Import the MODULE object (not just individual symbols) so we can patch it
|
||||
# directly via patch.object(). This survives any sys.modules manipulation by
|
||||
# other tests (e.g. test_swift_detection clears skill_seekers.cli.*), because
|
||||
# we hold a reference to the original module object at collection time.
|
||||
import skill_seekers.cli.workflows_command as _wf_cmd
|
||||
|
||||
cmd_list = _wf_cmd.cmd_list
|
||||
cmd_show = _wf_cmd.cmd_show
|
||||
cmd_copy = _wf_cmd.cmd_copy
|
||||
cmd_add = _wf_cmd.cmd_add
|
||||
cmd_remove = _wf_cmd.cmd_remove
|
||||
cmd_validate = _wf_cmd.cmd_validate
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Fixtures
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
MINIMAL_YAML = textwrap.dedent("""\
|
||||
name: test-workflow
|
||||
description: A test workflow
|
||||
version: "1.0"
|
||||
applies_to:
|
||||
- codebase_analysis
|
||||
variables: {}
|
||||
stages:
|
||||
- name: step1
|
||||
type: custom
|
||||
target: all
|
||||
uses_history: false
|
||||
enabled: true
|
||||
prompt: "Do something useful."
|
||||
post_process:
|
||||
reorder_sections: []
|
||||
add_metadata: {}
|
||||
""")
|
||||
|
||||
INVALID_YAML = "not: a: valid: workflow" # missing 'stages' key
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_user_dir(tmp_path, monkeypatch):
|
||||
"""Redirect USER_WORKFLOWS_DIR to a temp directory.
|
||||
|
||||
Uses patch.object on the captured module reference so the patch is applied
|
||||
to the same module dict that the functions reference via __globals__,
|
||||
regardless of any sys.modules manipulation by other tests.
|
||||
"""
|
||||
fake_dir = tmp_path / "workflows"
|
||||
fake_dir.mkdir()
|
||||
monkeypatch.setattr(_wf_cmd, "USER_WORKFLOWS_DIR", fake_dir)
|
||||
return fake_dir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_yaml_file(tmp_path):
|
||||
"""Write MINIMAL_YAML to a temp file and return its path."""
|
||||
p = tmp_path / "test-workflow.yaml"
|
||||
p.write_text(MINIMAL_YAML, encoding="utf-8")
|
||||
return p
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Helpers
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _mock_bundled(names=("default", "minimal", "security-focus")):
|
||||
"""Patch list_bundled_workflows on the captured module object."""
|
||||
return patch.object(_wf_cmd, "list_bundled_workflows", return_value=list(names))
|
||||
|
||||
|
||||
def _mock_bundled_text(name_to_text: dict):
|
||||
"""Patch _bundled_yaml_text on the captured module object."""
|
||||
def _bundled_yaml_text(name):
|
||||
return name_to_text.get(name)
|
||||
return patch.object(_wf_cmd, "_bundled_yaml_text", side_effect=_bundled_yaml_text)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# cmd_list
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCmdList:
|
||||
def test_shows_bundled_and_user(self, capsys, tmp_user_dir):
|
||||
(tmp_user_dir / "my-workflow.yaml").write_text(MINIMAL_YAML, encoding="utf-8")
|
||||
|
||||
bundled_text = {"default": MINIMAL_YAML}
|
||||
with _mock_bundled(["default"]), _mock_bundled_text(bundled_text):
|
||||
rc = cmd_list()
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert rc == 0
|
||||
assert "Bundled" in out
|
||||
assert "default" in out
|
||||
assert "User" in out
|
||||
assert "my-workflow" in out
|
||||
|
||||
def test_no_workflows(self, capsys, tmp_user_dir):
|
||||
# tmp_user_dir is empty, and we mock bundled to return empty
|
||||
with _mock_bundled([]):
|
||||
rc = cmd_list()
|
||||
assert rc == 0
|
||||
assert "No workflows" in capsys.readouterr().out
|
||||
|
||||
def test_only_bundled(self, capsys, tmp_user_dir):
|
||||
with _mock_bundled(["default"]), _mock_bundled_text({"default": MINIMAL_YAML}):
|
||||
rc = cmd_list()
|
||||
out = capsys.readouterr().out
|
||||
assert rc == 0
|
||||
assert "Bundled" in out
|
||||
assert "User" not in out # no user workflows
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# cmd_show
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCmdShow:
|
||||
def test_show_bundled(self, capsys):
|
||||
with patch.object(_wf_cmd, "_workflow_yaml_text", return_value=MINIMAL_YAML):
|
||||
rc = cmd_show("default")
|
||||
assert rc == 0
|
||||
assert "name: test-workflow" in capsys.readouterr().out
|
||||
|
||||
def test_show_not_found(self, capsys):
|
||||
with patch.object(_wf_cmd, "_workflow_yaml_text", return_value=None):
|
||||
rc = cmd_show("nonexistent")
|
||||
assert rc == 1
|
||||
assert "not found" in capsys.readouterr().err.lower()
|
||||
|
||||
def test_show_user_workflow(self, capsys, tmp_user_dir):
|
||||
(tmp_user_dir / "my-wf.yaml").write_text(MINIMAL_YAML, encoding="utf-8")
|
||||
rc = cmd_show("my-wf")
|
||||
assert rc == 0
|
||||
assert "name: test-workflow" in capsys.readouterr().out
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# cmd_copy
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCmdCopy:
|
||||
def test_copy_bundled_to_user_dir(self, capsys, tmp_user_dir):
|
||||
with _mock_bundled_text({"security-focus": MINIMAL_YAML}):
|
||||
rc = cmd_copy(["security-focus"])
|
||||
|
||||
assert rc == 0
|
||||
dest = tmp_user_dir / "security-focus.yaml"
|
||||
assert dest.exists()
|
||||
assert dest.read_text(encoding="utf-8") == MINIMAL_YAML
|
||||
|
||||
def test_copy_nonexistent(self, capsys, tmp_user_dir):
|
||||
with _mock_bundled_text({}):
|
||||
with _mock_bundled([]):
|
||||
rc = cmd_copy(["ghost-workflow"])
|
||||
assert rc == 1
|
||||
assert "not found" in capsys.readouterr().err.lower()
|
||||
|
||||
def test_copy_overwrites_existing(self, capsys, tmp_user_dir):
|
||||
existing = tmp_user_dir / "default.yaml"
|
||||
existing.write_text("old content", encoding="utf-8")
|
||||
|
||||
with _mock_bundled_text({"default": MINIMAL_YAML}):
|
||||
rc = cmd_copy(["default"])
|
||||
|
||||
assert rc == 0
|
||||
assert existing.read_text(encoding="utf-8") == MINIMAL_YAML
|
||||
assert "Warning" in capsys.readouterr().out
|
||||
|
||||
def test_copy_multiple(self, capsys, tmp_user_dir):
|
||||
"""Copying multiple bundled workflows installs all of them."""
|
||||
texts = {"default": MINIMAL_YAML, "minimal": MINIMAL_YAML}
|
||||
with _mock_bundled_text(texts):
|
||||
rc = cmd_copy(["default", "minimal"])
|
||||
|
||||
assert rc == 0
|
||||
assert (tmp_user_dir / "default.yaml").exists()
|
||||
assert (tmp_user_dir / "minimal.yaml").exists()
|
||||
|
||||
def test_copy_partial_failure_continues(self, capsys, tmp_user_dir):
|
||||
"""A missing workflow doesn't prevent others from being copied."""
|
||||
with _mock_bundled_text({"default": MINIMAL_YAML}), _mock_bundled(["default"]):
|
||||
rc = cmd_copy(["default", "ghost"])
|
||||
|
||||
assert rc == 1
|
||||
assert (tmp_user_dir / "default.yaml").exists()
|
||||
assert "not found" in capsys.readouterr().err.lower()
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# cmd_add
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCmdAdd:
    """Exercises cmd_add: installing workflow YAML files into the user dir."""

    def test_add_valid_yaml(self, capsys, tmp_user_dir, sample_yaml_file):
        """A valid YAML file is installed under its own stem name."""
        exit_code = cmd_add([str(sample_yaml_file)])

        assert exit_code == 0
        installed = tmp_user_dir / "test-workflow.yaml"
        assert installed.exists()
        assert "Installed" in capsys.readouterr().out

    def test_add_with_override_name(self, capsys, tmp_user_dir, sample_yaml_file):
        """override_name installs the file under the overridden stem."""
        exit_code = cmd_add([str(sample_yaml_file)], override_name="custom-name")

        assert exit_code == 0
        assert (tmp_user_dir / "custom-name.yaml").exists()

    def test_add_invalid_yaml(self, capsys, tmp_path, tmp_user_dir):
        """Structurally invalid YAML is rejected with an error on stderr."""
        broken = tmp_path / "bad.yaml"
        broken.write_text(INVALID_YAML, encoding="utf-8")

        exit_code = cmd_add([str(broken)])

        assert exit_code == 1
        assert "invalid" in capsys.readouterr().err.lower()

    def test_add_nonexistent_file(self, capsys, tmp_user_dir):
        """A path that does not exist produces a clear error."""
        exit_code = cmd_add(["/nonexistent/path/workflow.yaml"])

        assert exit_code == 1
        assert "does not exist" in capsys.readouterr().err.lower()

    def test_add_wrong_extension(self, capsys, tmp_path, tmp_user_dir):
        """Files without a YAML extension are rejected."""
        json_file = tmp_path / "workflow.json"
        json_file.write_text("{}", encoding="utf-8")

        assert cmd_add([str(json_file)]) == 1

    def test_add_overwrites_with_warning(self, capsys, tmp_user_dir, sample_yaml_file):
        """Installing over an existing workflow succeeds but warns."""
        # Pre-create the destination so the add has to overwrite it.
        (tmp_user_dir / "test-workflow.yaml").write_text("old", encoding="utf-8")

        exit_code = cmd_add([str(sample_yaml_file)])

        assert exit_code == 0
        assert "Warning" in capsys.readouterr().out

    def test_add_multiple_files(self, capsys, tmp_user_dir, tmp_path):
        """Adding multiple YAML files installs all of them."""
        sources = []
        for stem in ("wf-one", "wf-two"):
            path = tmp_path / f"{stem}.yaml"
            path.write_text(MINIMAL_YAML, encoding="utf-8")
            sources.append(str(path))

        exit_code = cmd_add(sources)

        assert exit_code == 0
        assert (tmp_user_dir / "wf-one.yaml").exists()
        assert (tmp_user_dir / "wf-two.yaml").exists()
        captured = capsys.readouterr().out
        assert "wf-one" in captured
        assert "wf-two" in captured

    def test_add_multiple_name_flag_rejected(self, capsys, tmp_user_dir, tmp_path):
        """--name with multiple files returns error without installing."""
        first = tmp_path / "wf-a.yaml"
        second = tmp_path / "wf-b.yaml"
        first.write_text(MINIMAL_YAML, encoding="utf-8")
        second.write_text(MINIMAL_YAML, encoding="utf-8")

        exit_code = cmd_add([str(first), str(second)], override_name="should-fail")

        assert exit_code == 1
        assert "cannot be used" in capsys.readouterr().err.lower()
        assert not (tmp_user_dir / "should-fail.yaml").exists()

    def test_add_partial_failure_continues(self, capsys, tmp_user_dir, tmp_path):
        """A bad file in the middle doesn't prevent valid files from installing."""
        valid = tmp_path / "good.yaml"
        invalid = tmp_path / "bad.yaml"
        valid.write_text(MINIMAL_YAML, encoding="utf-8")
        invalid.write_text(INVALID_YAML, encoding="utf-8")

        exit_code = cmd_add([str(valid), str(invalid)])

        assert exit_code == 1  # non-zero because of the bad file
        assert (tmp_user_dir / "good.yaml").exists()  # good one still installed
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# cmd_remove
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCmdRemove:
    """Exercises cmd_remove: deleting user-installed workflow files."""

    def test_remove_user_workflow(self, capsys, tmp_user_dir):
        """A user workflow is deleted and a confirmation is printed."""
        target = tmp_user_dir / "my-wf.yaml"
        target.write_text(MINIMAL_YAML, encoding="utf-8")

        with _mock_bundled([]):
            exit_code = cmd_remove(["my-wf"])

        assert exit_code == 0
        assert not target.exists()
        assert "Removed" in capsys.readouterr().out

    def test_remove_bundled_refused(self, capsys, tmp_user_dir):
        """Bundled workflows cannot be removed."""
        with _mock_bundled(["default"]):
            exit_code = cmd_remove(["default"])

        assert exit_code == 1
        assert "bundled" in capsys.readouterr().err.lower()

    def test_remove_nonexistent(self, capsys, tmp_user_dir):
        """Removing an unknown workflow reports 'not found'."""
        with _mock_bundled([]):
            exit_code = cmd_remove(["ghost"])

        assert exit_code == 1
        assert "not found" in capsys.readouterr().err.lower()

    def test_remove_yml_extension(self, capsys, tmp_user_dir):
        """Workflows saved with a .yml suffix are found and removed too."""
        target = tmp_user_dir / "my-wf.yml"
        target.write_text(MINIMAL_YAML, encoding="utf-8")

        with _mock_bundled([]):
            exit_code = cmd_remove(["my-wf"])

        assert exit_code == 0
        assert not target.exists()

    def test_remove_multiple(self, capsys, tmp_user_dir):
        """Removing multiple workflows deletes all of them."""
        for stem in ("wf-a", "wf-b"):
            (tmp_user_dir / f"{stem}.yaml").write_text(MINIMAL_YAML, encoding="utf-8")

        with _mock_bundled([]):
            exit_code = cmd_remove(["wf-a", "wf-b"])

        assert exit_code == 0
        assert not (tmp_user_dir / "wf-a.yaml").exists()
        assert not (tmp_user_dir / "wf-b.yaml").exists()

    def test_remove_partial_failure_continues(self, capsys, tmp_user_dir):
        """A missing workflow doesn't prevent others from being removed."""
        (tmp_user_dir / "wf-good.yaml").write_text(MINIMAL_YAML, encoding="utf-8")

        with _mock_bundled([]):
            exit_code = cmd_remove(["wf-good", "ghost"])

        assert exit_code == 1
        assert not (tmp_user_dir / "wf-good.yaml").exists()
        assert "not found" in capsys.readouterr().err.lower()
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# cmd_validate
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCmdValidate:
    """Exercises cmd_validate for bundled names, file paths, and bad input."""

    def test_validate_bundled_by_name(self, capsys):
        """A bundled workflow name validates via a mocked WorkflowEngine."""
        with patch.object(_wf_cmd, "WorkflowEngine") as engine_cls:
            workflow = MagicMock()
            workflow.name = "security-focus"
            workflow.description = "Security review"
            workflow.version = "1.0"
            # NOTE(review): MagicMock(name=...) sets the mock's repr name, not a
            # real .name attribute — harmless here since only workflow.name and
            # the "valid" marker are asserted on, but worth confirming.
            workflow.stages = [MagicMock(name="step1", type="custom", enabled=True)]
            engine_cls.return_value.workflow = workflow

            exit_code = cmd_validate("security-focus")

        assert exit_code == 0
        printed = capsys.readouterr().out
        assert "valid" in printed.lower()
        assert "security-focus" in printed

    def test_validate_file_path(self, capsys, sample_yaml_file):
        """A path to a valid YAML file validates successfully."""
        assert cmd_validate(str(sample_yaml_file)) == 0
        assert "valid" in capsys.readouterr().out.lower()

    def test_validate_not_found(self, capsys):
        """A missing workflow surfaces as an error on stderr."""
        with patch.object(_wf_cmd, "WorkflowEngine", side_effect=FileNotFoundError("not found")):
            exit_code = cmd_validate("nonexistent")

        assert exit_code == 1
        assert "error" in capsys.readouterr().err.lower()

    def test_validate_invalid_content(self, capsys, tmp_path):
        """YAML that isn't a workflow mapping fails validation."""
        broken = tmp_path / "bad.yaml"
        broken.write_text("- this: is\n- not: valid workflow", encoding="utf-8")

        assert cmd_validate(str(broken)) == 1
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# main() entry point
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestMain:
    """End-to-end tests of the argparse-driven main() entry point."""

    def test_main_no_action_exits_0(self):
        """No subcommand exits cleanly with code 0."""
        from skill_seekers.cli.workflows_command import main

        with pytest.raises(SystemExit) as exc_info:
            main([])
        assert exc_info.value.code == 0

    def test_main_list(self, capsys, tmp_user_dir):
        """`list` exits 0 even when no workflows exist anywhere."""
        from skill_seekers.cli.workflows_command import main

        # tmp_user_dir is empty; mock bundled to return nothing
        with _mock_bundled([]):
            with pytest.raises(SystemExit) as exc_info:
                main(["list"])
        assert exc_info.value.code == 0

    def test_main_validate_success(self, capsys, sample_yaml_file):
        """`validate <file>` exits 0 for a valid workflow file."""
        from skill_seekers.cli.workflows_command import main

        with pytest.raises(SystemExit) as exc_info:
            main(["validate", str(sample_yaml_file)])
        assert exc_info.value.code == 0

    def test_main_show_success(self, capsys, tmp_user_dir):
        """`show <name>` prints the workflow YAML and exits 0."""
        (tmp_user_dir / "my-wf.yaml").write_text(MINIMAL_YAML, encoding="utf-8")
        with pytest.raises(SystemExit) as exc_info:
            _wf_cmd.main(["show", "my-wf"])
        assert exc_info.value.code == 0
        assert "name: test-workflow" in capsys.readouterr().out

    def test_main_show_not_found_exits_1(self, capsys, tmp_user_dir):
        """`show` for an unknown workflow exits 1."""
        with patch.object(_wf_cmd, "_workflow_yaml_text", return_value=None):
            with pytest.raises(SystemExit) as exc_info:
                _wf_cmd.main(["show", "ghost"])
        assert exc_info.value.code == 1

    def test_main_copy_single(self, capsys, tmp_user_dir):
        """`copy <name>` installs one bundled workflow into the user dir."""
        with _mock_bundled_text({"default": MINIMAL_YAML}):
            with pytest.raises(SystemExit) as exc_info:
                _wf_cmd.main(["copy", "default"])
        assert exc_info.value.code == 0
        assert (tmp_user_dir / "default.yaml").exists()

    def test_main_copy_multiple(self, capsys, tmp_user_dir):
        """`copy` accepts several names in one invocation."""
        bundled = {"default": MINIMAL_YAML, "minimal": MINIMAL_YAML}
        with _mock_bundled_text(bundled):
            with pytest.raises(SystemExit) as exc_info:
                _wf_cmd.main(["copy", "default", "minimal"])
        assert exc_info.value.code == 0
        assert (tmp_user_dir / "default.yaml").exists()
        assert (tmp_user_dir / "minimal.yaml").exists()

    def test_main_copy_not_found_exits_1(self, capsys, tmp_user_dir):
        """`copy` of an unknown workflow exits 1."""
        with _mock_bundled_text({}), _mock_bundled([]):
            with pytest.raises(SystemExit) as exc_info:
                _wf_cmd.main(["copy", "ghost"])
        assert exc_info.value.code == 1

    def test_main_add_single_file(self, capsys, tmp_user_dir, sample_yaml_file):
        """`add <file>` installs one workflow file."""
        with pytest.raises(SystemExit) as exc_info:
            _wf_cmd.main(["add", str(sample_yaml_file)])
        assert exc_info.value.code == 0
        assert (tmp_user_dir / "test-workflow.yaml").exists()

    def test_main_add_multiple_files(self, capsys, tmp_user_dir, tmp_path):
        """`add` accepts several files in one invocation."""
        paths = []
        for stem in ("wf-a", "wf-b"):
            path = tmp_path / f"{stem}.yaml"
            path.write_text(MINIMAL_YAML, encoding="utf-8")
            paths.append(str(path))
        with pytest.raises(SystemExit) as exc_info:
            _wf_cmd.main(["add", *paths])
        assert exc_info.value.code == 0
        assert (tmp_user_dir / "wf-a.yaml").exists()
        assert (tmp_user_dir / "wf-b.yaml").exists()

    def test_main_add_with_name_flag(self, capsys, tmp_user_dir, sample_yaml_file):
        """`add --name` installs under the overridden name."""
        with pytest.raises(SystemExit) as exc_info:
            _wf_cmd.main(["add", str(sample_yaml_file), "--name", "renamed"])
        assert exc_info.value.code == 0
        assert (tmp_user_dir / "renamed.yaml").exists()

    def test_main_add_name_rejected_for_multiple(self, capsys, tmp_user_dir, tmp_path):
        """`add --name` with more than one file exits 1."""
        first = tmp_path / "wf-a.yaml"
        second = tmp_path / "wf-b.yaml"
        first.write_text(MINIMAL_YAML, encoding="utf-8")
        second.write_text(MINIMAL_YAML, encoding="utf-8")
        with pytest.raises(SystemExit) as exc_info:
            _wf_cmd.main(["add", str(first), str(second), "--name", "bad"])
        assert exc_info.value.code == 1

    def test_main_remove_single(self, capsys, tmp_user_dir):
        """`remove <name>` deletes one user workflow."""
        (tmp_user_dir / "my-wf.yaml").write_text(MINIMAL_YAML, encoding="utf-8")
        with _mock_bundled([]):
            with pytest.raises(SystemExit) as exc_info:
                _wf_cmd.main(["remove", "my-wf"])
        assert exc_info.value.code == 0
        assert not (tmp_user_dir / "my-wf.yaml").exists()

    def test_main_remove_multiple(self, capsys, tmp_user_dir):
        """`remove` accepts several names in one invocation."""
        for stem in ("wf-a", "wf-b"):
            (tmp_user_dir / f"{stem}.yaml").write_text(MINIMAL_YAML, encoding="utf-8")
        with _mock_bundled([]):
            with pytest.raises(SystemExit) as exc_info:
                _wf_cmd.main(["remove", "wf-a", "wf-b"])
        assert exc_info.value.code == 0
        assert not (tmp_user_dir / "wf-a.yaml").exists()
        assert not (tmp_user_dir / "wf-b.yaml").exists()

    def test_main_remove_bundled_refused(self, capsys, tmp_user_dir):
        """`remove` of a bundled workflow exits 1."""
        with _mock_bundled(["default"]):
            with pytest.raises(SystemExit) as exc_info:
                _wf_cmd.main(["remove", "default"])
        assert exc_info.value.code == 1

    def test_main_remove_not_found_exits_1(self, capsys, tmp_user_dir):
        """`remove` of an unknown workflow exits 1."""
        with _mock_bundled([]):
            with pytest.raises(SystemExit) as exc_info:
                _wf_cmd.main(["remove", "ghost"])
        assert exc_info.value.code == 1
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Parser argument binding
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestWorkflowsParserArgumentBinding:
    """Verify nargs='+' parsers produce lists with correct attribute names."""

    def _parse(self, argv):
        """Build a throwaway parser mirroring the workflows CLI and parse argv."""
        import argparse

        parser = argparse.ArgumentParser()
        actions = parser.add_subparsers(dest="action")

        # copy/remove share the same positional shape: one-or-more names.
        for command in ("copy", "remove"):
            actions.add_parser(command).add_argument("workflow_names", nargs="+")

        add_parser = actions.add_parser("add")
        add_parser.add_argument("files", nargs="+")
        add_parser.add_argument("--name")

        return parser.parse_args(argv)

    def test_copy_single_produces_list(self):
        """Even a single copy target arrives as a one-element list."""
        namespace = self._parse(["copy", "security-focus"])
        assert namespace.workflow_names == ["security-focus"]

    def test_copy_multiple_produces_list(self):
        """Multiple copy targets preserve order in the list."""
        namespace = self._parse(["copy", "security-focus", "minimal"])
        assert namespace.workflow_names == ["security-focus", "minimal"]

    def test_add_single_produces_list(self):
        """A single add file arrives as a one-element list."""
        namespace = self._parse(["add", "my.yaml"])
        assert namespace.files == ["my.yaml"]

    def test_add_multiple_produces_list(self):
        """Multiple add files preserve order in the list."""
        namespace = self._parse(["add", "a.yaml", "b.yaml", "c.yaml"])
        assert namespace.files == ["a.yaml", "b.yaml", "c.yaml"]

    def test_add_name_flag_captured(self):
        """--name binds to args.name alongside the files list."""
        namespace = self._parse(["add", "my.yaml", "--name", "custom"])
        assert namespace.files == ["my.yaml"]
        assert namespace.name == "custom"

    def test_remove_single_produces_list(self):
        """A single remove target arrives as a one-element list."""
        namespace = self._parse(["remove", "my-wf"])
        assert namespace.workflow_names == ["my-wf"]

    def test_remove_multiple_produces_list(self):
        """Multiple remove targets preserve order in the list."""
        namespace = self._parse(["remove", "wf-a", "wf-b"])
        assert namespace.workflow_names == ["wf-a", "wf-b"]
|
||||
Reference in New Issue
Block a user