feat: add enhancement workflow system and unified enhancer

- enhancement_workflow.py: WorkflowEngine class for multi-stage AI
  enhancement workflows with preset support (security-focus,
  architecture-comprehensive, api-documentation, minimal, default)
- unified_enhancer.py: unified enhancement orchestrator integrating
  workflow execution with traditional enhance-level-based enhancement
- create_command.py: wire workflow args into the unified create command
- AGENTS.md: update agent capability documentation
- configs/godot_unified.json: add unified Godot documentation config
- ENHANCEMENT_WORKFLOW_SYSTEM.md: documentation for the workflow system
- WORKFLOW_ENHANCEMENT_SEQUENTIAL_EXECUTION.md: docs explaining
  sequential execution of workflows followed by AI enhancement

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

enhancement_workflow.py
@@ -0,0 +1,532 @@
#!/usr/bin/env python3
"""
Enhancement Workflow Engine

Allows users to define custom AI enhancement workflows with:
- Sequential stages that build on previous results
- Custom prompts per stage
- History passing between stages
- Post-processing configuration
- Per-project and global workflow support

Usage:
    # Use global workflow
    skill-seekers analyze . --enhance-workflow security-focus

    # Use project workflow
    skill-seekers analyze . --enhance-workflow .skill-seekers/enhancement.yaml

    # Quick inline stages
    skill-seekers analyze . \\
        --enhance-stage "security:Analyze for security issues" \\
        --enhance-stage "cleanup:Remove boilerplate"
"""

import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Literal

import yaml

logger = logging.getLogger(__name__)


@dataclass
class WorkflowStage:
    """Single enhancement stage in a workflow."""

    name: str
    type: Literal["builtin", "custom"]
    target: str  # "patterns", "examples", "config", "skill_md", "all"
    prompt: str | None = None
    uses_history: bool = False
    enabled: bool = True
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class PostProcessConfig:
    """Post-processing configuration."""

    remove_sections: list[str] = field(default_factory=list)
    reorder_sections: list[str] = field(default_factory=list)
    add_metadata: dict[str, Any] = field(default_factory=dict)
    custom_transforms: list[dict[str, Any]] = field(default_factory=list)


@dataclass
class EnhancementWorkflow:
    """Complete enhancement workflow definition."""

    name: str
    description: str
    version: str = "1.0"
    applies_to: list[str] = field(default_factory=lambda: ["codebase_analysis"])
    variables: dict[str, Any] = field(default_factory=dict)
    stages: list[WorkflowStage] = field(default_factory=list)
    post_process: PostProcessConfig = field(default_factory=PostProcessConfig)
    extends: str | None = None  # Inherit from another workflow


class WorkflowEngine:
    """
    Execute enhancement workflows with sequential stages.

    Each stage can:
    - Access previous stage results
    - Access all history
    - Access specific stages by name
    - Run custom AI prompts
    - Target specific parts of the analysis
    """

    def __init__(self, workflow: EnhancementWorkflow | str | Path):
        """
        Initialize workflow engine.

        Args:
            workflow: EnhancementWorkflow object or path to YAML file
        """
        if isinstance(workflow, (str, Path)):
            self.workflow = self._load_workflow(workflow)
        else:
            self.workflow = workflow
        self.history: list[dict[str, Any]] = []
        self.enhancer = None  # Lazy-loaded AIEnhancer

    def _load_workflow(self, workflow_path: str | Path) -> EnhancementWorkflow:
        """Load workflow from YAML file."""
        workflow_path = Path(workflow_path)

        # Resolve path (support both absolute and relative)
        if not workflow_path.is_absolute():
            # Try relative to CWD first
            if not workflow_path.exists():
                # Try in config directory
                config_dir = Path.home() / ".config" / "skill-seekers" / "workflows"
                workflow_path = config_dir / workflow_path

        if not workflow_path.exists():
            raise FileNotFoundError(f"Workflow not found: {workflow_path}")

        logger.info(f"📋 Loading workflow: {workflow_path}")
        with open(workflow_path, encoding="utf-8") as f:
            data = yaml.safe_load(f)

        # Handle inheritance (extends)
        if "extends" in data and data["extends"]:
            parent = self._load_workflow(data["extends"])
            data = self._merge_workflows(parent, data)

        # Parse stages
        stages = []
        for stage_data in data.get("stages", []):
            stages.append(
                WorkflowStage(
                    name=stage_data["name"],
                    type=stage_data.get("type", "custom"),
                    target=stage_data.get("target", "all"),
                    prompt=stage_data.get("prompt"),
                    uses_history=stage_data.get("uses_history", False),
                    enabled=stage_data.get("enabled", True),
                    metadata=stage_data.get("metadata", {}),
                )
            )

        # Parse post-processing
        post_process_data = data.get("post_process", {})
        post_process = PostProcessConfig(
            remove_sections=post_process_data.get("remove_sections", []),
            reorder_sections=post_process_data.get("reorder_sections", []),
            add_metadata=post_process_data.get("add_metadata", {}),
            custom_transforms=post_process_data.get("custom_transforms", []),
        )

        return EnhancementWorkflow(
            name=data.get("name", "Unnamed Workflow"),
            description=data.get("description", ""),
            version=data.get("version", "1.0"),
            applies_to=data.get("applies_to", ["codebase_analysis"]),
            variables=data.get("variables", {}),
            stages=stages,
            post_process=post_process,
            extends=data.get("extends"),
        )
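
    # Example (illustrative): a bare filename that is not found relative to the
    # CWD resolves against the user config directory, e.g.
    #
    #     engine = WorkflowEngine("security-focus.yaml")
    #     # -> ~/.config/skill-seekers/workflows/security-focus.yaml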

    def _merge_workflows(
        self, parent: EnhancementWorkflow, child_data: dict
    ) -> dict:
        """Merge child workflow with parent (inheritance)."""
        # Start with parent as dict
        merged = {
            "name": child_data.get("name", parent.name),
            "description": child_data.get("description", parent.description),
            "version": child_data.get("version", parent.version),
            "applies_to": child_data.get("applies_to", parent.applies_to),
            "variables": {**parent.variables, **child_data.get("variables", {})},
            "stages": [],
            "post_process": {},
        }

        # Merge stages (child can override by name)
        parent_stages = {s.name: s for s in parent.stages}
        child_stages = {s["name"]: s for s in child_data.get("stages", [])}
        for name in list(parent_stages.keys()) + list(child_stages.keys()):
            if name in child_stages:
                # Child overrides parent
                stage_dict = child_stages[name]
            else:
                # Use parent stage
                stage = parent_stages[name]
                stage_dict = {
                    "name": stage.name,
                    "type": stage.type,
                    "target": stage.target,
                    "prompt": stage.prompt,
                    "uses_history": stage.uses_history,
                    "enabled": stage.enabled,
                }
            if stage_dict not in merged["stages"]:
                merged["stages"].append(stage_dict)

        # Merge post-processing
        parent_post = parent.post_process
        child_post = child_data.get("post_process", {})
        merged["post_process"] = {
            "remove_sections": child_post.get(
                "remove_sections", parent_post.remove_sections
            ),
            "reorder_sections": child_post.get(
                "reorder_sections", parent_post.reorder_sections
            ),
            "add_metadata": {
                **parent_post.add_metadata,
                **child_post.get("add_metadata", {}),
            },
            "custom_transforms": parent_post.custom_transforms
            + child_post.get("custom_transforms", []),
        }
        return merged
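
    # Example (illustrative): a project workflow inheriting a stock template
    # and overriding one stage by name:
    #
    #     # .skill-seekers/enhancement.yaml
    #     extends: default.yaml
    #     name: Project Enhancement
    #     stages:
    #       - name: test_examples   # same name as a parent stage -> override
    #         type: builtin
    #         target: examples
    #         enabled: false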

    def run(self, analysis_results: dict, context: dict | None = None) -> dict:
        """
        Run workflow stages sequentially.

        Args:
            analysis_results: Results from analysis (patterns, examples, etc.)
            context: Additional context variables

        Returns:
            Enhanced results after all stages
        """
        logger.info(f"🚀 Starting workflow: {self.workflow.name}")
        logger.info(f"   Description: {self.workflow.description}")
        logger.info(f"   Stages: {len(self.workflow.stages)}")

        current_results = analysis_results
        context = context or {}

        # Merge workflow variables into context
        context.update(self.workflow.variables)

        # Run each stage
        for idx, stage in enumerate(self.workflow.stages, 1):
            if not stage.enabled:
                logger.info(f"⏭️ Skipping disabled stage: {stage.name}")
                continue

            logger.info(f"🔄 Running stage {idx}/{len(self.workflow.stages)}: {stage.name}")

            # Build stage context
            stage_context = self._build_stage_context(
                stage, current_results, context
            )

            # Run stage
            try:
                stage_results = self._run_stage(stage, stage_context)

                # Save to history
                self.history.append(
                    {
                        "stage": stage.name,
                        "results": stage_results,
                        "timestamp": datetime.now().isoformat(),
                        "metadata": stage.metadata,
                    }
                )

                # Merge stage results into current results
                current_results = self._merge_stage_results(
                    current_results, stage_results, stage.target
                )
                logger.info(f"   ✅ Stage complete: {stage.name}")
            except Exception as e:
                logger.error(f"   ❌ Stage failed: {stage.name} - {e}")
                # Continue with next stage (optional: make this configurable)
                continue

        # Post-processing
        logger.info("🔧 Running post-processing...")
        final_results = self._post_process(current_results)

        logger.info(f"✅ Workflow complete: {self.workflow.name}")
        return final_results

    def _build_stage_context(
        self, stage: WorkflowStage, current_results: dict, base_context: dict
    ) -> dict:
        """Build context for a stage (includes history if needed)."""
        context = {
            "current_results": current_results,
            **base_context,
        }
        if stage.uses_history and self.history:
            # Add previous stage
            context["previous_results"] = self.history[-1]["results"]
            # Add all history
            context["all_history"] = self.history
            # Add stages by name for easy access
            context["stages"] = {h["stage"]: h["results"] for h in self.history}
        return context
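
    # With uses_history enabled, a custom prompt can reference (illustrative):
    #     {current_results}   - results accumulated so far
    #     {previous_results}  - output of the immediately preceding stage
    #     {all_history}       - the full list of stage records
    #     {stages}            - stage results keyed by stage name
    # plus any workflow variables merged into the context by run().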

    def _run_stage(self, stage: WorkflowStage, context: dict) -> dict:
        """Run a single stage."""
        if stage.type == "builtin":
            return self._run_builtin_stage(stage, context)
        else:
            return self._run_custom_stage(stage, context)

    def _run_builtin_stage(self, stage: WorkflowStage, context: dict) -> dict:
        """Run built-in enhancement stage."""
        # Use existing enhancement system
        from skill_seekers.cli.ai_enhancer import PatternEnhancer, TestExampleEnhancer

        current = context["current_results"]

        # Determine what to enhance based on target
        if stage.target == "patterns" and "patterns" in current:
            enhancer = PatternEnhancer()
            enhanced_patterns = enhancer.enhance_patterns(current["patterns"])
            return {"patterns": enhanced_patterns}
        elif stage.target == "examples" and "examples" in current:
            enhancer = TestExampleEnhancer()
            enhanced_examples = enhancer.enhance_examples(current["examples"])
            return {"examples": enhanced_examples}
        else:
            logger.warning(f"Unknown builtin target: {stage.target}")
            return {}

    def _run_custom_stage(self, stage: WorkflowStage, context: dict) -> dict:
        """Run custom AI enhancement stage."""
        if not stage.prompt:
            logger.warning(f"Custom stage '{stage.name}' has no prompt")
            return {}

        # Lazy-load enhancer
        if not self.enhancer:
            from skill_seekers.cli.ai_enhancer import AIEnhancer

            self.enhancer = AIEnhancer()

        # Format prompt with context
        try:
            formatted_prompt = stage.prompt.format(**context)
        except KeyError as e:
            logger.warning(f"Missing context variable: {e}")
            formatted_prompt = stage.prompt

        # Call AI with custom prompt
        logger.info("   🤖 Running custom AI prompt...")
        response = self.enhancer._call_claude(formatted_prompt, max_tokens=3000)
        if not response:
            logger.warning("   ⚠️ No response from AI")
            return {}

        # Try to parse as JSON first, fall back to plain text
        try:
            result = json.loads(response)
        except json.JSONDecodeError:
            # Plain text response
            result = {"content": response, "stage": stage.name}
        return result
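
    # Example (illustrative): a prompt ending in "Output as JSON" typically
    # parses into a dict such as {"findings": [...]}; any non-JSON response is
    # wrapped as {"content": "<raw text>", "stage": "<stage name>"}.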

    def _merge_stage_results(
        self, current: dict, stage_results: dict, target: str
    ) -> dict:
        """Merge stage results into current results."""
        if target == "all":
            # Merge everything
            return {**current, **stage_results}
        else:
            # Merge only the specific target
            current[target] = stage_results.get(target, stage_results)
            return current

    def _post_process(self, results: dict) -> dict:
        """Apply post-processing configuration."""
        config = self.workflow.post_process

        # Remove sections
        for section in config.remove_sections:
            if section in results:
                logger.info(f"   🗑️ Removing section: {section}")
                del results[section]

        # Add metadata
        if config.add_metadata:
            if "metadata" not in results:
                results["metadata"] = {}
            results["metadata"].update(config.add_metadata)
            logger.info(f"   📝 Added metadata: {list(config.add_metadata.keys())}")

        # Reorder sections (for SKILL.md generation)
        if config.reorder_sections and "skill_md_sections" in results:
            logger.info("   🔄 Reordering sections...")
            # This will be used during SKILL.md generation
            results["section_order"] = config.reorder_sections

        # Custom transforms (extensibility)
        for transform in config.custom_transforms:
            logger.info(f"   ⚙️ Applying transform: {transform.get('name', 'unknown')}")
            # TODO: Implement custom transform system

        return results
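
    # Example (illustrative) post_process block in a workflow file:
    #
    #     post_process:
    #       remove_sections: [boilerplate]
    #       reorder_sections: [overview, security_analysis, patterns]
    #       add_metadata:
    #         security_reviewed: true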

    def save_history(self, output_path: Path):
        """Save workflow execution history."""
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        history_data = {
            "workflow": self.workflow.name,
            "version": self.workflow.version,
            "executed_at": datetime.now().isoformat(),
            "stages": self.history,
        }
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(history_data, f, indent=2)
        logger.info(f"💾 Saved workflow history: {output_path}")


def create_default_workflows():
    """Create default workflow templates in user config directory."""
    config_dir = Path.home() / ".config" / "skill-seekers" / "workflows"
    config_dir.mkdir(parents=True, exist_ok=True)

    # Default workflow
    default_workflow = {
        "name": "Default Enhancement",
        "description": "Standard AI enhancement with all features",
        "version": "1.0",
        "applies_to": ["codebase_analysis", "doc_scraping", "github_analysis"],
        "stages": [
            {
                "name": "base_analysis",
                "type": "builtin",
                "target": "patterns",
                "enabled": True,
            },
            {
                "name": "test_examples",
                "type": "builtin",
                "target": "examples",
                "enabled": True,
            },
        ],
        "post_process": {
            "add_metadata": {"enhanced": True, "workflow": "default"}
        },
    }

    # Security-focused workflow
    security_workflow = {
        "name": "Security-Focused Analysis",
        "description": "Emphasize security patterns and vulnerabilities",
        "version": "1.0",
        "applies_to": ["codebase_analysis"],
        "variables": {"focus_area": "security"},
        "stages": [
            {
                "name": "base_patterns",
                "type": "builtin",
                "target": "patterns",
            },
            {
                "name": "security_analysis",
                "type": "custom",
                "target": "security",
                "uses_history": True,
                "prompt": """Based on the patterns detected: {previous_results}

Perform deep security analysis:

1. **Authentication/Authorization**:
   - Auth bypass risks?
   - Token handling secure?
   - Session management issues?

2. **Input Validation**:
   - User input sanitized?
   - SQL injection risks?
   - XSS vulnerabilities?

3. **Data Exposure**:
   - Sensitive data in logs?
   - Secrets in config?
   - PII handling?

4. **Cryptography**:
   - Weak algorithms?
   - Hardcoded keys?
   - Insecure RNG?

Output as JSON with 'findings' array.""",
            },
        ],
        "post_process": {
            "add_metadata": {"security_reviewed": True},
        },
    }

    # Save workflows
    workflows = {
        "default.yaml": default_workflow,
        "security-focus.yaml": security_workflow,
    }
    for filename, workflow_data in workflows.items():
        workflow_file = config_dir / filename
        if not workflow_file.exists():
            with open(workflow_file, "w", encoding="utf-8") as f:
                yaml.dump(workflow_data, f, default_flow_style=False, sort_keys=False)
            logger.info(f"✅ Created workflow: {workflow_file}")

    return config_dir
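
# Example (illustrative) end-to-end use; the output path is hypothetical:
#
#     create_default_workflows()
#     engine = WorkflowEngine("security-focus.yaml")
#     enhanced = engine.run({"patterns": [...], "examples": [...]})
#     engine.save_history(Path(".skill-seekers/workflow_history.json"))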

if __name__ == "__main__":
    # Create default workflows
    create_default_workflows()
    print("✅ Default workflows created!")