fix(engineering): improve agent-workflow-designer - add scripts + extract references
This commit is contained in:
@@ -13,431 +13,71 @@ description: "Agent Workflow Designer"
|
||||
|
||||
## Overview
|
||||
|
||||
Design production-grade multi-agent orchestration systems. Covers five core patterns (sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven, consensus), platform-specific implementations, handoff protocols, state management, error recovery, context window budgeting, and cost optimization.
|
||||
|
||||
---
|
||||
Design production-grade multi-agent workflows with clear pattern choice, handoff contracts, failure handling, and cost/context controls.
|
||||
|
||||
## Core Capabilities
|
||||
|
||||
- Pattern selection guide for any orchestration requirement
|
||||
- Handoff protocol templates (structured context passing)
|
||||
- State management patterns for multi-agent workflows
|
||||
- Error recovery and retry strategies
|
||||
- Context window budget management
|
||||
- Cost optimization strategies per platform
|
||||
- Platform-specific configs: Claude Code Agent Teams, OpenClaw, CrewAI, AutoGen
|
||||
- Workflow pattern selection for multi-step agent systems
|
||||
- Skeleton config generation for fast workflow bootstrapping
|
||||
- Context and cost discipline across long-running flows
|
||||
- Error recovery and retry strategy scaffolding
|
||||
- Documentation pointers for operational pattern tradeoffs
|
||||
|
||||
---
|
||||
|
||||
## When to Use
|
||||
|
||||
- Building a multi-step AI pipeline that exceeds one agent's context capacity
|
||||
- Parallelizing research, generation, or analysis tasks for speed
|
||||
- Creating specialist agents with defined roles and handoff contracts
|
||||
- Designing fault-tolerant AI workflows for production
|
||||
- A single prompt is insufficient for task complexity
|
||||
- You need specialist agents with explicit boundaries
|
||||
- You want deterministic workflow structure before implementation
|
||||
- You need validation loops for quality or safety gates
|
||||
|
||||
---
|
||||
|
||||
## Pattern Selection Guide
|
||||
## Quick Start
|
||||
|
||||
```
|
||||
Is the task sequential (each step needs previous output)?
|
||||
YES → Sequential Pipeline
|
||||
NO → Can tasks run in parallel?
|
||||
YES → Parallel Fan-out/Fan-in
|
||||
NO → Is there a hierarchy of decisions?
|
||||
YES → Hierarchical Delegation
|
||||
NO → Is it event-triggered?
|
||||
YES → Event-Driven
|
||||
NO → Need consensus/validation?
|
||||
            YES → Consensus Pattern
```

```bash
|
||||
# Generate a sequential workflow skeleton
|
||||
python3 scripts/workflow_scaffolder.py sequential --name content-pipeline
|
||||
|
||||
# Generate an orchestrator workflow and save it
|
||||
python3 scripts/workflow_scaffolder.py orchestrator --name incident-triage --output workflows/incident-triage.json
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Pattern 1: Sequential Pipeline
|
||||
## Pattern Map
|
||||
|
||||
**Use when:** Each step depends on the previous output. Research → Draft → Review → Polish.
|
||||
- `sequential`: strict step-by-step dependency chain
|
||||
- `parallel`: fan-out/fan-in for independent subtasks
|
||||
- `router`: dispatch by intent/type with fallback
|
||||
- `orchestrator`: planner coordinates specialists with dependencies
|
||||
- `evaluator`: generator + quality gate loop
|
||||
|
||||
```python
|
||||
# sequential_pipeline.py
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable, Any
|
||||
import anthropic
|
||||
|
||||
@dataclass
class PipelineStage:
    """One stage of a sequential pipeline: reads one state key, writes another.

    Attributes:
        name: human-readable stage label; also used to build the
            ``"{name}_tokens"`` bookkeeping key in the pipeline state.
        system_prompt: system prompt given to the model for this stage.
        input_key: state key whose value becomes the user message.
        output_key: state key under which this stage's output is stored.
        model: model identifier for the API call.
        max_tokens: completion-token cap for the API call.
    """
    # Fixed: the annotation was the string literal "str" (extraction garbling),
    # not the ``str`` type.
    name: str
    system_prompt: str
    input_key: str  # what to take from state
    output_key: str  # what to write to state
    model: str = "claude-3-5-sonnet-20241022"
    max_tokens: int = 2048
