From bafb1553347e66881e940f01ec704851afab9ce1 Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 11 Mar 2026 20:23:01 +0100 Subject: [PATCH] fix(engineering): improve agent-workflow-designer - add scripts + extract references --- engineering/agent-workflow-designer/SKILL.md | 442 ++---------------- .../references/workflow-patterns.md | 82 ++++ .../scripts/workflow_scaffolder.py | 113 +++++ 3 files changed, 236 insertions(+), 401 deletions(-) create mode 100644 engineering/agent-workflow-designer/references/workflow-patterns.md create mode 100755 engineering/agent-workflow-designer/scripts/workflow_scaffolder.py diff --git a/engineering/agent-workflow-designer/SKILL.md b/engineering/agent-workflow-designer/SKILL.md index f1380c5..aa4b1c8 100644 --- a/engineering/agent-workflow-designer/SKILL.md +++ b/engineering/agent-workflow-designer/SKILL.md @@ -13,431 +13,71 @@ description: "Agent Workflow Designer" ## Overview -Design production-grade multi-agent orchestration systems. Covers five core patterns (sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven, consensus), platform-specific implementations, handoff protocols, state management, error recovery, context window budgeting, and cost optimization. - ---- +Design production-grade multi-agent workflows with clear pattern choice, handoff contracts, failure handling, and cost/context controls. ## Core Capabilities -- Pattern selection guide for any orchestration requirement -- Handoff protocol templates (structured context passing) -- State management patterns for multi-agent workflows -- Error recovery and retry strategies -- Context window budget management -- Cost optimization strategies per platform -- Platform-specific configs: Claude Code Agent Teams, OpenClaw, CrewAI, AutoGen +- Workflow pattern selection for multi-step agent systems +- Skeleton config generation for fast workflow bootstrapping +- Context and cost discipline across long-running flows +- Error recovery and retry strategy scaffolding +- Documentation pointers for operational pattern tradeoffs --- ## When to Use -- Building a multi-step AI pipeline that exceeds one agent's context capacity -- Parallelizing research, generation, or analysis tasks for speed -- Creating specialist agents with defined roles and handoff contracts -- Designing fault-tolerant AI workflows for production +- A single prompt is insufficient for task complexity +- You need specialist agents with explicit boundaries +- You want deterministic workflow structure before implementation +- You need validation loops for quality or safety gates --- -## Pattern Selection Guide +## Quick Start -``` -Is the task sequential (each step needs previous output)? - YES → Sequential Pipeline - NO → Can tasks run in parallel? - YES → Parallel Fan-out/Fan-in - NO → Is there a hierarchy of decisions? - YES → Hierarchical Delegation - NO → Is it event-triggered? - YES → Event-Driven - NO → Need consensus/validation? - YES → Consensus Pattern +```bash +# Generate a sequential workflow skeleton +python3 scripts/workflow_scaffolder.py sequential --name content-pipeline + +# Generate an orchestrator workflow and save it +python3 scripts/workflow_scaffolder.py orchestrator --name incident-triage --output workflows/incident-triage.json ``` --- -## Pattern 1: Sequential Pipeline +## Pattern Map -**Use when:** Each step depends on the previous output. Research → Draft → Review → Polish. +- `sequential`: strict step-by-step dependency chain +- `parallel`: fan-out/fan-in for independent subtasks +- `router`: dispatch by intent/type with fallback +- `orchestrator`: planner coordinates specialists with dependencies +- `evaluator`: generator + quality gate loop -```python -# sequential_pipeline.py -from dataclasses import dataclass -from typing import Callable, Any -import anthropic - -@dataclass -class PipelineStage: - name: "str" - system_prompt: str - input_key: str # what to take from state - output_key: str # what to write to state - model: str = "claude-3-5-sonnet-20241022" - max_tokens: int = 2048 - -class SequentialPipeline: - def __init__(self, stages: list[PipelineStage]): - self.stages = stages - self.client = anthropic.Anthropic() - - def run(self, initial_input: str) -> dict: - state = {"input": initial_input} - - for stage in self.stages: - print(f"[{stage.name}] Processing...") - - stage_input = state.get(stage.input_key, "") - - response = self.client.messages.create( - model=stage.model, - max_tokens=stage.max_tokens, - system=stage.system_prompt, - messages=[{"role": "user", "content": stage_input}], - ) - - state[stage.output_key] = response.content[0].text - state[f"{stage.name}_tokens"] = response.usage.input_tokens + response.usage.output_tokens - - print(f"[{stage.name}] Done. Tokens: {state[f'{stage.name}_tokens']}") - - return state - -# Example: Blog post pipeline -pipeline = SequentialPipeline([ - PipelineStage( - name="researcher", - system_prompt="You are a research specialist. Given a topic, produce a structured research brief with: key facts, statistics, expert perspectives, and controversy points.", - input_key="input", - output_key="research", - ), - PipelineStage( - name="writer", - system_prompt="You are a senior content writer. Using the research provided, write a compelling 800-word blog post with a clear hook, 3 main sections, and a strong CTA.", - input_key="research", - output_key="draft", - ), - PipelineStage( - name="editor", - system_prompt="You are a copy editor. Review the draft for: clarity, flow, grammar, and SEO. Return the improved version only, no commentary.", - input_key="draft", - output_key="final", - ), -]) -``` +Detailed templates: `references/workflow-patterns.md` --- -## Pattern 2: Parallel Fan-out / Fan-in +## Recommended Workflow -**Use when:** Independent tasks that can run concurrently. Research 5 competitors simultaneously. - -```python -# parallel_fanout.py -import asyncio -import anthropic -from typing import Any - -async def run_agent(client, task_name: "str-system-str-user-str-model-str"claude-3-5-sonnet-20241022") -> dict: - """Single async agent call""" - loop = asyncio.get_event_loop() - - def _call(): - return client.messages.create( - model=model, - max_tokens=2048, - system=system, - messages=[{"role": "user", "content": user}], - ) - - response = await loop.run_in_executor(None, _call) - return { - "task": task_name, - "output": response.content[0].text, - "tokens": response.usage.input_tokens + response.usage.output_tokens, - } - -async def parallel_research(competitors: list[str], research_type: str) -> dict: - """Fan-out: research all competitors in parallel. Fan-in: synthesize results.""" - client = anthropic.Anthropic() - - # FAN-OUT: spawn parallel agent calls - tasks = [ - run_agent( - client, - task_name=competitor, - system=f"You are a competitive intelligence analyst. Research {competitor} and provide: pricing, key features, target market, and known weaknesses.", - user=f"Analyze {competitor} for comparison with our product in the {research_type} market.", - ) - for competitor in competitors - ] - - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Handle failures gracefully - successful = [r for r in results if not isinstance(r, Exception)] - failed = [r for r in results if isinstance(r, Exception)] - - if failed: - print(f"Warning: {len(failed)} research tasks failed: {failed}") - - # FAN-IN: synthesize - combined_research = "\n\n".join([ - f"## {r['task']}\n{r['output']}" for r in successful - ]) - - synthesis = await run_agent( - client, - task_name="synthesizer", - system="You are a strategic analyst. Synthesize competitor research into a concise comparison matrix and strategic recommendations.", - user=f"Synthesize these competitor analyses:\n\n{combined_research}", - model="claude-3-5-sonnet-20241022", - ) - - return { - "individual_analyses": successful, - "synthesis": synthesis["output"], - "total_tokens": sum(r["tokens"] for r in successful) + synthesis["tokens"], - } -``` - ---- - -## Pattern 3: Hierarchical Delegation - -**Use when:** Complex tasks with subtask discovery. Orchestrator breaks down work, delegates to specialists. - -```python -# hierarchical_delegation.py -import json -import anthropic - -ORCHESTRATOR_SYSTEM = """You are an orchestration agent. Your job is to: -1. Analyze the user's request -2. Break it into subtasks -3. Assign each to the appropriate specialist agent -4. Collect results and synthesize - -Available specialists: -- researcher: finds facts, data, and information -- writer: creates content and documents -- coder: writes and reviews code -- analyst: analyzes data and produces insights - -Respond with a JSON plan: -{ - "subtasks": [ - {"id": "1", "agent": "researcher", "task": "...", "depends_on": []}, - {"id": "2", "agent": "writer", "task": "...", "depends_on": ["1"]} - ] -}""" - -SPECIALIST_SYSTEMS = { - "researcher": "You are a research specialist. Find accurate, relevant information and cite sources when possible.", - "writer": "You are a professional writer. Create clear, engaging content in the requested format.", - "coder": "You are a senior software engineer. Write clean, well-commented code with error handling.", - "analyst": "You are a data analyst. Provide structured analysis with evidence-backed conclusions.", -} - -class HierarchicalOrchestrator: - def __init__(self): - self.client = anthropic.Anthropic() - - def run(self, user_request: str) -> str: - # 1. Orchestrator creates plan - plan_response = self.client.messages.create( - model="claude-3-5-sonnet-20241022", - max_tokens=1024, - system=ORCHESTRATOR_SYSTEM, - messages=[{"role": "user", "content": user_request}], - ) - - plan = json.loads(plan_response.content[0].text) - results = {} - - # 2. Execute subtasks respecting dependencies - for subtask in self._topological_sort(plan["subtasks"]): - context = self._build_context(subtask, results) - specialist = SPECIALIST_SYSTEMS[subtask["agent"]] - - result = self.client.messages.create( - model="claude-3-5-sonnet-20241022", - max_tokens=2048, - system=specialist, - messages=[{"role": "user", "content": f"{context}\n\nTask: {subtask['task']}"}], - ) - results[subtask["id"]] = result.content[0].text - - # 3. Final synthesis - all_results = "\n\n".join([f"### {k}\n{v}" for k, v in results.items()]) - synthesis = self.client.messages.create( - model="claude-3-5-sonnet-20241022", - max_tokens=2048, - system="Synthesize the specialist outputs into a coherent final response.", - messages=[{"role": "user", "content": f"Original request: {user_request}\n\nSpecialist outputs:\n{all_results}"}], - ) - return synthesis.content[0].text - - def _build_context(self, subtask: dict, results: dict) -> str: - if not subtask.get("depends_on"): - return "" - deps = [f"Output from task {dep}:\n{results[dep]}" for dep in subtask["depends_on"] if dep in results] - return "Previous results:\n" + "\n\n".join(deps) if deps else "" - - def _topological_sort(self, subtasks: list) -> list: - # Simple ordered execution respecting depends_on - ordered, remaining = [], list(subtasks) - completed = set() - while remaining: - for task in remaining: - if all(dep in completed for dep in task.get("depends_on", [])): - ordered.append(task) - completed.add(task["id"]) - remaining.remove(task) - break - return ordered -``` - ---- - -## Handoff Protocol Template - -```python -# Standard handoff context format — use between all agents -@dataclass -class AgentHandoff: - """Structured context passed between agents in a workflow.""" - task_id: str - workflow_id: str - step_number: int - total_steps: int - - # What was done - previous_agent: str - previous_output: str - artifacts: dict # {"filename": "content"} for any files produced - - # What to do next - current_agent: str - current_task: str - constraints: list[str] # hard rules for this step - - # Metadata - context_budget_remaining: int # tokens left for this agent - cost_so_far_usd: float - - def to_prompt(self) -> str: - return f""" -# Agent Handoff — Step {self.step_number}/{self.total_steps} - -## Your Task -{self.current_task} - -## Constraints -{chr(10).join(f'- {c}' for c in self.constraints)} - -## Context from Previous Step ({self.previous_agent}) -{self.previous_output[:2000]}{"... [truncated]" if len(self.previous_output) > 2000 else ""} - -## Context Budget -You have approximately {self.context_budget_remaining} tokens remaining. Be concise. -""" -``` - ---- - -## Error Recovery Patterns - -```python -import time -from functools import wraps - -def with_retry(max_attempts=3, backoff_seconds=2, fallback_model=None): - """Decorator for agent calls with exponential backoff and model fallback.""" - def decorator(fn): - @wraps(fn) - def wrapper(*args, **kwargs): - last_error = None - for attempt in range(max_attempts): - try: - return fn(*args, **kwargs) - except Exception as e: - last_error = e - if attempt < max_attempts - 1: - wait = backoff_seconds * (2 ** attempt) - print(f"Attempt {attempt+1} failed: {e}. Retrying in {wait}s...") - time.sleep(wait) - - # Fall back to cheaper/faster model on rate limit - if fallback_model and "rate_limit" in str(e).lower(): - kwargs["model"] = fallback_model - raise last_error - return wrapper - return decorator - -@with_retry(max_attempts=3, fallback_model="claude-3-haiku-20240307") -def call_agent(model, system, user): - ... -``` - ---- - -## Context Window Budgeting - -```python -# Budget context across a multi-step pipeline -# Rule: never let any step consume more than 60% of remaining budget - -CONTEXT_LIMITS = { - "claude-3-5-sonnet-20241022": 200_000, - "gpt-4o": 128_000, -} - -class ContextBudget: - def __init__(self, model: str, reserve_pct: float = 0.2): - total = CONTEXT_LIMITS.get(model, 128_000) - self.total = total - self.reserve = int(total * reserve_pct) # keep 20% as buffer - self.used = 0 - - @property - def remaining(self): - return self.total - self.reserve - self.used - - def allocate(self, step_name: "str-requested-int-int" - allocated = min(requested, int(self.remaining * 0.6)) # max 60% of remaining - print(f"[Budget] {step_name}: allocated {allocated:,} tokens (remaining: {self.remaining:,})") - return allocated - - def consume(self, tokens_used: int): - self.used += tokens_used - -def truncate_to_budget(text: str, token_budget: int, chars_per_token: float = 4.0) -> str: - """Rough truncation — use tiktoken for precision.""" - char_budget = int(token_budget * chars_per_token) - if len(text) <= char_budget: - return text - return text[:char_budget] + "\n\n[... truncated to fit context budget ...]" -``` - ---- - -## Cost Optimization Strategies - -| Strategy | Savings | Tradeoff | -|---|---|---| -| Use Haiku for routing/classification | 85-90% | Slightly less nuanced judgment | -| Cache repeated system prompts | 50-90% | Requires prompt caching setup | -| Truncate intermediate outputs | 20-40% | May lose detail in handoffs | -| Batch similar tasks | 50% | Latency increases | -| Use Sonnet for most, Opus for final step only | 60-70% | Final quality may improve | -| Short-circuit on confidence threshold | 30-50% | Need confidence scoring | +1. Select pattern based on dependency shape and risk profile. +2. Scaffold config via `scripts/workflow_scaffolder.py`. +3. Define handoff contract fields for every edge. +4. Add retry/timeouts and output validation gates. +5. Dry-run with small context budgets before scaling. --- ## Common Pitfalls -- **Circular dependencies** — agents calling each other in loops; enforce DAG structure at design time -- **Context bleed** — passing entire previous output to every step; summarize or extract only what's needed -- **No timeout** — a stuck agent blocks the whole pipeline; always set max_tokens and wall-clock timeouts -- **Silent failures** — agent returns plausible but wrong output; add validation steps for critical paths -- **Ignoring cost** — 10 parallel Opus calls is $0.50 per workflow; model selection is a cost decision -- **Over-orchestration** — if a single prompt can do it, it should; only add agents when genuinely needed +- Over-orchestrating tasks solvable by one well-structured prompt +- Missing timeout/retry policies for external-model calls +- Passing full upstream context instead of targeted artifacts +- Ignoring per-step cost accumulation + +## Best Practices + +1. Start with the smallest pattern that can satisfy requirements. +2. Keep handoff payloads explicit and bounded. +3. Validate intermediate outputs before fan-in synthesis. +4. Enforce budget and timeout limits in every step. diff --git a/engineering/agent-workflow-designer/references/workflow-patterns.md b/engineering/agent-workflow-designer/references/workflow-patterns.md new file mode 100644 index 0000000..68b663c --- /dev/null +++ b/engineering/agent-workflow-designer/references/workflow-patterns.md @@ -0,0 +1,82 @@ +# Workflow Pattern Templates + +## Sequential + +Use when each step depends on prior output. + +```json +{ + "pattern": "sequential", + "steps": ["research", "draft", "review"] +} +``` + +## Parallel + +Use when independent tasks can fan out and then fan in. + +```json +{ + "pattern": "parallel", + "fan_out": ["task_a", "task_b", "task_c"], + "fan_in": "synthesizer" +} +``` + +## Router + +Use when tasks must be routed to specialized handlers by intent. + +```json +{ + "pattern": "router", + "router": "intent_router", + "routes": ["sales", "support", "engineering"], + "fallback": "generalist" +} +``` + +## Orchestrator + +Use when dynamic planning and dependency management are required. + +```json +{ + "pattern": "orchestrator", + "orchestrator": "planner", + "specialists": ["researcher", "analyst", "coder"], + "dependency_mode": "dag" +} +``` + +## Evaluator + +Use when output quality gates are mandatory before finalization. + +```json +{ + "pattern": "evaluator", + "generator": "content_agent", + "evaluator": "quality_agent", + "max_iterations": 3, + "pass_threshold": 0.8 +} +``` + +## Pattern Selection Heuristics + +- Choose `sequential` for strict linear workflows. +- Choose `parallel` for throughput and latency reduction. +- Choose `router` for intent- or type-based branching. +- Choose `orchestrator` for complex adaptive workflows. +- Choose `evaluator` when correctness/quality loops are required. + +## Handoff Minimum Contract + +- `workflow_id` +- `step_id` +- `task` +- `constraints` +- `upstream_artifacts` +- `budget_tokens` +- `timeout_seconds` diff --git a/engineering/agent-workflow-designer/scripts/workflow_scaffolder.py b/engineering/agent-workflow-designer/scripts/workflow_scaffolder.py new file mode 100755 index 0000000..d8c37f6 --- /dev/null +++ b/engineering/agent-workflow-designer/scripts/workflow_scaffolder.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +"""Generate workflow skeleton configs from common multi-agent patterns.""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Dict, List + + +def sequential_template(name: str) -> Dict: + return { + "name": name, + "pattern": "sequential", + "steps": [ + {"id": "research", "agent": "researcher", "next": "draft"}, + {"id": "draft", "agent": "writer", "next": "review"}, + {"id": "review", "agent": "reviewer", "next": None}, + ], + "retry": {"max_attempts": 2, "backoff_seconds": 2}, + } + + +def parallel_template(name: str) -> Dict: + return { + "name": name, + "pattern": "parallel", + "fan_out": { + "tasks": ["research_a", "research_b", "research_c"], + "agent": "analyst", + }, + "fan_in": {"agent": "synthesizer", "output": "combined_report"}, + "timeouts": {"per_task_seconds": 180, "fan_in_seconds": 120}, + } + + +def router_template(name: str) -> Dict: + return { + "name": name, + "pattern": "router", + "router": {"agent": "router", "routes": ["sales", "support", "engineering"]}, + "handlers": { + "sales": {"agent": "sales_specialist"}, + "support": {"agent": "support_specialist"}, + "engineering": {"agent": "engineering_specialist"}, + }, + "fallback": {"agent": "generalist"}, + } + + +def orchestrator_template(name: str) -> Dict: + return { + "name": name, + "pattern": "orchestrator", + "orchestrator": {"agent": "orchestrator", "planning": "dynamic"}, + "specialists": ["researcher", "coder", "analyst", "writer"], + "execution": { + "dependency_mode": "dag", + "max_parallel": 3, + "completion_policy": "all_required", + }, + } + + +def evaluator_template(name: str) -> Dict: + return { + "name": name, + "pattern": "evaluator", + "generator": {"agent": "generator"}, + "evaluator": {"agent": "evaluator", "criteria": ["accuracy", "format", "safety"]}, + "loop": { + "max_iterations": 3, + "pass_threshold": 0.8, + "on_fail": "revise_and_retry", + }, + } + + +PATTERNS = { + "sequential": sequential_template, + "parallel": parallel_template, + "router": router_template, + "orchestrator": orchestrator_template, + "evaluator": evaluator_template, +} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Generate a workflow skeleton config from a pattern.") + parser.add_argument("pattern", choices=sorted(PATTERNS.keys()), help="Workflow pattern") + parser.add_argument("--name", default="new-workflow", help="Workflow name") + parser.add_argument("--output", help="Optional output path for JSON config") + return parser.parse_args() + + +def main() -> int: + args = parse_args() + config = PATTERNS[args.pattern](args.name) + payload = json.dumps(config, indent=2) + + if args.output: + out = Path(args.output) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(payload + "\n", encoding="utf-8") + print(f"Wrote workflow config to {out}") + else: + print(payload) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())