feat(agenthub): add AgentHub plugin with cross-domain examples, SEO optimization, and docs site fixes

- AgentHub: 13 files updated with non-engineering examples (content drafts,
  research, strategy) — engineering stays primary, cross-domain secondary
- AgentHub: 7 slash commands, 5 Python scripts, 3 references, 1 agent,
  dry_run.py validation (57 checks)
- Marketplace: agenthub entry added with cross-domain keywords, engineering
  POWERFUL updated (25→30), product (12→13), counts synced across all configs
- SEO: generate-docs.py now produces keyword-rich <title> tags and meta
  descriptions using SKILL.md frontmatter — "Claude Code Skills" in site_name
  propagates to all 276 HTML pages
- SEO: per-domain title suffixes (Agent Skill for Codex & OpenClaw, etc.),
  slug-as-title cleanup, domain label stripping from titles
- Broken links: 141→0 warnings — new rewrite_skill_internal_links() converts
  references/, scripts/, assets/ links to GitHub source URLs; skills/index.md
  phantom slugs fixed (6 marketing, 7 RA/QM)
- Counts synced: 204 skills, 266 tools, 382 refs, 16 agents, 17 commands,
  21 plugins — consistent across CLAUDE.md, README.md, docs/index.md,
  marketplace.json, getting-started.md, mkdocs.yml
- Platform sync: Codex 163 skills, Gemini 246 items, OpenClaw compatible

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Reza Rezvani
2026-03-17 12:10:46 +01:00
parent de724ae5c4
commit 2f57ef8948
319 changed files with 8860 additions and 2681 deletions

View File