|
||||
|
||||
class SequentialPipeline:
    """Run stages in order, threading outputs through a shared state dict.

    ``run`` seeds the state with ``{"input": initial_input}``; each stage reads
    ``stage.input_key`` from the state and writes its reply to
    ``stage.output_key``, plus a ``"{name}_tokens"`` usage counter.
    """

    def __init__(self, stages: list[PipelineStage]):
        self.stages = stages
        self.client = anthropic.Anthropic()

    def run(self, initial_input: str) -> dict:
        """Execute all stages sequentially; return the accumulated state."""
        state = {"input": initial_input}

        for step in self.stages:
            print(f"[{step.name}] Processing...")

            # Missing input key degrades to an empty prompt rather than raising.
            prompt_text = state.get(step.input_key, "")
            reply = self.client.messages.create(
                model=step.model,
                max_tokens=step.max_tokens,
                system=step.system_prompt,
                messages=[{"role": "user", "content": prompt_text}],
            )

            usage = reply.usage
            state[step.output_key] = reply.content[0].text
            state[f"{step.name}_tokens"] = usage.input_tokens + usage.output_tokens
            print(f"[{step.name}] Done. Tokens: {state[f'{step.name}_tokens']}")

        return state
|
||||
|
||||
# Example: Blog post pipeline — research feeds the writer, whose draft feeds
# the editor; each stage's input_key is the previous stage's output_key.
pipeline = SequentialPipeline([
    PipelineStage(
        name="researcher",
        system_prompt="You are a research specialist. Given a topic, produce a structured research brief with: key facts, statistics, expert perspectives, and controversy points.",
        input_key="input",  # seeded by SequentialPipeline.run(initial_input)
        output_key="research",
    ),
    PipelineStage(
        name="writer",
        system_prompt="You are a senior content writer. Using the research provided, write a compelling 800-word blog post with a clear hook, 3 main sections, and a strong CTA.",
        input_key="research",
        output_key="draft",
    ),
    PipelineStage(
        name="editor",
        system_prompt="You are a copy editor. Review the draft for: clarity, flow, grammar, and SEO. Return the improved version only, no commentary.",
        input_key="draft",
        output_key="final",  # the finished post ends up under state["final"]
    ),
])
|
||||
```
|
||||
Detailed templates: `references/workflow-patterns.md`
|
||||
|
||||
---
|
||||
|
||||
## Pattern 2: Parallel Fan-out / Fan-in
|
||||
## Recommended Workflow
|
||||
|
||||
**Use when:** Independent tasks that can run concurrently. Research 5 competitors simultaneously.
|
||||
|
||||
```python
|
||||
# parallel_fanout.py
|
||||
import asyncio
|
||||
import anthropic
|
||||
from typing import Any
|
||||
|
||||
async def run_agent(
    client,
    task_name: str,
    system: str,
    user: str,
    model: str = "claude-3-5-sonnet-20241022",
) -> dict:
    """Run one agent call off-thread and return its output plus token usage.

    The signature was garbled in transit
    (``task_name: "str-system-str-user-str-model-str"claude-...``) and is
    reconstructed here from the call sites below.

    Args:
        client: an Anthropic-style client exposing ``messages.create``.
        task_name: label echoed back in the result for fan-in bookkeeping.
        system: system prompt for the call.
        user: user message content.
        model: model identifier.

    Returns:
        dict with ``task``, ``output`` (first content block's text) and
        ``tokens`` (input + output usage).
    """
    # get_running_loop() instead of the deprecated get_event_loop(): this
    # coroutine only ever executes inside a running loop.
    loop = asyncio.get_running_loop()

    def _call():
        return client.messages.create(
            model=model,
            max_tokens=2048,
            system=system,
            messages=[{"role": "user", "content": user}],
        )

    # The SDK call is blocking; run it in the default executor so several
    # agents can genuinely overlap.
    response = await loop.run_in_executor(None, _call)
    return {
        "task": task_name,
        "output": response.content[0].text,
        "tokens": response.usage.input_tokens + response.usage.output_tokens,
    }
