perf: Optimize LOCAL mode AI enhancement with parallel execution
- Increase the default LOCAL-mode batch size from 5 to 20 patterns per CLI call
- Add parallel execution with 3 concurrent workers (configurable)
- Add ai_enhancement settings to config_manager:
  - local_batch_size: patterns per Claude CLI call (default: 20)
  - local_parallel_workers: concurrent CLI calls (default: 3)
- Expected speedup: 6-12x faster for large codebases

Config settings can be changed via `skill-seekers config` (coming soon) or by editing ~/.config/skill-seekers/config.json.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -27,11 +27,19 @@ import logging
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Import config manager for settings
|
||||
try:
|
||||
from skill_seekers.cli.config_manager import get_config_manager
|
||||
CONFIG_AVAILABLE = True
|
||||
except ImportError:
|
||||
CONFIG_AVAILABLE = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class AIAnalysis:
|
||||
@@ -65,6 +73,15 @@ class AIEnhancer:
|
||||
self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
|
||||
self.client = None
|
||||
|
||||
# Get settings from config (with defaults)
|
||||
if CONFIG_AVAILABLE:
|
||||
config = get_config_manager()
|
||||
self.local_batch_size = config.get_local_batch_size()
|
||||
self.local_parallel_workers = config.get_local_parallel_workers()
|
||||
else:
|
||||
self.local_batch_size = 20 # Default
|
||||
self.local_parallel_workers = 3 # Default
|
||||
|
||||
# Determine actual mode
|
||||
if mode == "auto":
|
||||
if self.api_key:
|
||||
@@ -232,20 +249,68 @@ class PatternEnhancer(AIEnhancer):
|
||||
if not self.enabled or not patterns:
|
||||
return patterns
|
||||
|
||||
logger.info(f"🤖 Enhancing {len(patterns)} detected patterns with AI...")
|
||||
|
||||
# Batch patterns to minimize API calls (max 5 per batch)
|
||||
batch_size = 5
|
||||
enhanced = []
|
||||
# Use larger batch size for LOCAL mode (configurable)
|
||||
if self.mode == "local":
|
||||
batch_size = self.local_batch_size
|
||||
parallel_workers = self.local_parallel_workers
|
||||
logger.info(
|
||||
f"🤖 Enhancing {len(patterns)} patterns with AI "
|
||||
f"(LOCAL mode: {batch_size} per batch, {parallel_workers} parallel workers)..."
|
||||
)
|
||||
else:
|
||||
batch_size = 5 # API mode uses smaller batches
|
||||
parallel_workers = 1 # API mode is sequential
|
||||
logger.info(f"🤖 Enhancing {len(patterns)} detected patterns with AI...")
|
||||
|
||||
# Create batches
|
||||
batches = []
|
||||
for i in range(0, len(patterns), batch_size):
|
||||
batch = patterns[i : i + batch_size]
|
||||
batch_results = self._enhance_pattern_batch(batch)
|
||||
enhanced.extend(batch_results)
|
||||
batches.append(patterns[i : i + batch_size])
|
||||
|
||||
# Process batches (parallel for LOCAL, sequential for API)
|
||||
if parallel_workers > 1 and len(batches) > 1:
|
||||
enhanced = self._enhance_patterns_parallel(batches, parallel_workers)
|
||||
else:
|
||||
enhanced = []
|
||||
for batch in batches:
|
||||
batch_results = self._enhance_pattern_batch(batch)
|
||||
enhanced.extend(batch_results)
|
||||
|
||||
logger.info(f"✅ Enhanced {len(enhanced)} patterns")
|
||||
return enhanced
|
||||
|
||||
def _enhance_patterns_parallel(self, batches: list[list[dict]], workers: int) -> list[dict]:
    """Enhance pattern batches concurrently and return the results in input order.

    Each batch is passed to ``self._enhance_pattern_batch`` on a
    ``ThreadPoolExecutor``. A batch whose call raises is logged and kept
    unenhanced, so a single failure does not discard its patterns.

    Args:
        batches: Pattern batches; each inner list is handed to one call.
        workers: Maximum number of batches processed at the same time.

    Returns:
        Flat list of the per-batch results, in the order of ``batches``.
    """
    total = len(batches)
    # Slot i holds the outcome of batches[i], so completion order does not
    # affect output order.
    results = [None] * total

    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {
            executor.submit(self._enhance_pattern_batch, batch): idx
            for idx, batch in enumerate(batches)
        }

        for completed, future in enumerate(as_completed(future_to_idx), start=1):
            idx = future_to_idx[future]
            try:
                results[idx] = future.result()
            except Exception as e:
                logger.warning(f"⚠️ Batch {idx} failed: {e}")
                results[idx] = batches[idx]  # Return unenhanced on failure

            # Failed batches count as finished too; otherwise the final
            # "total/total" progress line is never emitted after a failure.
            if completed % 5 == 0 or completed == total:
                logger.info(f" Progress: {completed}/{total} batches completed")

    # Flatten, skipping batches that produced nothing.
    enhanced = []
    for batch_result in results:
        if batch_result:
            enhanced.extend(batch_result)
    return enhanced