@@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""AgentHub message board manager.
CRUD operations for the agent message board: list channels, read posts,
create new posts, and reply to threads.
Usage:
python board_manager.py --list
python board_manager.py --read dispatch
python board_manager.py --post --channel results --author agent-1 --message "Task complete"
python board_manager.py --thread 001-agent-1 --message "Additional details"
python board_manager.py --demo
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone
BOARD_PATH = ".agenthub/board"
def get_board_path():
"""Get the board directory path."""
if not os.path.isdir(BOARD_PATH):
print(f"Error: Board not found at {BOARD_PATH}. Run hub_init.py first.",
file=sys.stderr)
sys.exit(1)
return BOARD_PATH
def load_index():
"""Load the board index."""
index_path = os.path.join(get_board_path(), "_index.json")
if not os.path.exists(index_path):
return {"channels": ["dispatch", "progress", "results"], "counters": {}}
with open(index_path) as f:
return json.load(f)
def save_index(index):
"""Save the board index."""
index_path = os.path.join(get_board_path(), "_index.json")
with open(index_path, "w") as f:
json.dump(index, f, indent=2)
f.write("\n")
def list_channels(output_format="text"):
"""List all board channels with post counts."""
index = load_index()
channels = []
for ch in index.get("channels", []):
ch_path = os.path.join(get_board_path(), ch)
count = 0
if os.path.isdir(ch_path):
count = len([f for f in os.listdir(ch_path)
if f.endswith(".md")])
channels.append({"channel": ch, "posts": count})
if output_format == "json":
print(json.dumps({"channels": channels}, indent=2))
else:
print("Board Channels:")
print()
for ch in channels:
print(f" {ch['channel']:<15} {ch['posts']} posts")
def parse_post_frontmatter(content):
"""Parse YAML frontmatter from a post."""
metadata = {}
body = content
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
fm = parts[1].strip()
body = parts[2].strip()
for line in fm.split("\n"):
if ":" in line:
key, val = line.split(":", 1)
metadata[key.strip()] = val.strip()
return metadata, body
def read_channel(channel, output_format="text"):
"""Read all posts in a channel."""
ch_path = os.path.join(get_board_path(), channel)
if not os.path.isdir(ch_path):
print(f"Error: Channel '{channel}' not found", file=sys.stderr)
sys.exit(1)
files = sorted([f for f in os.listdir(ch_path) if f.endswith(".md")])
posts = []
for fname in files:
filepath = os.path.join(ch_path, fname)
with open(filepath) as f:
content = f.read()
metadata, body = parse_post_frontmatter(content)
posts.append({
"file": fname,
"metadata": metadata,
"body": body,
})
if output_format == "json":
print(json.dumps({"channel": channel, "posts": posts}, indent=2))
else:
print(f"Channel: {channel} ({len(posts)} posts)")
print("=" * 60)
for post in posts:
author = post["metadata"].get("author", "unknown")
timestamp = post["metadata"].get("timestamp", "")
print(f"\n--- {post['file']} (by {author}, {timestamp}) ---")
print(post["body"])
def create_post(channel, author, message, parent=None):
"""Create a new post in a channel."""
ch_path = os.path.join(get_board_path(), channel)
os.makedirs(ch_path, exist_ok=True)
# Get next sequence number
index = load_index()
counters = index.get("counters", {})
seq = counters.get(channel, 0) + 1
counters[channel] = seq
index["counters"] = counters
save_index(index)
# Generate filename
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
safe_author = re.sub(r"[^a-zA-Z0-9_-]", "", author)
filename = f"{seq:03d}-{safe_author}-{timestamp}.md"
# Build post content
lines = [
"---",
f"author: {author}",
f"timestamp: {datetime.now(timezone.utc).isoformat()}",
f"channel: {channel}",
f"sequence: {seq}",
]
if parent:
lines.append(f"parent: {parent}")
else:
lines.append("parent: null")
lines.append("---")
lines.append("")
lines.append(message)
lines.append("")
filepath = os.path.join(ch_path, filename)
with open(filepath, "w") as f:
f.write("\n".join(lines))
print(f"Posted to {channel}/{filename}")
return filename
def run_demo():
"""Show demo output."""
print("=" * 60)
print("AgentHub Board Manager — Demo Mode")
print("=" * 60)
print()
print("--- Channel List ---")
print("Board Channels:")
print()
print(" dispatch 2 posts")
print(" progress 4 posts")
print(" results 3 posts")
print()
print("--- Read Channel: results ---")
print("Channel: results (3 posts)")
print("=" * 60)
print()
print("--- 001-agent-1-20260317T143510Z.md (by agent-1, 2026-03-17T14:35:10Z) ---")
print("## Result Summary")
print()
print("- **Approach**: Added caching layer for database queries")
print("- **Files changed**: 3")
print("- **Metric**: 165ms (baseline: 180ms, delta: -15ms)")
print("- **Confidence**: Medium — 2 edge cases not covered")
print()
print("--- 002-agent-2-20260317T143645Z.md (by agent-2, 2026-03-17T14:36:45Z) ---")
print("## Result Summary")
print()
print("- **Approach**: Replaced O(n²) sort with hash map lookup")
print("- **Files changed**: 2")
print("- **Metric**: 142ms (baseline: 180ms, delta: -38ms)")
print("- **Confidence**: High — all tests pass")
print()
print("--- 003-agent-3-20260317T143422Z.md (by agent-3, 2026-03-17T14:34:22Z) ---")
print("## Result Summary")
print()
print("- **Approach**: Minor loop optimizations")
print("- **Files changed**: 1")
print("- **Metric**: 190ms (baseline: 180ms, delta: +10ms)")
print("- **Confidence**: Low — no meaningful improvement")
def main():
parser = argparse.ArgumentParser(
description="AgentHub message board manager"
)
parser.add_argument("--list", action="store_true",
help="List all channels with post counts")
parser.add_argument("--read", type=str, metavar="CHANNEL",
help="Read all posts in a channel")
parser.add_argument("--post", action="store_true",
help="Create a new post")
parser.add_argument("--channel", type=str,
help="Channel for --post or --thread")
parser.add_argument("--author", type=str,
help="Author name for --post")
parser.add_argument("--message", type=str,
help="Message content for --post or --thread")
parser.add_argument("--thread", type=str, metavar="POST_ID",
help="Reply to a post (sets parent)")
parser.add_argument("--format", choices=["text", "json"], default="text",
help="Output format (default: text)")
parser.add_argument("--demo", action="store_true",
help="Show demo output")
args = parser.parse_args()
if args.demo:
run_demo()
return
if args.list:
list_channels(args.format)
return
if args.read:
read_channel(args.read, args.format)
return
if args.post:
if not args.channel or not args.author or not args.message:
print("Error: --post requires --channel, --author, and --message",
file=sys.stderr)
sys.exit(1)
create_post(args.channel, args.author, args.message)
return
if args.thread:
if not args.message:
print("Error: --thread requires --message", file=sys.stderr)
sys.exit(1)
channel = args.channel or "results"
author = args.author or "coordinator"
create_post(channel, author, args.message, parent=args.thread)
return
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,275 @@
#!/usr/bin/env python3
"""Analyze the AgentHub git DAG.
Detects frontier branches (leaves with no children), displays DAG graphs,
and shows per-agent branch status for a session.
Usage:
python dag_analyzer.py --frontier --session 20260317-143022
python dag_analyzer.py --graph
python dag_analyzer.py --status --session 20260317-143022
python dag_analyzer.py --demo
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime
def run_git(*args):
"""Run a git command and return stdout."""
try:
result = subprocess.run(
["git"] + list(args),
capture_output=True, text=True, check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
print(f"Git error: {e.stderr.strip()}", file=sys.stderr)
return ""
def get_hub_branches(session_id=None):
"""Get all hub/* branches, optionally filtered by session."""
output = run_git("branch", "--list", "hub/*", "--format=%(refname:short)")
if not output:
return []
branches = output.strip().split("\n")
if session_id:
prefix = f"hub/{session_id}/"
branches = [b for b in branches if b.startswith(prefix)]
return branches
def get_branch_commit(branch):
"""Get the commit hash for a branch."""
return run_git("rev-parse", "--short", branch)
def get_branch_commit_count(branch, base_branch="main"):
"""Count commits ahead of base branch."""
output = run_git("rev-list", "--count", f"{base_branch}..{branch}")
try:
return int(output)
except ValueError:
return 0
def get_branch_last_commit_date(branch):
"""Get the last commit date for a branch."""
output = run_git("log", "-1", "--format=%ci", branch)
if output:
return output[:19]
return "unknown"
def get_branch_last_commit_msg(branch):
"""Get the last commit message for a branch."""
return run_git("log", "-1", "--format=%s", branch)
def detect_frontier(session_id=None):
"""Find frontier branches (tips with no child branches).
A branch is on the frontier if no other hub branch contains its tip commit
as an ancestor (i.e., it has no children in the DAG).
"""
branches = get_hub_branches(session_id)
if not branches:
return []
# Get commit hashes for all branches
branch_commits = {}
for b in branches:
commit = run_git("rev-parse", b)
if commit:
branch_commits[b] = commit
# A branch is frontier if its commit is not an ancestor of any other branch
frontier = []
for branch, commit in branch_commits.items():
is_ancestor = False
for other_branch, other_commit in branch_commits.items():
if other_branch == branch:
continue
# Check if commit is ancestor of other_commit
result = subprocess.run(
["git", "merge-base", "--is-ancestor", commit, other_commit],
capture_output=True
)
if result.returncode == 0:
is_ancestor = True
break
if not is_ancestor:
frontier.append(branch)
return frontier
def show_graph():
"""Display the git DAG graph for hub branches."""
branches = get_hub_branches()
if not branches:
print("No hub/* branches found.")
return
# Use git log with graph for hub branches
branch_args = [b for b in branches]
output = run_git(
"log", "--all", "--oneline", "--graph", "--decorate",
"--simplify-by-decoration",
*[f"--branches=hub/*"]
)
if output:
print(output)
else:
print("No hub commits found.")
def show_status(session_id, output_format="table"):
"""Show per-agent branch status for a session."""
branches = get_hub_branches(session_id)
if not branches:
print(f"No branches found for session {session_id}")
return
frontier = detect_frontier(session_id)
# Parse agent info from branch names
agents = []
for branch in sorted(branches):
# Pattern: hub/{session}/agent-{N}/attempt-{M}
match = re.match(r"hub/[^/]+/agent-(\d+)/attempt-(\d+)", branch)
if match:
agent_num = int(match.group(1))
attempt = int(match.group(2))
else:
agent_num = 0
attempt = 1
commit = get_branch_commit(branch)
commits = get_branch_commit_count(branch)
last_date = get_branch_last_commit_date(branch)
last_msg = get_branch_last_commit_msg(branch)
is_frontier = branch in frontier
agents.append({
"agent": agent_num,
"attempt": attempt,
"branch": branch,
"commit": commit,
"commits_ahead": commits,
"last_update": last_date,
"last_message": last_msg,
"frontier": is_frontier,
})
if output_format == "json":
print(json.dumps({"session": session_id, "agents": agents}, indent=2))
return
# Table output
print(f"Session: {session_id}")
print(f"Branches: {len(branches)} | Frontier: {len(frontier)}")
print()
header = f"{'AGENT':<8} {'BRANCH':<45} {'COMMITS':<8} {'STATUS':<10} {'LAST UPDATE':<20}"
print(header)
print("-" * len(header))
for a in agents:
status = "frontier" if a["frontier"] else "merged"
print(f"agent-{a['agent']:<4} {a['branch']:<45} {a['commits_ahead']:<8} {status:<10} {a['last_update']:<20}")
def run_demo():
"""Show demo output."""
print("=" * 60)
print("AgentHub DAG Analyzer — Demo Mode")
print("=" * 60)
print()
print("--- Frontier Detection ---")
print("Frontier branches (leaves with no children):")
print(" hub/20260317-143022/agent-1/attempt-1 (3 commits ahead)")
print(" hub/20260317-143022/agent-2/attempt-1 (5 commits ahead)")
print(" hub/20260317-143022/agent-3/attempt-1 (2 commits ahead)")
print()
print("--- Session Status ---")
print("Session: 20260317-143022")
print("Branches: 3 | Frontier: 3")
print()
header = f"{'AGENT':<8} {'BRANCH':<45} {'COMMITS':<8} {'STATUS':<10} {'LAST UPDATE':<20}"
print(header)
print("-" * len(header))
print(f"{'agent-1':<8} {'hub/20260317-143022/agent-1/attempt-1':<45} {'3':<8} {'frontier':<10} {'2026-03-17 14:35:10':<20}")
print(f"{'agent-2':<8} {'hub/20260317-143022/agent-2/attempt-1':<45} {'5':<8} {'frontier':<10} {'2026-03-17 14:36:45':<20}")
print(f"{'agent-3':<8} {'hub/20260317-143022/agent-3/attempt-1':<45} {'2':<8} {'frontier':<10} {'2026-03-17 14:34:22':<20}")
print()
print("--- DAG Graph ---")
print("* abc1234 (hub/20260317-143022/agent-2/attempt-1) Replaced O(n²) with hash map")
print("* def5678 Added benchmark tests")
print("| * ghi9012 (hub/20260317-143022/agent-1/attempt-1) Added caching layer")
print("| * jkl3456 Refactored data access")
print("|/")
print("| * mno7890 (hub/20260317-143022/agent-3/attempt-1) Minor optimizations")
print("|/")
print("* pqr1234 (dev) Base commit")
def main():
parser = argparse.ArgumentParser(
description="Analyze the AgentHub git DAG"
)
parser.add_argument("--frontier", action="store_true",
help="List frontier branches (leaves with no children)")
parser.add_argument("--graph", action="store_true",
help="Show ASCII DAG graph for hub branches")
parser.add_argument("--status", action="store_true",
help="Show per-agent branch status")
parser.add_argument("--session", type=str,
help="Filter by session ID")
parser.add_argument("--format", choices=["table", "json"], default="table",
help="Output format (default: table)")
parser.add_argument("--demo", action="store_true",
help="Show demo output")
args = parser.parse_args()
if args.demo:
run_demo()
return
if not any([args.frontier, args.graph, args.status]):
parser.print_help()
return
if args.frontier:
frontier = detect_frontier(args.session)
if args.format == "json":
print(json.dumps({"frontier": frontier}, indent=2))
else:
if frontier:
print("Frontier branches:")
for b in frontier:
print(f" {b}")
else:
print("No frontier branches found.")
print()
if args.graph:
show_graph()
print()
if args.status:
if not args.session:
print("Error: --session required with --status", file=sys.stderr)
sys.exit(1)
show_status(args.session, args.format)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,318 @@
#!/usr/bin/env python3
"""Dry-run validation for the AgentHub plugin.
Checks JSON validity, YAML frontmatter, markdown structure, cross-file
consistency, script --help, and referenced file existence — without
creating any sessions or worktrees.
Usage:
python dry_run.py # Run all checks
python dry_run.py --verbose # Show per-file details
python dry_run.py --help
"""
import argparse
import json
import os
import re
import subprocess
import sys
PLUGIN_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# ── Helpers ──────────────────────────────────────────────────────────
PASS = "\033[32m✓\033[0m"
FAIL = "\033[31m✗\033[0m"
WARN = "\033[33m!\033[0m"
class Results:
def __init__(self):
self.passed = 0
self.failed = 0
self.warnings = 0
self.details = []
def ok(self, msg):
self.passed += 1
self.details.append((PASS, msg))
def fail(self, msg):
self.failed += 1
self.details.append((FAIL, msg))
def warn(self, msg):
self.warnings += 1
self.details.append((WARN, msg))
def print(self, verbose=False):
if verbose:
for icon, msg in self.details:
print(f" {icon} {msg}")
print()
total = self.passed + self.failed
status = "PASS" if self.failed == 0 else "FAIL"
color = "\033[32m" if self.failed == 0 else "\033[31m"
warn_str = f", {self.warnings} warnings" if self.warnings else ""
print(f"{color}{status}\033[0m {self.passed}/{total} checks passed{warn_str}")
return self.failed == 0
def rel(path):
"""Path relative to plugin root for display."""
return os.path.relpath(path, PLUGIN_ROOT)
# ── Check 1: JSON files ─────────────────────────────────────────────
def check_json(results):
"""Validate settings.json and plugin.json."""
json_files = [
os.path.join(PLUGIN_ROOT, "settings.json"),
os.path.join(PLUGIN_ROOT, ".claude-plugin", "plugin.json"),
]
for path in json_files:
name = rel(path)
if not os.path.exists(path):
results.fail(f"{name} — file missing")
continue
try:
with open(path) as f:
data = json.load(f)
results.ok(f"{name} — valid JSON")
except json.JSONDecodeError as e:
results.fail(f"{name} — invalid JSON: {e}")
continue
# plugin.json: only allowed fields
if name.endswith("plugin.json"):
allowed = {"name", "description", "version", "author", "homepage",
"repository", "license", "skills"}
extra = set(data.keys()) - allowed
if extra:
results.fail(f"{name} — disallowed fields: {extra}")
else:
results.ok(f"{name} — schema fields OK")
# Cross-check versions
try:
with open(json_files[0]) as f:
v1 = json.load(f).get("version")
with open(json_files[1]) as f:
v2 = json.load(f).get("version")
if v1 and v2 and v1 == v2:
results.ok(f"version match ({v1})")
elif v1 and v2:
results.fail(f"version mismatch: settings={v1}, plugin={v2}")
except Exception:
pass
# ── Check 2: YAML frontmatter ───────────────────────────────────────
FRONTMATTER_RE = re.compile(r"^---\n(.+?)\n---", re.DOTALL)
REQUIRED_FM_KEYS = {"name", "description"}
def check_frontmatter(results):
"""Validate YAML frontmatter in all SKILL.md files."""
skill_files = []
for root, _dirs, files in os.walk(PLUGIN_ROOT):
for f in files:
if f == "SKILL.md":
skill_files.append(os.path.join(root, f))
for path in skill_files:
name = rel(path)
with open(path) as f:
content = f.read()
m = FRONTMATTER_RE.match(content)
if not m:
results.fail(f"{name} — missing YAML frontmatter")
continue
# Lightweight key check (no PyYAML dependency)
fm_text = m.group(1)
found_keys = set()
for line in fm_text.splitlines():
if ":" in line:
key = line.split(":", 1)[0].strip()
found_keys.add(key)
missing = REQUIRED_FM_KEYS - found_keys
if missing:
results.fail(f"{name} — frontmatter missing keys: {missing}")
else:
results.ok(f"{name} — frontmatter OK")
# ── Check 3: Markdown structure ──────────────────────────────────────
def check_markdown(results):
"""Check for broken code fences and table rows in all .md files."""
md_files = []
for root, _dirs, files in os.walk(PLUGIN_ROOT):
for f in files:
if f.endswith(".md"):
md_files.append(os.path.join(root, f))
for path in md_files:
name = rel(path)
with open(path) as f:
lines = f.readlines()
# Code fences must be balanced
fence_count = sum(1 for ln in lines if ln.strip().startswith("```"))
if fence_count % 2 != 0:
results.fail(f"{name} — unbalanced code fences ({fence_count} found)")
else:
results.ok(f"{name} — code fences balanced")
# Tables: rows inside a table should have consistent pipe count
in_table = False
table_pipes = 0
table_ok = True
for i, ln in enumerate(lines, 1):
stripped = ln.strip()
if stripped.startswith("|") and stripped.endswith("|"):
pipes = stripped.count("|")
if not in_table:
in_table = True
table_pipes = pipes
elif pipes != table_pipes:
# Separator rows (|---|---| ) can differ slightly; skip
if not re.match(r"^\|[\s\-:|]+\|$", stripped):
results.warn(f"{name}:{i} — table column count mismatch ({pipes} vs {table_pipes})")
table_ok = False
else:
in_table = False
table_pipes = 0
# ── Check 4: Scripts --help ──────────────────────────────────────────
def check_scripts(results):
"""Verify every Python script exits 0 on --help."""
scripts_dir = os.path.join(PLUGIN_ROOT, "scripts")
if not os.path.isdir(scripts_dir):
results.warn("scripts/ directory not found")
return
for fname in sorted(os.listdir(scripts_dir)):
if not fname.endswith(".py") or fname == "dry_run.py":
continue
path = os.path.join(scripts_dir, fname)
try:
proc = subprocess.run(
[sys.executable, path, "--help"],
capture_output=True, text=True, timeout=10,
)
if proc.returncode == 0:
results.ok(f"scripts/{fname} --help exits 0")
else:
results.fail(f"scripts/{fname} --help exits {proc.returncode}")
except subprocess.TimeoutExpired:
results.fail(f"scripts/{fname} --help timed out")
except Exception as e:
results.fail(f"scripts/{fname} --help error: {e}")
# ── Check 5: Referenced files exist ──────────────────────────────────
def check_references(results):
"""Verify that key files referenced in docs actually exist."""
expected = [
"settings.json",
".claude-plugin/plugin.json",
"CLAUDE.md",
"SKILL.md",
"README.md",
"agents/hub-coordinator.md",
"references/agent-templates.md",
"references/coordination-strategies.md",
"scripts/hub_init.py",
"scripts/dag_analyzer.py",
"scripts/board_manager.py",
"scripts/result_ranker.py",
"scripts/session_manager.py",
]
for ref in expected:
path = os.path.join(PLUGIN_ROOT, ref)
if os.path.exists(path):
results.ok(f"{ref} exists")
else:
results.fail(f"{ref} — referenced but missing")
# ── Check 6: Cross-domain coverage ──────────────────────────────────
def check_cross_domain(results):
"""Verify non-engineering examples exist in key files (the whole point of this update)."""
checks = [
("settings.json", "content-generation"),
(".claude-plugin/plugin.json", "content drafts"),
("CLAUDE.md", "content drafts"),
("SKILL.md", "content variation"),
("README.md", "content generation"),
("skills/run/SKILL.md", "--judge"),
("skills/init/SKILL.md", "LLM judge"),
("skills/eval/SKILL.md", "narrative"),
("skills/board/SKILL.md", "Storytelling"),
("skills/status/SKILL.md", "Storytelling"),
("references/agent-templates.md", "landing page copy"),
("references/coordination-strategies.md", "flesch_score"),
("agents/hub-coordinator.md", "qualitative verdict"),
]
for filepath, needle in checks:
path = os.path.join(PLUGIN_ROOT, filepath)
if not os.path.exists(path):
results.fail(f"{filepath} — missing (cannot check cross-domain)")
continue
with open(path) as f:
content = f.read()
if needle.lower() in content.lower():
results.ok(f"{filepath} — contains cross-domain example (\"{needle}\")")
else:
results.fail(f"{filepath} — missing cross-domain marker \"{needle}\"")
# ── Main ─────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Dry-run validation for the AgentHub plugin."
)
parser.add_argument("--verbose", "-v", action="store_true",
help="Show per-file check details")
args = parser.parse_args()
print(f"AgentHub dry-run validation")
print(f"Plugin root: {PLUGIN_ROOT}\n")
all_ok = True
sections = [
("JSON validity", check_json),
("YAML frontmatter", check_frontmatter),
("Markdown structure", check_markdown),
("Script --help", check_scripts),
("Referenced files", check_references),
("Cross-domain examples", check_cross_domain),
]
for title, fn in sections:
print(f"── {title} ──")
r = Results()
fn(r)
ok = r.print(verbose=args.verbose)
if not ok:
all_ok = False
print()
if all_ok:
print("\033[32mAll checks passed.\033[0m")
else:
print("\033[31mSome checks failed — see above.\033[0m")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""Initialize an AgentHub collaboration session.
Creates the .agenthub/ directory structure, generates a session ID,
and writes config.yaml and state.json for the session.
Usage:
python hub_init.py --task "Optimize API response time" --agents 3 \\
--eval "pytest bench.py --json" --metric p50_ms --direction lower
python hub_init.py --task "Refactor auth module" --agents 2
python hub_init.py --demo
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
def generate_session_id():
"""Generate a timestamp-based session ID."""
return datetime.now().strftime("%Y%m%d-%H%M%S")
def create_directory_structure(base_path):
"""Create the .agenthub/ directory tree."""
dirs = [
os.path.join(base_path, "sessions"),
os.path.join(base_path, "board", "dispatch"),
os.path.join(base_path, "board", "progress"),
os.path.join(base_path, "board", "results"),
]
for d in dirs:
os.makedirs(d, exist_ok=True)
def write_gitignore(base_path):
"""Write .agenthub/.gitignore to exclude worktree artifacts."""
gitignore_path = os.path.join(base_path, ".gitignore")
if not os.path.exists(gitignore_path):
with open(gitignore_path, "w") as f:
f.write("# AgentHub gitignore\n")
f.write("# Keep board and sessions, ignore worktree artifacts\n")
f.write("*.tmp\n")
f.write("*.lock\n")
def write_board_index(base_path):
"""Initialize the board index file."""
index_path = os.path.join(base_path, "board", "_index.json")
if not os.path.exists(index_path):
index = {
"channels": ["dispatch", "progress", "results"],
"counters": {"dispatch": 0, "progress": 0, "results": 0},
}
with open(index_path, "w") as f:
json.dump(index, f, indent=2)
f.write("\n")
def create_session(base_path, session_id, task, agents, eval_cmd, metric,
direction, base_branch):
"""Create a new session with config and state files."""
session_dir = os.path.join(base_path, "sessions", session_id)
os.makedirs(session_dir, exist_ok=True)
# Write config.yaml (manual YAML to avoid dependency)
config_path = os.path.join(session_dir, "config.yaml")
config_lines = [
f"session_id: {session_id}",
f"task: \"{task}\"",
f"agent_count: {agents}",
f"base_branch: {base_branch}",
f"created: {datetime.now(timezone.utc).isoformat()}",
]
if eval_cmd:
config_lines.append(f"eval_cmd: \"{eval_cmd}\"")
if metric:
config_lines.append(f"metric: {metric}")
if direction:
config_lines.append(f"direction: {direction}")
with open(config_path, "w") as f:
f.write("\n".join(config_lines))
f.write("\n")
# Write state.json
state_path = os.path.join(session_dir, "state.json")
state = {
"session_id": session_id,
"state": "init",
"created": datetime.now(timezone.utc).isoformat(),
"updated": datetime.now(timezone.utc).isoformat(),
"agents": {},
}
with open(state_path, "w") as f:
json.dump(state, f, indent=2)
f.write("\n")
return session_dir
def validate_git_repo():
"""Check if current directory is a git repository."""
if not os.path.isdir(".git"):
# Check parent dirs
path = os.path.abspath(".")
while path != "/":
if os.path.isdir(os.path.join(path, ".git")):
return True
path = os.path.dirname(path)
return False
return True
def get_current_branch():
"""Get the current git branch name."""
head_file = os.path.join(".git", "HEAD")
if os.path.exists(head_file):
with open(head_file) as f:
ref = f.read().strip()
if ref.startswith("ref: refs/heads/"):
return ref[len("ref: refs/heads/"):]
return "main"
def run_demo():
"""Show a demo of what hub_init creates."""
print("=" * 60)
print("AgentHub Init — Demo Mode")
print("=" * 60)
print()
print("Session ID: 20260317-143022")
print("Task: Optimize API response time below 100ms")
print("Agents: 3")
print("Eval: pytest bench.py --json")
print("Metric: p50_ms (lower is better)")
print("Base branch: dev")
print()
print("Directory structure created:")
print(" .agenthub/")
print(" ├── .gitignore")
print(" ├── sessions/")
print(" │ └── 20260317-143022/")
print(" │ ├── config.yaml")
print(" │ └── state.json")
print(" └── board/")
print(" ├── _index.json")
print(" ├── dispatch/")
print(" ├── progress/")
print(" └── results/")
print()
print("config.yaml:")
print(' session_id: 20260317-143022')
print(' task: "Optimize API response time below 100ms"')
print(" agent_count: 3")
print(" base_branch: dev")
print(' eval_cmd: "pytest bench.py --json"')
print(" metric: p50_ms")
print(" direction: lower")
print()
print("state.json:")
print(' { "state": "init", "agents": {} }')
print()
print("Next step: Run /hub:spawn to launch agents")
def main():
parser = argparse.ArgumentParser(
description="Initialize an AgentHub collaboration session"
)
parser.add_argument("--task", type=str, help="Task description for agents")
parser.add_argument("--agents", type=int, default=3,
help="Number of parallel agents (default: 3)")
parser.add_argument("--eval", type=str, dest="eval_cmd",
help="Evaluation command to run in each worktree")
parser.add_argument("--metric", type=str,
help="Metric name to extract from eval output")
parser.add_argument("--direction", choices=["lower", "higher"],
help="Whether lower or higher metric is better")
parser.add_argument("--base-branch", type=str,
help="Base branch (default: current branch)")
parser.add_argument("--format", choices=["text", "json"], default="text",
help="Output format (default: text)")
parser.add_argument("--demo", action="store_true",
help="Show demo output without creating files")
args = parser.parse_args()
if args.demo:
run_demo()
return
if not args.task:
print("Error: --task is required", file=sys.stderr)
print("Usage: hub_init.py --task 'description' [--agents N] "
"[--eval 'cmd'] [--metric name] [--direction lower|higher]",
file=sys.stderr)
sys.exit(1)
if not validate_git_repo():
print("Error: Not a git repository. AgentHub requires git.",
file=sys.stderr)
sys.exit(1)
base_branch = args.base_branch or get_current_branch()
base_path = ".agenthub"
session_id = generate_session_id()
# Create structure
create_directory_structure(base_path)
write_gitignore(base_path)
write_board_index(base_path)
# Create session
session_dir = create_session(
base_path, session_id, args.task, args.agents,
args.eval_cmd, args.metric, args.direction, base_branch
)
if args.format == "json":
output = {
"session_id": session_id,
"session_dir": session_dir,
"task": args.task,
"agent_count": args.agents,
"eval_cmd": args.eval_cmd,
"metric": args.metric,
"direction": args.direction,
"base_branch": base_branch,
"state": "init",
}
print(json.dumps(output, indent=2))
else:
print(f"AgentHub session initialized")
print(f" Session ID: {session_id}")
print(f" Task: {args.task}")
print(f" Agents: {args.agents}")
if args.eval_cmd:
print(f" Eval: {args.eval_cmd}")
if args.metric:
direction_str = "lower is better" if args.direction == "lower" else "higher is better"
print(f" Metric: {args.metric} ({direction_str})")
print(f" Base branch: {base_branch}")
print(f" State: init")
print()
print(f"Next step: Run /hub:spawn to launch {args.agents} agents")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,315 @@
#!/usr/bin/env python3
"""Rank AgentHub agent results by metric or diff quality.
Runs an evaluation command in each agent's worktree, parses a metric,
and produces a ranked table.
Usage:
python result_ranker.py --session 20260317-143022 \\
--eval-cmd "pytest bench.py --json" --metric p50_ms --direction lower
python result_ranker.py --session 20260317-143022 --diff-summary
python result_ranker.py --demo
"""
import argparse
import json
import os
import re
import subprocess
import sys
def run_git(*args):
"""Run a git command and return stdout."""
try:
result = subprocess.run(
["git"] + list(args),
capture_output=True, text=True, check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
return ""
def get_session_config(session_id):
"""Load session config."""
config_path = os.path.join(".agenthub", "sessions", session_id, "config.yaml")
if not os.path.exists(config_path):
print(f"Error: Session {session_id} not found", file=sys.stderr)
sys.exit(1)
config = {}
with open(config_path) as f:
for line in f:
line = line.strip()
if ":" in line and not line.startswith("#"):
key, val = line.split(":", 1)
val = val.strip().strip('"')
config[key.strip()] = val
return config
def get_hub_branches(session_id):
"""Get all hub branches for a session."""
output = run_git("branch", "--list", f"hub/{session_id}/*",
"--format=%(refname:short)")
if not output:
return []
return [b.strip() for b in output.split("\n") if b.strip()]
def get_worktree_path(branch):
"""Get the worktree path for a branch, if it exists."""
output = run_git("worktree", "list", "--porcelain")
if not output:
return None
current_path = None
for line in output.split("\n"):
if line.startswith("worktree "):
current_path = line[len("worktree "):]
elif line.startswith("branch ") and current_path:
ref = line[len("branch "):]
short = ref.replace("refs/heads/", "")
if short == branch:
return current_path
current_path = None
return None
def run_eval_in_worktree(worktree_path, eval_cmd):
"""Run evaluation command in a worktree and return stdout."""
try:
result = subprocess.run(
eval_cmd, shell=True, capture_output=True, text=True,
cwd=worktree_path, timeout=120
)
return result.stdout.strip(), result.returncode
except subprocess.TimeoutExpired:
return "TIMEOUT", 1
except Exception as e:
return str(e), 1
def extract_metric(output, metric_name):
"""Extract a numeric metric from command output.
Looks for patterns like:
- metric_name: 42.5
- metric_name=42.5
- "metric_name": 42.5
"""
patterns = [
rf'{metric_name}\s*[:=]\s*([\d.]+)',
rf'"{metric_name}"\s*[:=]\s*([\d.]+)',
rf"'{metric_name}'\s*[:=]\s*([\d.]+)",
]
for pattern in patterns:
match = re.search(pattern, output, re.IGNORECASE)
if match:
try:
return float(match.group(1))
except ValueError:
continue
return None
def get_diff_stats(branch, base_branch="main"):
"""Get diff statistics for a branch vs base."""
output = run_git("diff", "--stat", f"{base_branch}...{branch}")
lines_output = run_git("diff", "--shortstat", f"{base_branch}...{branch}")
files_changed = 0
insertions = 0
deletions = 0
if lines_output:
files_match = re.search(r"(\d+) files? changed", lines_output)
ins_match = re.search(r"(\d+) insertions?", lines_output)
del_match = re.search(r"(\d+) deletions?", lines_output)
if files_match:
files_changed = int(files_match.group(1))
if ins_match:
insertions = int(ins_match.group(1))
if del_match:
deletions = int(del_match.group(1))
return {
"files_changed": files_changed,
"insertions": insertions,
"deletions": deletions,
"net_lines": insertions - deletions,
}
def rank_by_metric(results, direction="lower"):
"""Sort results by metric value."""
valid = [r for r in results if r.get("metric_value") is not None]
invalid = [r for r in results if r.get("metric_value") is None]
reverse = direction == "higher"
valid.sort(key=lambda r: r["metric_value"], reverse=reverse)
for i, r in enumerate(valid):
r["rank"] = i + 1
for r in invalid:
r["rank"] = len(valid) + 1
return valid + invalid
def run_demo():
"""Show demo ranking output."""
print("=" * 60)
print("AgentHub Result Ranker — Demo Mode")
print("=" * 60)
print()
print("Session: 20260317-143022")
print("Eval: pytest bench.py --json")
print("Metric: p50_ms (lower is better)")
print("Baseline: 180ms")
print()
header = f"{'RANK':<6} {'AGENT':<10} {'METRIC':<10} {'DELTA':<10} {'FILES':<7} {'SUMMARY'}"
print(header)
print("-" * 75)
print(f"{'1':<6} {'agent-2':<10} {'142ms':<10} {'-38ms':<10} {'2':<7} Replaced O(n²) with hash map lookup")
print(f"{'2':<6} {'agent-1':<10} {'165ms':<10} {'-15ms':<10} {'3':<7} Added caching layer")
print(f"{'3':<6} {'agent-3':<10} {'190ms':<10} {'+10ms':<10} {'1':<7} Minor loop optimizations")
print()
print("Winner: agent-2 (142ms, -21% from baseline)")
print()
print("Next step: Run /hub:merge to merge agent-2's branch")
def main():
parser = argparse.ArgumentParser(
description="Rank AgentHub agent results"
)
parser.add_argument("--session", type=str,
help="Session ID to evaluate")
parser.add_argument("--eval-cmd", type=str,
help="Evaluation command to run in each worktree")
parser.add_argument("--metric", type=str,
help="Metric name to extract from eval output")
parser.add_argument("--direction", choices=["lower", "higher"],
default="lower",
help="Whether lower or higher metric is better")
parser.add_argument("--baseline", type=float,
help="Baseline metric value for delta calculation")
parser.add_argument("--diff-summary", action="store_true",
help="Show diff statistics per agent (no eval cmd needed)")
parser.add_argument("--format", choices=["table", "json"], default="table",
help="Output format (default: table)")
parser.add_argument("--demo", action="store_true",
help="Show demo output")
args = parser.parse_args()
if args.demo:
run_demo()
return
if not args.session:
print("Error: --session is required", file=sys.stderr)
sys.exit(1)
config = get_session_config(args.session)
branches = get_hub_branches(args.session)
if not branches:
print(f"No branches found for session {args.session}")
return
eval_cmd = args.eval_cmd or config.get("eval_cmd")
metric = args.metric or config.get("metric")
direction = args.direction or config.get("direction", "lower")
base_branch = config.get("base_branch", "main")
results = []
for branch in branches:
# Extract agent number
match = re.match(r"hub/[^/]+/agent-(\d+)/", branch)
agent_id = f"agent-{match.group(1)}" if match else branch.split("/")[-2]
result = {
"agent": agent_id,
"branch": branch,
"metric_value": None,
"metric_raw": None,
"diff": get_diff_stats(branch, base_branch),
}
if eval_cmd and metric:
worktree = get_worktree_path(branch)
if worktree:
output, returncode = run_eval_in_worktree(worktree, eval_cmd)
result["metric_raw"] = output
result["eval_returncode"] = returncode
if returncode == 0:
result["metric_value"] = extract_metric(output, metric)
results.append(result)
# Rank
ranked = rank_by_metric(results, direction)
# Calculate deltas
baseline = args.baseline
if baseline is None and ranked and ranked[0].get("metric_value") is not None:
# Use worst as baseline if not specified
values = [r["metric_value"] for r in ranked if r["metric_value"] is not None]
if values:
baseline = max(values) if direction == "lower" else min(values)
for r in ranked:
if r.get("metric_value") is not None and baseline is not None:
r["delta"] = r["metric_value"] - baseline
else:
r["delta"] = None
if args.format == "json":
print(json.dumps({"session": args.session, "results": ranked}, indent=2))
return
# Table output
print(f"Session: {args.session}")
if eval_cmd:
print(f"Eval: {eval_cmd}")
if metric:
dir_str = "lower is better" if direction == "lower" else "higher is better"
print(f"Metric: {metric} ({dir_str})")
if baseline:
print(f"Baseline: {baseline}")
print()
if args.diff_summary or not eval_cmd:
header = f"{'RANK':<6} {'AGENT':<12} {'FILES':<7} {'ADDED':<8} {'REMOVED':<8} {'NET':<6}"
print(header)
print("-" * 50)
for i, r in enumerate(ranked):
d = r["diff"]
print(f"{i+1:<6} {r['agent']:<12} {d['files_changed']:<7} "
f"+{d['insertions']:<7} -{d['deletions']:<7} {d['net_lines']:<6}")
else:
header = f"{'RANK':<6} {'AGENT':<12} {'METRIC':<12} {'DELTA':<10} {'FILES':<7}"
print(header)
print("-" * 50)
for r in ranked:
mv = str(r["metric_value"]) if r["metric_value"] is not None else "N/A"
delta = ""
if r["delta"] is not None:
sign = "+" if r["delta"] >= 0 else ""
delta = f"{sign}{r['delta']:.1f}"
print(f"{r['rank']:<6} {r['agent']:<12} {mv:<12} {delta:<10} {r['diff']['files_changed']:<7}")
# Winner
if ranked and ranked[0].get("metric_value") is not None:
winner = ranked[0]
print()
print(f"Winner: {winner['agent']} ({winner['metric_value']})")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""AgentHub session state machine and lifecycle manager.
Manages session states (init → running → evaluating → merged/archived),
lists sessions, and handles cleanup of worktrees and branches.
Usage:
python session_manager.py --list
python session_manager.py --status 20260317-143022
python session_manager.py --update 20260317-143022 --state running
python session_manager.py --cleanup 20260317-143022
python session_manager.py --demo
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
SESSIONS_PATH = ".agenthub/sessions"
VALID_STATES = ["init", "running", "evaluating", "merged", "archived"]
VALID_TRANSITIONS = {
"init": ["running"],
"running": ["evaluating"],
"evaluating": ["merged", "archived"],
"merged": [],
"archived": [],
}
def load_state(session_id):
"""Load session state.json."""
state_path = os.path.join(SESSIONS_PATH, session_id, "state.json")
if not os.path.exists(state_path):
return None
with open(state_path) as f:
return json.load(f)
def save_state(session_id, state):
"""Save session state.json."""
state_path = os.path.join(SESSIONS_PATH, session_id, "state.json")
state["updated"] = datetime.now(timezone.utc).isoformat()
with open(state_path, "w") as f:
json.dump(state, f, indent=2)
f.write("\n")
def load_config(session_id):
"""Load session config.yaml (simple key: value parsing)."""
config_path = os.path.join(SESSIONS_PATH, session_id, "config.yaml")
if not os.path.exists(config_path):
return None
config = {}
with open(config_path) as f:
for line in f:
line = line.strip()
if ":" in line and not line.startswith("#"):
key, val = line.split(":", 1)
config[key.strip()] = val.strip().strip('"')
return config
def run_git(*args):
"""Run a git command and return stdout."""
try:
result = subprocess.run(
["git"] + list(args),
capture_output=True, text=True, check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError:
return ""
def list_sessions(output_format="text"):
"""List all sessions with their states."""
if not os.path.isdir(SESSIONS_PATH):
print("No sessions found. Run hub_init.py first.")
return
sessions = []
for sid in sorted(os.listdir(SESSIONS_PATH)):
session_dir = os.path.join(SESSIONS_PATH, sid)
if not os.path.isdir(session_dir):
continue
state = load_state(sid)
config = load_config(sid)
if state and config:
sessions.append({
"session_id": sid,
"state": state.get("state", "unknown"),
"task": config.get("task", ""),
"agents": config.get("agent_count", "?"),
"created": state.get("created", ""),
})
if output_format == "json":
print(json.dumps({"sessions": sessions}, indent=2))
return
if not sessions:
print("No sessions found.")
return
print("AgentHub Sessions")
print()
header = f"{'SESSION ID':<20} {'STATE':<12} {'AGENTS':<8} {'TASK'}"
print(header)
print("-" * 70)
for s in sessions:
task = s["task"][:40] + "..." if len(s["task"]) > 40 else s["task"]
print(f"{s['session_id']:<20} {s['state']:<12} {s['agents']:<8} {task}")
def show_status(session_id, output_format="text"):
"""Show detailed status for a session."""
state = load_state(session_id)
config = load_config(session_id)
if not state or not config:
print(f"Error: Session {session_id} not found", file=sys.stderr)
sys.exit(1)
if output_format == "json":
print(json.dumps({"config": config, "state": state}, indent=2))
return
print(f"Session: {session_id}")
print(f" State: {state.get('state', 'unknown')}")
print(f" Task: {config.get('task', '')}")
print(f" Agents: {config.get('agent_count', '?')}")
print(f" Base branch: {config.get('base_branch', '?')}")
if config.get("eval_cmd"):
print(f" Eval: {config['eval_cmd']}")
if config.get("metric"):
print(f" Metric: {config['metric']} ({config.get('direction', '?')})")
print(f" Created: {state.get('created', '?')}")
print(f" Updated: {state.get('updated', '?')}")
# Show agent branches
branches = run_git("branch", "--list", f"hub/{session_id}/*",
"--format=%(refname:short)")
if branches:
print()
print(" Branches:")
for b in branches.split("\n"):
if b.strip():
print(f" {b.strip()}")
def update_state(session_id, new_state):
"""Transition session to a new state."""
state = load_state(session_id)
if not state:
print(f"Error: Session {session_id} not found", file=sys.stderr)
sys.exit(1)
current = state.get("state", "unknown")
if new_state not in VALID_STATES:
print(f"Error: Invalid state '{new_state}'. "
f"Valid: {', '.join(VALID_STATES)}", file=sys.stderr)
sys.exit(1)
valid_next = VALID_TRANSITIONS.get(current, [])
if new_state not in valid_next:
print(f"Error: Cannot transition from '{current}' to '{new_state}'. "
f"Valid transitions: {', '.join(valid_next) or 'none (terminal)'}",
file=sys.stderr)
sys.exit(1)
state["state"] = new_state
save_state(session_id, state)
print(f"Session {session_id}: {current}{new_state}")
def cleanup_session(session_id):
"""Clean up worktrees and optionally archive branches."""
config = load_config(session_id)
if not config:
print(f"Error: Session {session_id} not found", file=sys.stderr)
sys.exit(1)
# Find and remove worktrees for this session
worktree_output = run_git("worktree", "list", "--porcelain")
removed = 0
if worktree_output:
current_path = None
for line in worktree_output.split("\n"):
if line.startswith("worktree "):
current_path = line[len("worktree "):]
elif line.startswith("branch ") and current_path:
ref = line[len("branch "):]
if f"hub/{session_id}/" in ref:
result = subprocess.run(
["git", "worktree", "remove", "--force", current_path],
capture_output=True, text=True
)
if result.returncode == 0:
removed += 1
print(f" Removed worktree: {current_path}")
current_path = None
print(f"Cleaned up {removed} worktrees for session {session_id}")
def run_demo():
"""Show demo output."""
print("=" * 60)
print("AgentHub Session Manager — Demo Mode")
print("=" * 60)
print()
print("--- Session List ---")
print("AgentHub Sessions")
print()
header = f"{'SESSION ID':<20} {'STATE':<12} {'AGENTS':<8} {'TASK'}"
print(header)
print("-" * 70)
print(f"{'20260317-143022':<20} {'merged':<12} {'3':<8} Optimize API response time below 100ms")
print(f"{'20260317-151500':<20} {'running':<12} {'2':<8} Refactor auth module for JWT support")
print(f"{'20260317-160000':<20} {'init':<12} {'4':<8} Implement caching strategy")
print()
print("--- Session Detail ---")
print("Session: 20260317-143022")
print(" State: merged")
print(" Task: Optimize API response time below 100ms")
print(" Agents: 3")
print(" Base branch: dev")
print(" Eval: pytest bench.py --json")
print(" Metric: p50_ms (lower)")
print(" Created: 2026-03-17T14:30:22Z")
print(" Updated: 2026-03-17T14:45:00Z")
print()
print(" Branches:")
print(" hub/20260317-143022/agent-1/attempt-1 (archived)")
print(" hub/20260317-143022/agent-2/attempt-1 (merged)")
print(" hub/20260317-143022/agent-3/attempt-1 (archived)")
print()
print("--- State Transitions ---")
print("Valid transitions:")
for state, transitions in VALID_TRANSITIONS.items():
arrow = "".join(transitions) if transitions else "(terminal)"
print(f" {state}: {arrow}")
def main():
parser = argparse.ArgumentParser(
description="AgentHub session state machine and lifecycle manager"
)
parser.add_argument("--list", action="store_true",
help="List all sessions with state")
parser.add_argument("--status", type=str, metavar="SESSION_ID",
help="Show detailed session status")
parser.add_argument("--update", type=str, metavar="SESSION_ID",
help="Update session state")
parser.add_argument("--state", type=str,
help="New state for --update")
parser.add_argument("--cleanup", type=str, metavar="SESSION_ID",
help="Remove worktrees and clean up session")
parser.add_argument("--format", choices=["text", "json"], default="text",
help="Output format (default: text)")
parser.add_argument("--demo", action="store_true",
help="Show demo output")
args = parser.parse_args()
if args.demo:
run_demo()
return
if args.list:
list_sessions(args.format)
return
if args.status:
show_status(args.status, args.format)
return
if args.update:
if not args.state:
print("Error: --update requires --state", file=sys.stderr)
sys.exit(1)
update_state(args.update, args.state)
return
if args.cleanup:
cleanup_session(args.cleanup)
return
parser.print_help()
if __name__ == "__main__":
main()