|
||||
|
||||
async def parallel_research(competitors: list[str], research_type: str) -> dict:
    """Fan-out: research all competitors in parallel. Fan-in: synthesize results."""
    client = anthropic.Anthropic()

    # FAN-OUT: one concurrent agent call per competitor.
    pending = [
        run_agent(
            client,
            task_name=name,
            system=f"You are a competitive intelligence analyst. Research {name} and provide: pricing, key features, target market, and known weaknesses.",
            user=f"Analyze {name} for comparison with our product in the {research_type} market.",
        )
        for name in competitors
    ]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    # Partition outcomes so one failed task never sinks the whole batch.
    successful, failed = [], []
    for outcome in outcomes:
        (failed if isinstance(outcome, Exception) else successful).append(outcome)

    if failed:
        print(f"Warning: {len(failed)} research tasks failed: {failed}")

    # FAN-IN: merge the per-competitor reports and synthesize exactly once.
    combined_research = "\n\n".join(
        f"## {item['task']}\n{item['output']}" for item in successful
    )
    synthesis = await run_agent(
        client,
        task_name="synthesizer",
        system="You are a strategic analyst. Synthesize competitor research into a concise comparison matrix and strategic recommendations.",
        user=f"Synthesize these competitor analyses:\n\n{combined_research}",
        model="claude-3-5-sonnet-20241022",
    )

    return {
        "individual_analyses": successful,
        "synthesis": synthesis["output"],
        "total_tokens": sum(item["tokens"] for item in successful) + synthesis["tokens"],
    }
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Pattern 3: Hierarchical Delegation
|
||||
|
||||
**Use when:** Complex tasks with subtask discovery. Orchestrator breaks down work, delegates to specialists.
|
||||
|
||||
```python
|
||||
# hierarchical_delegation.py
|
||||
import json
|
||||
import anthropic
|
||||
|
||||
# System prompt for the top-level orchestrator. It must emit a machine-readable
# JSON plan whose "agent" values match the SPECIALIST_SYSTEMS keys below and
# whose "depends_on" ids form a DAG (HierarchicalOrchestrator orders by them).
ORCHESTRATOR_SYSTEM = """You are an orchestration agent. Your job is to:
1. Analyze the user's request
2. Break it into subtasks
3. Assign each to the appropriate specialist agent
4. Collect results and synthesize

Available specialists:
- researcher: finds facts, data, and information
- writer: creates content and documents
- coder: writes and reviews code
- analyst: analyzes data and produces insights

Respond with a JSON plan:
{
  "subtasks": [
    {"id": "1", "agent": "researcher", "task": "...", "depends_on": []},
    {"id": "2", "agent": "writer", "task": "...", "depends_on": ["1"]}
  ]
}"""

# One system prompt per specialist role. Keys must stay in sync with the
# specialist list inside ORCHESTRATOR_SYSTEM — the orchestrator's plan is
# looked up here verbatim.
SPECIALIST_SYSTEMS = {
    "researcher": "You are a research specialist. Find accurate, relevant information and cite sources when possible.",
    "writer": "You are a professional writer. Create clear, engaging content in the requested format.",
    "coder": "You are a senior software engineer. Write clean, well-commented code with error handling.",
    "analyst": "You are a data analyst. Provide structured analysis with evidence-backed conclusions.",
}
|
||||
|
||||
class HierarchicalOrchestrator:
    """Plan with an orchestrator model, run subtasks via specialists, synthesize.

    Flow: one planning call (ORCHESTRATOR_SYSTEM) produces a JSON subtask DAG;
    subtasks execute in dependency order, each seeing its dependencies' outputs;
    a final call merges everything into one response.
    """

    def __init__(self):
        self.client = anthropic.Anthropic()

    def run(self, user_request: str) -> str:
        """Plan, execute, and synthesize a response for ``user_request``."""
        # 1. Orchestrator creates plan
        plan_response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            system=ORCHESTRATOR_SYSTEM,
            messages=[{"role": "user", "content": user_request}],
        )

        # Models often wrap JSON in a markdown fence despite instructions;
        # strip it before parsing instead of crashing on raw json.loads.
        plan = json.loads(self._extract_json(plan_response.content[0].text))
        results = {}

        # 2. Execute subtasks respecting dependencies
        for subtask in self._topological_sort(plan["subtasks"]):
            context = self._build_context(subtask, results)
            specialist = SPECIALIST_SYSTEMS[subtask["agent"]]

            result = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2048,
                system=specialist,
                messages=[{"role": "user", "content": f"{context}\n\nTask: {subtask['task']}"}],
            )
            results[subtask["id"]] = result.content[0].text

        # 3. Final synthesis
        all_results = "\n\n".join([f"### {k}\n{v}" for k, v in results.items()])
        synthesis = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            system="Synthesize the specialist outputs into a coherent final response.",
            messages=[{"role": "user", "content": f"Original request: {user_request}\n\nSpecialist outputs:\n{all_results}"}],
        )
        return synthesis.content[0].text

    @staticmethod
    def _extract_json(text: str) -> str:
        """Strip a surrounding ``` fence, if present, so json.loads succeeds."""
        stripped = text.strip()
        if stripped.startswith("```"):
            # Drop the opening fence line (``` or ```json) ...
            stripped = stripped.split("\n", 1)[1] if "\n" in stripped else ""
            # ... and the closing fence, when there is one.
            if stripped.rstrip().endswith("```"):
                stripped = stripped.rstrip()[:-3]
        return stripped

    def _build_context(self, subtask: dict, results: dict) -> str:
        """Render outputs of this subtask's completed dependencies as a preamble."""
        if not subtask.get("depends_on"):
            return ""
        deps = [f"Output from task {dep}:\n{results[dep]}" for dep in subtask["depends_on"] if dep in results]
        return "Previous results:\n" + "\n\n".join(deps) if deps else ""

    def _topological_sort(self, subtasks: list) -> list:
        """Order subtasks so each runs after everything in its ``depends_on``.

        Raises:
            ValueError: if dependencies are circular or reference unknown ids.
                (The previous version looped forever in that case: a full pass
                over ``remaining`` with no runnable task never terminated.)
        """
        ordered, remaining = [], list(subtasks)
        completed = set()
        while remaining:
            progressed = False
            for task in remaining:
                if all(dep in completed for dep in task.get("depends_on", [])):
                    ordered.append(task)
                    completed.add(task["id"])
                    remaining.remove(task)
                    progressed = True
                    break  # restart scan: remaining was mutated
            if not progressed:
                raise ValueError(
                    f"Unresolvable dependencies among tasks: {[t['id'] for t in remaining]}"
                )
        return ordered
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Handoff Protocol Template
|
||||
|
||||
```python
|
||||
# Standard handoff context format — use between all agents
|
||||
# Standard handoff context format — use between all agents
@dataclass
class AgentHandoff:
    """Structured context passed between agents in a workflow.

    Call ``to_prompt()`` to render the handoff as the next agent's user
    message; previous output is truncated to 2000 characters there.
    """
    # Identity of this hop within the workflow
    task_id: str
    workflow_id: str
    step_number: int
    total_steps: int

    # What was done
    previous_agent: str
    previous_output: str
    artifacts: dict  # {"filename": "content"} for any files produced

    # What to do next
    current_agent: str
    current_task: str
    constraints: list[str]  # hard rules for this step

    # Metadata
    context_budget_remaining: int  # tokens left for this agent
    cost_so_far_usd: float

    def to_prompt(self) -> str:
        # chr(10) is "\n" — f-string expressions may not contain backslashes
        # on older Python versions, hence the indirection in the join below.
        return f"""
# Agent Handoff — Step {self.step_number}/{self.total_steps}

## Your Task
{self.current_task}

## Constraints
{chr(10).join(f'- {c}' for c in self.constraints)}

## Context from Previous Step ({self.previous_agent})
{self.previous_output[:2000]}{"... [truncated]" if len(self.previous_output) > 2000 else ""}

## Context Budget
You have approximately {self.context_budget_remaining} tokens remaining. Be concise.
"""
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Error Recovery Patterns
|
||||
|
||||
```python
|
||||
import time
|
||||
from functools import wraps
|
||||
|
||||
def with_retry(max_attempts=3, backoff_seconds=2, fallback_model=None):
    """Decorator for agent calls with exponential backoff and model fallback.

    Args:
        max_attempts: total number of call attempts before giving up.
        backoff_seconds: base delay; the wait doubles each attempt
            (``backoff_seconds * 2**attempt``).
        fallback_model: optional model id substituted after a rate-limit error.

    Raises:
        Re-raises the last exception from ``fn`` once attempts are exhausted.
    """
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            import inspect  # local import keeps the decorator self-contained

            last_error = None
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:  # broad on purpose: retry wrapper boundary
                    last_error = e
                    if attempt < max_attempts - 1:
                        wait = backoff_seconds * (2 ** attempt)
                        print(f"Attempt {attempt+1} failed: {e}. Retrying in {wait}s...")
                        time.sleep(wait)

                    # Fall back to cheaper/faster model on rate limit.
                    # Fixed: blindly setting kwargs["model"] raised TypeError
                    # ("multiple values for argument 'model'") whenever the
                    # caller passed model positionally — rebind through the
                    # signature instead.
                    if fallback_model and "rate_limit" in str(e).lower():
                        try:
                            bound = inspect.signature(fn).bind(*args, **kwargs)
                            bound.arguments["model"] = fallback_model
                            args, kwargs = bound.args, bound.kwargs
                        except TypeError:
                            kwargs["model"] = fallback_model
            raise last_error
        return wrapper
    return decorator