|
||||
|
||||
def _enhance_pattern_batch(self, patterns: list[dict]) -> list[dict]:
|
||||
"""Enhance a batch of patterns"""
|
||||
# Prepare prompt
|
||||
@@ -321,20 +386,68 @@ class TestExampleEnhancer(AIEnhancer):
|
||||
if not self.enabled or not examples:
|
||||
return examples
|
||||
|
||||
logger.info(f"🤖 Enhancing {len(examples)} test examples with AI...")
|
||||
|
||||
# Batch examples to minimize API calls
|
||||
batch_size = 5
|
||||
enhanced = []
|
||||
# Use larger batch size for LOCAL mode (configurable)
|
||||
if self.mode == "local":
|
||||
batch_size = self.local_batch_size
|
||||
parallel_workers = self.local_parallel_workers
|
||||
logger.info(
|
||||
f"🤖 Enhancing {len(examples)} test examples with AI "
|
||||
f"(LOCAL mode: {batch_size} per batch, {parallel_workers} parallel workers)..."
|
||||
)
|
||||
else:
|
||||
batch_size = 5 # API mode uses smaller batches
|
||||
parallel_workers = 1 # API mode is sequential
|
||||
logger.info(f"🤖 Enhancing {len(examples)} test examples with AI...")
|
||||
|
||||
# Create batches
|
||||
batches = []
|
||||
for i in range(0, len(examples), batch_size):
|
||||
batch = examples[i : i + batch_size]
|
||||
batch_results = self._enhance_example_batch(batch)
|
||||
enhanced.extend(batch_results)
|
||||
batches.append(examples[i : i + batch_size])
|
||||
|
||||
# Process batches (parallel for LOCAL, sequential for API)
|
||||
if parallel_workers > 1 and len(batches) > 1:
|
||||
enhanced = self._enhance_examples_parallel(batches, parallel_workers)
|
||||
else:
|
||||
enhanced = []
|
||||
for batch in batches:
|
||||
batch_results = self._enhance_example_batch(batch)
|
||||
enhanced.extend(batch_results)
|
||||
|
||||
logger.info(f"✅ Enhanced {len(enhanced)} examples")
|
||||
return enhanced
|
||||
|
||||
def _enhance_examples_parallel(self, batches: list[list[dict]], workers: int) -> list[dict]:
    """Enhance test-example batches concurrently and return the results in input order.

    Each batch is passed to ``self._enhance_example_batch`` on a
    ``ThreadPoolExecutor``. A batch whose call raises is logged and kept
    unenhanced, so a single failure does not discard its examples.

    Args:
        batches: Example batches; each inner list is handed to one call.
        workers: Maximum number of batches processed at the same time.

    Returns:
        Flat list of the per-batch results, in the order of ``batches``.
    """
    total = len(batches)
    # Slot i holds the outcome of batches[i], so completion order does not
    # affect output order.
    results = [None] * total

    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {
            executor.submit(self._enhance_example_batch, batch): idx
            for idx, batch in enumerate(batches)
        }

        for completed, future in enumerate(as_completed(future_to_idx), start=1):
            idx = future_to_idx[future]
            try:
                results[idx] = future.result()
            except Exception as e:
                logger.warning(f"⚠️ Batch {idx} failed: {e}")
                results[idx] = batches[idx]  # Return unenhanced on failure

            # Failed batches count as finished too; otherwise the final
            # "total/total" progress line is never emitted after a failure.
            if completed % 5 == 0 or completed == total:
                logger.info(f" Progress: {completed}/{total} batches completed")

    # Flatten, skipping batches that produced nothing.
    enhanced = []
    for batch_result in results:
        if batch_result:
            enhanced.extend(batch_result)
    return enhanced