@with_retry(max_attempts=3, fallback_model="claude-3-haiku-20240307")
def call_agent(model, system, user):
    ...
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Context Window Budgeting
|
||||
|
||||
```python
|
||||
# Budget context across a multi-step pipeline
|
||||
# Rule: never let any step consume more than 60% of remaining budget
|
||||
|
||||
# Known context-window sizes per model; unknown models fall back to 128k.
CONTEXT_LIMITS = {
    "claude-3-5-sonnet-20241022": 200_000,
    "gpt-4o": 128_000,
}


class ContextBudget:
    """Track token spend across a multi-step pipeline against a model's window.

    A fixed fraction (``reserve_pct``) is held back as a safety buffer, and no
    single allocation may exceed 60% of whatever budget remains.
    """

    def __init__(self, model: str, reserve_pct: float = 0.2):
        total = CONTEXT_LIMITS.get(model, 128_000)  # default for unknown models
        self.total = total
        self.reserve = int(total * reserve_pct)  # keep 20% as buffer
        self.used = 0

    @property
    def remaining(self) -> int:
        """Tokens still available after the reserve buffer and prior usage."""
        return self.total - self.reserve - self.used

    def allocate(self, step_name: str, requested: int) -> int:
        """Grant up to ``requested`` tokens, capped at 60% of the remaining budget.

        The signature was garbled in transit
        (``step_name: "str-requested-int-int"``) and is reconstructed here.
        Never returns a negative allocation, even if the budget is overdrawn.
        """
        allocated = max(0, min(requested, int(self.remaining * 0.6)))
        print(f"[Budget] {step_name}: allocated {allocated:,} tokens (remaining: {self.remaining:,})")
        return allocated

    def consume(self, tokens_used: int) -> None:
        """Record tokens actually spent by a step."""
        self.used += tokens_used


def truncate_to_budget(text: str, token_budget: int, chars_per_token: float = 4.0) -> str:
    """Rough character-based truncation — use tiktoken for precision."""
    char_budget = int(token_budget * chars_per_token)
    if len(text) <= char_budget:
        return text
    return text[:char_budget] + "\n\n[... truncated to fit context budget ...]"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Cost Optimization Strategies
|
||||
|
||||
| Strategy | Savings | Tradeoff |
|
||||
|---|---|---|
|
||||
| Use Haiku for routing/classification | 85-90% | Slightly less nuanced judgment |
|
||||
| Cache repeated system prompts | 50-90% | Requires prompt caching setup |
|
||||
| Truncate intermediate outputs | 20-40% | May lose detail in handoffs |
|
||||
| Batch similar tasks | 50% | Latency increases |
|
||||
| Use Sonnet for most, Opus for final step only | 60-70% | Final quality may improve |
|
||||
| Short-circuit on confidence threshold | 30-50% | Need confidence scoring |
|
||||
1. Select pattern based on dependency shape and risk profile.
|
||||
2. Scaffold config via `scripts/workflow_scaffolder.py`.
|
||||
3. Define handoff contract fields for every edge.
|
||||
4. Add retry/timeouts and output validation gates.
|
||||
5. Dry-run with small context budgets before scaling.
|
||||
|
||||
---
|
||||
|
||||
## Common Pitfalls
|
||||
|
||||
- **Circular dependencies** — agents calling each other in loops; enforce DAG structure at design time
|
||||
- **Context bleed** — passing entire previous output to every step; summarize or extract only what's needed
|
||||
- **No timeout** — a stuck agent blocks the whole pipeline; always set max_tokens and wall-clock timeouts
|
||||
- **Silent failures** — agent returns plausible but wrong output; add validation steps for critical paths
|
||||
- **Ignoring cost** — 10 parallel Opus calls is $0.50 per workflow; model selection is a cost decision
|
||||
- **Over-orchestration** — if a single prompt can do it, it should; only add agents when genuinely needed
|
||||
- Over-orchestrating tasks solvable by one well-structured prompt
|
||||
- Missing timeout/retry policies for external-model calls
|
||||
- Passing full upstream context instead of targeted artifacts
|
||||
- Ignoring per-step cost accumulation
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. Start with the smallest pattern that can satisfy requirements.
|
||||
2. Keep handoff payloads explicit and bounded.
|
||||
3. Validate intermediate outputs before fan-in synthesis.
|
||||
4. Enforce budget and timeout limits in every step.
|
||||
|
||||
#### engineering/agent-workflow-designer/references/workflow-patterns.md — new file (82 lines)
|
||||
# Workflow Pattern Templates
|
||||
|
||||
## Sequential
|
||||
|
||||
Use when each step depends on prior output.
|
||||
|
||||
```json
|
||||
{
|
||||
"pattern": "sequential",
|
||||
"steps": ["research", "draft", "review"]
|
||||
}
|
||||
```
|
||||
|
||||
## Parallel
|
||||
|
||||
Use when independent tasks can fan out and then fan in.
|
||||
|
||||
```json
|
||||
{
|
||||
"pattern": "parallel",
|
||||
"fan_out": ["task_a", "task_b", "task_c"],
|
||||
"fan_in": "synthesizer"
|
||||
}
|
||||
```
|
||||
|
||||
## Router
|
||||
|
||||
Use when tasks must be routed to specialized handlers by intent.
|
||||
|
||||
```json
|
||||
{
|
||||
"pattern": "router",
|
||||
"router": "intent_router",
|
||||
"routes": ["sales", "support", "engineering"],
|
||||
"fallback": "generalist"
|
||||
}
|
||||
```
|
||||
|
||||
## Orchestrator
|
||||
|
||||
Use when dynamic planning and dependency management are required.
|
||||
|
||||
```json
|
||||
{
|
||||
"pattern": "orchestrator",
|
||||
"orchestrator": "planner",
|
||||
"specialists": ["researcher", "analyst", "coder"],
|
||||
"dependency_mode": "dag"
|
||||
}
|
||||
```
|
||||
|
||||
## Evaluator
|
||||
|
||||
Use when output quality gates are mandatory before finalization.
|
||||
|
||||
```json
|
||||
{
|
||||
"pattern": "evaluator",
|
||||
"generator": "content_agent",
|
||||
"evaluator": "quality_agent",
|
||||
"max_iterations": 3,
|
||||
"pass_threshold": 0.8
|
||||
}
|
||||
```
|
||||
|
||||
## Pattern Selection Heuristics
|
||||
|
||||
- Choose `sequential` for strict linear workflows.
|
||||
- Choose `parallel` for throughput and latency reduction.
|
||||
- Choose `router` for intent- or type-based branching.
|
||||
- Choose `orchestrator` for complex adaptive workflows.
|
||||
- Choose `evaluator` when correctness/quality loops are required.
|
||||
|
||||
## Handoff Minimum Contract
|
||||
|
||||
- `workflow_id`
|
||||
- `step_id`
|
||||
- `task`
|
||||
- `constraints`
|
||||
- `upstream_artifacts`
|
||||
- `budget_tokens`
|
||||
- `timeout_seconds`
|
||||
#### engineering/agent-workflow-designer/scripts/workflow_scaffolder.py — new executable file (113 lines)
@@ -0,0 +1,113 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate workflow skeleton configs from common multi-agent patterns."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
|
||||
|
||||
def sequential_template(name: str) -> Dict:
    """Build a linear research → draft → review workflow config."""
    chain = [
        ("research", "researcher", "draft"),
        ("draft", "writer", "review"),
        ("review", "reviewer", None),
    ]
    return {
        "name": name,
        "pattern": "sequential",
        "steps": [
            {"id": step_id, "agent": agent, "next": successor}
            for step_id, agent, successor in chain
        ],
        "retry": {"max_attempts": 2, "backoff_seconds": 2},
    }
|
||||
|
||||
|
||||
def parallel_template(name: str) -> Dict:
    """Build a fan-out/fan-in workflow config with per-stage timeouts."""
    config: Dict = {"name": name, "pattern": "parallel"}
    config["fan_out"] = {
        "tasks": ["research_a", "research_b", "research_c"],
        "agent": "analyst",
    }
    config["fan_in"] = {"agent": "synthesizer", "output": "combined_report"}
    config["timeouts"] = {"per_task_seconds": 180, "fan_in_seconds": 120}
    return config
|
||||
|
||||
|
||||
def router_template(name: str) -> Dict:
    """Build an intent-router workflow config with a generalist fallback."""
    routes = ["sales", "support", "engineering"]
    # Handler agent names are derived from the route names by convention.
    handlers = {route: {"agent": f"{route}_specialist"} for route in routes}
    return {
        "name": name,
        "pattern": "router",
        "router": {"agent": "router", "routes": routes},
        "handlers": handlers,
        "fallback": {"agent": "generalist"},
    }
|
||||
|
||||
|
||||
def orchestrator_template(name: str) -> Dict:
    """Build a planner-driven DAG workflow config with bounded parallelism."""
    execution_policy = {
        "dependency_mode": "dag",
        "max_parallel": 3,
        "completion_policy": "all_required",
    }
    return dict(
        name=name,
        pattern="orchestrator",
        orchestrator={"agent": "orchestrator", "planning": "dynamic"},
        specialists=["researcher", "coder", "analyst", "writer"],
        execution=execution_policy,
    )
|
||||
|
||||
|
||||
def evaluator_template(name: str) -> Dict:
    """Build a generate/evaluate loop config with a pass threshold."""
    loop_policy = {
        "max_iterations": 3,
        "pass_threshold": 0.8,
        "on_fail": "revise_and_retry",
    }
    return {
        "name": name,
        "pattern": "evaluator",
        "generator": {"agent": "generator"},
        "evaluator": {"agent": "evaluator", "criteria": ["accuracy", "format", "safety"]},
        "loop": loop_policy,
    }
|
||||
|
||||
|
||||
# Registry mapping CLI pattern names to their template factories.
# The keys double as the valid values for the positional `pattern` argument
# in parse_args().
PATTERNS = {
    "sequential": sequential_template,
    "parallel": parallel_template,
    "router": router_template,
    "orchestrator": orchestrator_template,
    "evaluator": evaluator_template,
}
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Parse the CLI: a required pattern plus optional --name/--output."""
    parser = argparse.ArgumentParser(
        description="Generate a workflow skeleton config from a pattern."
    )
    parser.add_argument(
        "pattern",
        choices=sorted(PATTERNS.keys()),
        help="Workflow pattern",
    )
    parser.add_argument("--name", default="new-workflow", help="Workflow name")
    parser.add_argument("--output", help="Optional output path for JSON config")
    return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
    """Entry point: build the selected template and print it or write it to disk."""
    args = parse_args()
    payload = json.dumps(PATTERNS[args.pattern](args.name), indent=2)

    # No --output: dump the config to stdout and exit.
    if not args.output:
        print(payload)
        return 0

    destination = Path(args.output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(payload + "\n", encoding="utf-8")
    print(f"Wrote workflow config to {destination}")
    return 0
|
||||
|
||||
|
||||
# Script entry point — SystemExit propagates main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user