|
||||
|
||||
def _enhance_example_batch(self, examples: list[dict]) -> list[dict]:
|
||||
"""Enhance a batch of examples"""
|
||||
# Prepare prompt
|
||||
|
||||
@@ -34,6 +34,10 @@ class ConfigManager:
|
||||
},
|
||||
"resume": {"auto_save_interval_seconds": 60, "keep_progress_days": 7},
|
||||
"api_keys": {"anthropic": None, "google": None, "openai": None},
|
||||
"ai_enhancement": {
|
||||
"local_batch_size": 20, # Patterns per Claude CLI call (default was 5)
|
||||
"local_parallel_workers": 3, # Concurrent Claude CLI calls
|
||||
},
|
||||
"first_run": {"completed": False, "version": "2.7.0"},
|
||||
}
|
||||
|
||||
@@ -378,6 +382,30 @@ class ConfigManager:
|
||||
if deleted_count > 0:
|
||||
print(f"🧹 Cleaned up {deleted_count} old progress file(s)")
|
||||
|
||||
# AI Enhancement Settings
|
||||
|
||||
def get_local_batch_size(self) -> int:
    """Get batch size for LOCAL mode AI enhancement.

    Reads ``ai_enhancement.local_batch_size`` from ``self.config``. The
    config file can be edited by hand, so a missing or malformed section, or
    a value that is not a positive integer, yields the default of 20 rather
    than being passed on to code that uses it as a batching step.

    Returns:
        The configured batch size, or 20 when it is absent or invalid.
    """
    default = 20
    section = self.config.get("ai_enhancement")
    if not isinstance(section, dict):
        return default
    size = section.get("local_batch_size", default)
    # bool is a subclass of int; True/False are not meaningful sizes.
    if isinstance(size, bool) or not isinstance(size, int) or size < 1:
        return default
    return size
|
||||
|
||||
def set_local_batch_size(self, size: int):
    """Set batch size for LOCAL mode AI enhancement and save the config.

    Args:
        size: Patterns per call; must be an integer >= 1.

    Raises:
        ValueError: If ``size`` is not a positive integer. The config is
            neither modified nor saved in that case.
    """
    # Reject values that would break batching before anything is persisted.
    if isinstance(size, bool) or not isinstance(size, int) or size < 1:
        raise ValueError(f"local_batch_size must be a positive integer, got {size!r}")

    section = self.config.get("ai_enhancement")
    if not isinstance(section, dict):
        # Missing or malformed section: start from a fresh one.
        section = self.config["ai_enhancement"] = {}
    section["local_batch_size"] = size
    self.save_config()
|
||||
|
||||
def get_local_parallel_workers(self) -> int:
    """Get number of parallel workers for LOCAL mode AI enhancement.

    Reads ``ai_enhancement.local_parallel_workers`` from ``self.config``.
    The config file can be edited by hand, so a missing or malformed section
    or a non-integer value yields the default of 3. Integer values below 1
    are raised to 1 so callers always receive a usable worker count.

    Returns:
        The configured worker count (at least 1), or 3 when it is absent or
        not an integer.
    """
    default = 3
    section = self.config.get("ai_enhancement")
    if not isinstance(section, dict):
        return default
    workers = section.get("local_parallel_workers", default)
    # bool is a subclass of int; True/False are not meaningful counts.
    if isinstance(workers, bool) or not isinstance(workers, int):
        return default
    return max(1, workers)
|
||||
|
||||
def set_local_parallel_workers(self, workers: int):
    """Set number of parallel workers for LOCAL mode AI enhancement and save the config.

    Args:
        workers: Concurrent calls; must be an integer >= 1 (1 means no
            parallelism).

    Raises:
        ValueError: If ``workers`` is not a positive integer. The config is
            neither modified nor saved in that case.
    """
    # Reject unusable counts before anything is persisted.
    if isinstance(workers, bool) or not isinstance(workers, int) or workers < 1:
        raise ValueError(
            f"local_parallel_workers must be a positive integer, got {workers!r}"
        )

    section = self.config.get("ai_enhancement")
    if not isinstance(section, dict):
        # Missing or malformed section: start from a fresh one.
        section = self.config["ai_enhancement"] = {}
    section["local_parallel_workers"] = workers
    self.save_config()
|
||||
|
||||
# First Run Experience
|
||||
|
||||
def is_first_run(self) -> bool:
|
||||
@@ -443,6 +471,11 @@ class ConfigManager:
|
||||
print(f" • Auto-switch profiles: {self.config['rate_limit']['auto_switch_profiles']}")
|
||||
print(f" • Keep progress for: {self.config['resume']['keep_progress_days']} days")
|
||||
|
||||
# AI Enhancement settings
|
||||
print("\nAI Enhancement (LOCAL mode):")
|
||||
print(f" • Batch size: {self.get_local_batch_size()} patterns per call")
|
||||
print(f" • Parallel workers: {self.get_local_parallel_workers()} concurrent calls")
|
||||
|
||||
# Resumable jobs
|
||||
jobs = self.list_resumable_jobs()
|
||||
if jobs:
|
||||
|
||||
Reference in New Issue
Block a user