fix(skill): restructure tech-stack-evaluator with Progressive Disclosure (#64) (#120)

Restructure skill to follow Progressive Disclosure Architecture:

Structure Changes:
- Move Python scripts to scripts/ directory
- Move sample JSON files to assets/ directory
- Create references/ directory with extracted content
- Remove redundant HOW_TO_USE.md and README.md

New Reference Files:
- references/metrics.md: Detailed scoring algorithms and formulas
- references/examples.md: Concrete input/output examples
- references/workflows.md: Step-by-step evaluation workflows

SKILL.md Improvements:
- Reduced from 430 lines to ~180 lines
- Added table of contents
- Added trigger phrases in description
- Consistent imperative voice
- Points to references for details

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alireza Rezvani
2026-01-30 06:28:42 +01:00
committed by GitHub
parent 829a197c2b
commit a10a4f2c4b
17 changed files with 1114 additions and 1266 deletions

View File

@@ -0,0 +1,501 @@
"""
Ecosystem Health Analyzer.
Analyzes technology ecosystem health including community size, maintenance status,
GitHub metrics, npm downloads, and long-term viability assessment.
"""
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
class EcosystemAnalyzer:
    """Analyze technology ecosystem health and viability.

    Combines GitHub activity, npm adoption, community engagement, corporate
    backing, and maintenance responsiveness into a 0-100 health score and a
    long-term viability assessment. All input metrics are read with defaults,
    so partially-populated data is tolerated.
    """

    def __init__(self, ecosystem_data: Dict[str, Any]):
        """
        Initialize analyzer with ecosystem data.

        Args:
            ecosystem_data: Dictionary containing GitHub, npm, and community metrics
        """
        self.technology = ecosystem_data.get('technology', 'Unknown')
        self.github_data = ecosystem_data.get('github', {})
        self.npm_data = ecosystem_data.get('npm', {})
        self.community_data = ecosystem_data.get('community', {})
        self.corporate_backing = ecosystem_data.get('corporate_backing', {})

    def calculate_health_score(self) -> Dict[str, float]:
        """
        Calculate overall ecosystem health score (0-100).

        Returns:
            Dictionary of health score components plus 'overall_health',
            the weighted average of the five component scores.
        """
        scores = {
            'github_health': self._score_github_health(),
            'npm_health': self._score_npm_health(),
            'community_health': self._score_community_health(),
            'corporate_backing': self._score_corporate_backing(),
            'maintenance_health': self._score_maintenance_health()
        }
        # Weights sum to 1.0, keeping the overall value on the same
        # 0-100 scale as the individual components.
        weights = {
            'github_health': 0.25,
            'npm_health': 0.20,
            'community_health': 0.20,
            'corporate_backing': 0.15,
            'maintenance_health': 0.20
        }
        scores['overall_health'] = sum(scores[k] * weights[k] for k in weights)
        return scores

    def _score_github_health(self) -> float:
        """
        Score GitHub repository health.

        Returns:
            GitHub health score (0-100)
        """
        score = 0.0
        # Stars (0-30 points)
        stars = self.github_data.get('stars', 0)
        if stars >= 50000:
            score += 30
        elif stars >= 20000:
            score += 25
        elif stars >= 10000:
            score += 20
        elif stars >= 5000:
            score += 15
        elif stars >= 1000:
            score += 10
        else:
            score += max(0, stars / 100)  # 1 point per 100 stars
        # Forks (0-20 points)
        forks = self.github_data.get('forks', 0)
        if forks >= 10000:
            score += 20
        elif forks >= 5000:
            score += 15
        elif forks >= 2000:
            score += 12
        elif forks >= 1000:
            score += 10
        else:
            score += max(0, forks / 100)
        # Contributors (0-20 points)
        contributors = self.github_data.get('contributors', 0)
        if contributors >= 500:
            score += 20
        elif contributors >= 200:
            score += 15
        elif contributors >= 100:
            score += 12
        elif contributors >= 50:
            score += 10
        else:
            score += max(0, contributors / 5)
        # Commit frequency (0-30 points)
        commits_last_month = self.github_data.get('commits_last_month', 0)
        if commits_last_month >= 100:
            score += 30
        elif commits_last_month >= 50:
            score += 25
        elif commits_last_month >= 25:
            score += 20
        elif commits_last_month >= 10:
            score += 15
        else:
            score += max(0, commits_last_month * 1.5)
        return min(100.0, score)

    def _score_npm_health(self) -> float:
        """
        Score npm package health (if applicable).

        Returns:
            npm health score (0-100); neutral 50.0 when no npm data exists.
        """
        if not self.npm_data:
            return 50.0  # Neutral score if not applicable
        score = 0.0
        # Weekly downloads (0-40 points)
        weekly_downloads = self.npm_data.get('weekly_downloads', 0)
        if weekly_downloads >= 1000000:
            score += 40
        elif weekly_downloads >= 500000:
            score += 35
        elif weekly_downloads >= 100000:
            score += 30
        elif weekly_downloads >= 50000:
            score += 25
        elif weekly_downloads >= 10000:
            score += 20
        else:
            score += max(0, weekly_downloads / 500)
        # Version stability (0-20 points).
        # Parse the major version defensively: registry version strings may
        # carry a "v" or range prefix ("v2.0.0", "^1.2.3"), which previously
        # crashed int(); unparsable majors fall back to 0.
        version = self.npm_data.get('version', '0.0.1')
        try:
            major_version = int(str(version).split('.')[0].lstrip('v^~=> '))
        except ValueError:
            major_version = 0
        if major_version >= 5:
            score += 20
        elif major_version >= 3:
            score += 15
        elif major_version >= 1:
            score += 10
        else:
            score += 5
        # Dependencies count (0-20 points, fewer is better)
        dependencies = self.npm_data.get('dependencies_count', 50)
        if dependencies <= 10:
            score += 20
        elif dependencies <= 25:
            score += 15
        elif dependencies <= 50:
            score += 10
        else:
            score += max(0, 20 - (dependencies - 50) / 10)
        # Last publish date (0-20 points)
        days_since_publish = self.npm_data.get('days_since_last_publish', 365)
        if days_since_publish <= 30:
            score += 20
        elif days_since_publish <= 90:
            score += 15
        elif days_since_publish <= 180:
            score += 10
        elif days_since_publish <= 365:
            score += 5
        else:
            score += 0
        return min(100.0, score)

    def _score_community_health(self) -> float:
        """
        Score community health and engagement.

        Returns:
            Community health score (0-100)
        """
        score = 0.0
        # Stack Overflow questions (0-25 points)
        so_questions = self.community_data.get('stackoverflow_questions', 0)
        if so_questions >= 50000:
            score += 25
        elif so_questions >= 20000:
            score += 20
        elif so_questions >= 10000:
            score += 15
        elif so_questions >= 5000:
            score += 10
        else:
            score += max(0, so_questions / 500)
        # Job postings (0-25 points)
        job_postings = self.community_data.get('job_postings', 0)
        if job_postings >= 5000:
            score += 25
        elif job_postings >= 2000:
            score += 20
        elif job_postings >= 1000:
            score += 15
        elif job_postings >= 500:
            score += 10
        else:
            score += max(0, job_postings / 50)
        # Tutorials and resources (0-25 points)
        tutorials = self.community_data.get('tutorials_count', 0)
        if tutorials >= 1000:
            score += 25
        elif tutorials >= 500:
            score += 20
        elif tutorials >= 200:
            score += 15
        elif tutorials >= 100:
            score += 10
        else:
            score += max(0, tutorials / 10)
        # Active forums/Discord (0-25 points)
        forum_members = self.community_data.get('forum_members', 0)
        if forum_members >= 50000:
            score += 25
        elif forum_members >= 20000:
            score += 20
        elif forum_members >= 10000:
            score += 15
        elif forum_members >= 5000:
            score += 10
        else:
            score += max(0, forum_members / 500)
        return min(100.0, score)

    def _score_corporate_backing(self) -> float:
        """
        Score corporate backing strength.

        Returns:
            Corporate backing score (0-100)
        """
        backing_type = self.corporate_backing.get('type', 'none')
        scores = {
            'major_tech_company': 100,  # Google, Microsoft, Meta, etc.
            'established_company': 80,  # Dedicated company (Vercel, HashiCorp)
            'startup_backed': 60,       # Funded startup
            'community_led': 40,        # Strong community, no corporate backing
            'none': 20                  # Individual maintainers
        }
        base_score = scores.get(backing_type, 40)
        # Adjust for funding
        funding = self.corporate_backing.get('funding_millions', 0)
        if funding >= 100:
            base_score = min(100, base_score + 20)
        elif funding >= 50:
            base_score = min(100, base_score + 10)
        elif funding >= 10:
            base_score = min(100, base_score + 5)
        # Cast so the return type matches the annotation (was a bare int).
        return float(base_score)

    def _score_maintenance_health(self) -> float:
        """
        Score maintenance activity and responsiveness.

        Returns:
            Maintenance health score (0-100)
        """
        score = 0.0
        # Issue response time (0-30 points)
        avg_response_hours = self.github_data.get('avg_issue_response_hours', 168)  # 7 days default
        if avg_response_hours <= 24:
            score += 30
        elif avg_response_hours <= 48:
            score += 25
        elif avg_response_hours <= 168:  # 1 week
            score += 20
        elif avg_response_hours <= 336:  # 2 weeks
            score += 10
        else:
            score += 5
        # Issue resolution rate (0-30 points)
        resolution_rate = self.github_data.get('issue_resolution_rate', 0.5)
        score += resolution_rate * 30
        # Release frequency (0-20 points)
        releases_per_year = self.github_data.get('releases_per_year', 4)
        if releases_per_year >= 12:
            score += 20
        elif releases_per_year >= 6:
            score += 15
        elif releases_per_year >= 4:
            score += 10
        elif releases_per_year >= 2:
            score += 5
        else:
            score += 0
        # Active maintainers (0-20 points)
        active_maintainers = self.github_data.get('active_maintainers', 1)
        if active_maintainers >= 10:
            score += 20
        elif active_maintainers >= 5:
            score += 15
        elif active_maintainers >= 3:
            score += 10
        elif active_maintainers >= 1:
            score += 5
        else:
            score += 0
        return min(100.0, score)

    def assess_viability(self) -> Dict[str, Any]:
        """
        Assess long-term viability of technology.

        Returns:
            Viability assessment with risk factors
        """
        health = self.calculate_health_score()
        overall_health = health['overall_health']
        # Determine viability level
        if overall_health >= 80:
            viability = "Excellent - Strong long-term viability"
            risk_level = "Low"
        elif overall_health >= 65:
            viability = "Good - Solid viability with minor concerns"
            risk_level = "Low-Medium"
        elif overall_health >= 50:
            viability = "Moderate - Viable but with notable risks"
            risk_level = "Medium"
        elif overall_health >= 35:
            viability = "Concerning - Significant viability risks"
            risk_level = "Medium-High"
        else:
            viability = "Poor - High risk of abandonment"
            risk_level = "High"
        # Identify specific risks
        risks = self._identify_viability_risks(health)
        # Identify strengths
        strengths = self._identify_viability_strengths(health)
        return {
            'overall_viability': viability,
            'risk_level': risk_level,
            'health_score': overall_health,
            'risks': risks,
            'strengths': strengths,
            'recommendation': self._generate_viability_recommendation(overall_health, risks)
        }

    def _identify_viability_risks(self, health: Dict[str, float]) -> List[str]:
        """
        Identify viability risks from health scores.

        Args:
            health: Health score components

        Returns:
            List of identified risks
        """
        risks = []
        if health['maintenance_health'] < 50:
            risks.append("Low maintenance activity - slow issue resolution")
        if health['github_health'] < 50:
            risks.append("Limited GitHub activity - smaller community")
        if health['corporate_backing'] < 40:
            risks.append("Weak corporate backing - sustainability concerns")
        # npm risk only applies when npm data was actually supplied.
        if health['npm_health'] < 50 and self.npm_data:
            risks.append("Low npm adoption - limited ecosystem")
        if health['community_health'] < 50:
            risks.append("Small community - limited resources and support")
        return risks if risks else ["No significant risks identified"]

    def _identify_viability_strengths(self, health: Dict[str, float]) -> List[str]:
        """
        Identify viability strengths from health scores.

        Args:
            health: Health score components

        Returns:
            List of identified strengths
        """
        strengths = []
        if health['maintenance_health'] >= 70:
            strengths.append("Active maintenance with responsive issue resolution")
        if health['github_health'] >= 70:
            strengths.append("Strong GitHub presence with active community")
        if health['corporate_backing'] >= 70:
            strengths.append("Strong corporate backing ensures sustainability")
        if health['npm_health'] >= 70 and self.npm_data:
            strengths.append("High npm adoption with stable releases")
        if health['community_health'] >= 70:
            strengths.append("Large, active community with extensive resources")
        return strengths if strengths else ["Baseline viability maintained"]

    def _generate_viability_recommendation(self, health_score: float, risks: List[str]) -> str:
        """
        Generate viability recommendation.

        Args:
            health_score: Overall health score
            risks: List of identified risks

        Returns:
            Recommendation string
        """
        if health_score >= 80:
            return "Recommended for long-term adoption - strong ecosystem support"
        elif health_score >= 65:
            return "Suitable for adoption - monitor identified risks"
        elif health_score >= 50:
            return "Proceed with caution - have contingency plans"
        else:
            return "Not recommended - consider alternatives with stronger ecosystems"

    def generate_ecosystem_report(self) -> Dict[str, Any]:
        """
        Generate comprehensive ecosystem report.

        Returns:
            Complete ecosystem analysis
        """
        health = self.calculate_health_score()
        viability = self.assess_viability()
        return {
            'technology': self.technology,
            'health_scores': health,
            'viability_assessment': viability,
            'github_metrics': self._format_github_metrics(),
            'npm_metrics': self._format_npm_metrics() if self.npm_data else None,
            'community_metrics': self._format_community_metrics()
        }

    def _format_github_metrics(self) -> Dict[str, Any]:
        """Format GitHub metrics for reporting."""
        return {
            'stars': f"{self.github_data.get('stars', 0):,}",
            'forks': f"{self.github_data.get('forks', 0):,}",
            'contributors': f"{self.github_data.get('contributors', 0):,}",
            'commits_last_month': self.github_data.get('commits_last_month', 0),
            'open_issues': self.github_data.get('open_issues', 0),
            'issue_resolution_rate': f"{self.github_data.get('issue_resolution_rate', 0) * 100:.1f}%"
        }

    def _format_npm_metrics(self) -> Dict[str, Any]:
        """Format npm metrics for reporting."""
        return {
            'weekly_downloads': f"{self.npm_data.get('weekly_downloads', 0):,}",
            'version': self.npm_data.get('version', 'N/A'),
            'dependencies': self.npm_data.get('dependencies_count', 0),
            'days_since_publish': self.npm_data.get('days_since_last_publish', 0)
        }

    def _format_community_metrics(self) -> Dict[str, Any]:
        """Format community metrics for reporting."""
        return {
            'stackoverflow_questions': f"{self.community_data.get('stackoverflow_questions', 0):,}",
            'job_postings': f"{self.community_data.get('job_postings', 0):,}",
            'tutorials': self.community_data.get('tutorials_count', 0),
            'forum_members': f"{self.community_data.get('forum_members', 0):,}"
        }

View File

@@ -0,0 +1,430 @@
"""
Input Format Detector.
Automatically detects input format (text, YAML, JSON, URLs) and parses
accordingly for technology stack evaluation requests.
"""
from typing import Dict, Any, Optional, Tuple
import json
import re
class FormatDetector:
    """Detect and parse various input formats for stack evaluation.

    Supported formats: JSON objects/arrays, simplified YAML, URL lists,
    and free-form conversational text. All parse paths funnel through
    _normalize_structure so callers always receive the same standard keys.
    """

    def __init__(self, input_data: str):
        """
        Initialize format detector with raw input.

        Args:
            input_data: Raw input string from user
        """
        self.raw_input = input_data.strip()
        self.detected_format: Optional[str] = None
        self.parsed_data: Optional[Dict[str, Any]] = None

    def detect_format(self) -> str:
        """
        Detect the input format.

        Returns:
            Format type: 'json', 'yaml', 'url', 'text'
        """
        # Try JSON first
        if self._is_json():
            self.detected_format = 'json'
            return 'json'
        # Try YAML
        if self._is_yaml():
            self.detected_format = 'yaml'
            return 'yaml'
        # Check for URLs
        if self._contains_urls():
            self.detected_format = 'url'
            return 'url'
        # Default to conversational text
        self.detected_format = 'text'
        return 'text'

    def _is_json(self) -> bool:
        """Check if input is a JSON object or array.

        Bare JSON scalars ("42", '"react"') are deliberately not treated as
        JSON: they carry no structure, and previously such inputs crashed
        _normalize_structure, which expects a dict.
        """
        try:
            parsed = json.loads(self.raw_input)
        except (json.JSONDecodeError, ValueError):
            return False
        return isinstance(parsed, (dict, list))

    def _is_yaml(self) -> bool:
        """
        Check if input looks like YAML.

        Returns:
            True if input appears to be YAML format
        """
        # YAML indicators
        yaml_patterns = [
            r'^\s*[\w\-]+\s*:',  # Key-value pairs
            r'^\s*-\s+',         # List items
            r':\s*$',            # Trailing colons
        ]
        # Must not be JSON
        if self._is_json():
            return False
        # Check for YAML patterns
        lines = self.raw_input.split('\n')
        yaml_line_count = 0
        for line in lines:
            for pattern in yaml_patterns:
                if re.match(pattern, line):
                    yaml_line_count += 1
                    break
        # If >50% of lines match YAML patterns, consider it YAML
        if len(lines) > 0 and yaml_line_count / len(lines) > 0.5:
            return True
        return False

    def _contains_urls(self) -> bool:
        """Check if input contains URLs."""
        url_pattern = r'https?://[^\s]+'
        return bool(re.search(url_pattern, self.raw_input))

    def parse(self) -> Dict[str, Any]:
        """
        Parse input based on detected format.

        Returns:
            Parsed data dictionary
        """
        if self.detected_format is None:
            self.detect_format()
        if self.detected_format == 'json':
            self.parsed_data = self._parse_json()
        elif self.detected_format == 'yaml':
            self.parsed_data = self._parse_yaml()
        elif self.detected_format == 'url':
            self.parsed_data = self._parse_urls()
        else:  # text
            self.parsed_data = self._parse_text()
        return self.parsed_data

    def _parse_json(self) -> Dict[str, Any]:
        """Parse JSON input, normalizing non-dict payloads.

        A top-level JSON array is interpreted as a list of technologies;
        previously any non-dict payload crashed _normalize_structure.
        """
        try:
            data = json.loads(self.raw_input)
        except json.JSONDecodeError:
            return {'error': 'Invalid JSON', 'raw': self.raw_input}
        if isinstance(data, list):
            data = {'technologies': data}
        elif not isinstance(data, dict):
            data = {'raw': data}
        return self._normalize_structure(data)

    def _parse_yaml(self) -> Dict[str, Any]:
        """
        Parse YAML-like input (simplified, no external dependencies).

        Returns:
            Parsed dictionary
        """
        result = {}
        current_section = None
        current_list = None
        lines = self.raw_input.split('\n')
        for line in lines:
            stripped = line.strip()
            if not stripped or stripped.startswith('#'):
                continue
            # Key-value pair
            if ':' in stripped:
                key, value = stripped.split(':', 1)
                key = key.strip()
                value = value.strip()
                # Empty value might indicate nested structure
                if not value:
                    current_section = key
                    result[current_section] = {}
                    current_list = None
                else:
                    if current_section:
                        result[current_section][key] = self._parse_value(value)
                    else:
                        result[key] = self._parse_value(value)
            # List item
            elif stripped.startswith('-'):
                item = stripped[1:].strip()
                if current_section:
                    if current_list is None:
                        current_list = []
                        result[current_section] = current_list
                    current_list.append(self._parse_value(item))
        return self._normalize_structure(result)

    def _parse_value(self, value: str) -> Any:
        """
        Parse a value string to appropriate type.

        Args:
            value: Value string

        Returns:
            Parsed value (str, int, float, bool)
        """
        value = value.strip()
        # Boolean
        if value.lower() in ['true', 'yes']:
            return True
        if value.lower() in ['false', 'no']:
            return False
        # Number
        try:
            if '.' in value:
                return float(value)
            else:
                return int(value)
        except ValueError:
            pass
        # String (remove quotes if present)
        if value.startswith('"') and value.endswith('"'):
            return value[1:-1]
        if value.startswith("'") and value.endswith("'"):
            return value[1:-1]
        return value

    def _parse_urls(self) -> Dict[str, Any]:
        """Parse URLs from input."""
        url_pattern = r'https?://[^\s]+'
        urls = re.findall(url_pattern, self.raw_input)
        # Categorize URLs
        github_urls = [u for u in urls if 'github.com' in u]
        npm_urls = [u for u in urls if 'npmjs.com' in u or 'npm.io' in u]
        other_urls = [u for u in urls if u not in github_urls and u not in npm_urls]
        # Also extract any text context
        text_without_urls = re.sub(url_pattern, '', self.raw_input).strip()
        result = {
            'format': 'url',
            'urls': {
                'github': github_urls,
                'npm': npm_urls,
                'other': other_urls
            },
            'context': text_without_urls
        }
        return self._normalize_structure(result)

    def _parse_text(self) -> Dict[str, Any]:
        """Parse conversational text input."""
        text = self.raw_input.lower()
        # Extract technologies being compared
        technologies = self._extract_technologies(text)
        # Extract use case
        use_case = self._extract_use_case(text)
        # Extract priorities
        priorities = self._extract_priorities(text)
        # Detect analysis type
        analysis_type = self._detect_analysis_type(text)
        result = {
            'format': 'text',
            'technologies': technologies,
            'use_case': use_case,
            'priorities': priorities,
            'analysis_type': analysis_type,
            'raw_text': self.raw_input
        }
        return self._normalize_structure(result)

    def _extract_technologies(self, text: str) -> list:
        """
        Extract technology names from text.

        Args:
            text: Lowercase text

        Returns:
            List of identified technologies
        """
        # Common technologies pattern
        tech_keywords = [
            'react', 'vue', 'angular', 'svelte', 'next.js', 'nuxt.js',
            'node.js', 'python', 'java', 'go', 'rust', 'ruby',
            'postgresql', 'postgres', 'mysql', 'mongodb', 'redis',
            'aws', 'azure', 'gcp', 'google cloud',
            'docker', 'kubernetes', 'k8s',
            'express', 'fastapi', 'django', 'flask', 'spring boot'
        ]
        found = []
        for tech in tech_keywords:
            # Word-boundary match so short names ('go', 'java') no longer hit
            # inside unrelated words ('good', 'javascript'), which the old
            # substring check did.
            if re.search(r'\b' + re.escape(tech) + r'\b', text):
                # Normalize names
                normalized = {
                    'postgres': 'PostgreSQL',
                    'next.js': 'Next.js',
                    'nuxt.js': 'Nuxt.js',
                    'node.js': 'Node.js',
                    'k8s': 'Kubernetes',
                    'gcp': 'Google Cloud Platform'
                }.get(tech, tech.title())
                if normalized not in found:
                    found.append(normalized)
        return found if found else ['Unknown']

    def _extract_use_case(self, text: str) -> str:
        """
        Extract use case description from text.

        Args:
            text: Lowercase text

        Returns:
            Use case description
        """
        use_case_keywords = {
            'real-time': 'Real-time application',
            'collaboration': 'Collaboration platform',
            'saas': 'SaaS application',
            'dashboard': 'Dashboard application',
            'api': 'API-heavy application',
            'data-intensive': 'Data-intensive application',
            'e-commerce': 'E-commerce platform',
            'enterprise': 'Enterprise application'
        }
        for keyword, description in use_case_keywords.items():
            if keyword in text:
                return description
        return 'General purpose application'

    def _extract_priorities(self, text: str) -> list:
        """
        Extract priority criteria from text.

        Args:
            text: Lowercase text

        Returns:
            List of priorities
        """
        priority_keywords = {
            'performance': 'Performance',
            'scalability': 'Scalability',
            'developer experience': 'Developer experience',
            'ecosystem': 'Ecosystem',
            'learning curve': 'Learning curve',
            'cost': 'Cost',
            'security': 'Security',
            'compliance': 'Compliance'
        }
        priorities = []
        for keyword, priority in priority_keywords.items():
            if keyword in text:
                priorities.append(priority)
        return priorities if priorities else ['Developer experience', 'Performance']

    def _detect_analysis_type(self, text: str) -> str:
        """
        Detect type of analysis requested.

        Args:
            text: Lowercase text

        Returns:
            Analysis type
        """
        type_keywords = {
            'migration': 'migration_analysis',
            'migrate': 'migration_analysis',
            'tco': 'tco_analysis',
            'total cost': 'tco_analysis',
            'security': 'security_analysis',
            'compliance': 'security_analysis',
            'compare': 'comparison',
            'vs': 'comparison',
            'evaluate': 'evaluation'
        }
        for keyword, analysis_type in type_keywords.items():
            if keyword in text:
                return analysis_type
        return 'comparison'  # Default

    def _normalize_structure(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Normalize parsed data to standard structure.

        Args:
            data: Parsed data dictionary

        Returns:
            Normalized data structure
        """
        # Ensure standard keys exist
        standard_keys = [
            'technologies',
            'use_case',
            'priorities',
            'analysis_type',
            'format'
        ]
        normalized = data.copy()
        for key in standard_keys:
            if key not in normalized:
                # Set defaults
                defaults = {
                    'technologies': [],
                    'use_case': 'general',
                    'priorities': [],
                    'analysis_type': 'comparison',
                    'format': self.detected_format or 'unknown'
                }
                normalized[key] = defaults.get(key)
        return normalized

    def get_format_info(self) -> Dict[str, Any]:
        """
        Get information about detected format.

        Returns:
            Format detection metadata
        """
        return {
            'detected_format': self.detected_format,
            'input_length': len(self.raw_input),
            'line_count': len(self.raw_input.split('\n')),
            'parsing_successful': self.parsed_data is not None
        }

View File

@@ -0,0 +1,587 @@
"""
Migration Path Analyzer.
Analyzes migration complexity, risks, timelines, and strategies for moving
from legacy technology stacks to modern alternatives.
"""
from typing import Dict, List, Any, Optional, Tuple
class MigrationAnalyzer:
    """Analyze migration paths and complexity for technology stack changes."""
    # Migration complexity factors
    # Names of the individual scoring dimensions; each has a matching
    # _score_<factor>() method and a weight in calculate_complexity_score().
    # NOTE(review): this list is not read by the visible methods (they
    # hard-code the same keys) — confirm whether external callers use it.
    COMPLEXITY_FACTORS = [
        'code_volume',
        'architecture_changes',
        'data_migration',
        'api_compatibility',
        'dependency_changes',
        'testing_requirements'
    ]
def __init__(self, migration_data: Dict[str, Any]):
"""
Initialize migration analyzer with migration parameters.
Args:
migration_data: Dictionary containing source/target technologies and constraints
"""
self.source_tech = migration_data.get('source_technology', 'Unknown')
self.target_tech = migration_data.get('target_technology', 'Unknown')
self.codebase_stats = migration_data.get('codebase_stats', {})
self.constraints = migration_data.get('constraints', {})
self.team_info = migration_data.get('team', {})
def calculate_complexity_score(self) -> Dict[str, Any]:
"""
Calculate overall migration complexity (1-10 scale).
Returns:
Dictionary with complexity scores by factor
"""
scores = {
'code_volume': self._score_code_volume(),
'architecture_changes': self._score_architecture_changes(),
'data_migration': self._score_data_migration(),
'api_compatibility': self._score_api_compatibility(),
'dependency_changes': self._score_dependency_changes(),
'testing_requirements': self._score_testing_requirements()
}
# Calculate weighted average
weights = {
'code_volume': 0.20,
'architecture_changes': 0.25,
'data_migration': 0.20,
'api_compatibility': 0.15,
'dependency_changes': 0.10,
'testing_requirements': 0.10
}
overall = sum(scores[k] * weights[k] for k in scores.keys())
scores['overall_complexity'] = overall
return scores
def _score_code_volume(self) -> float:
"""
Score complexity based on codebase size.
Returns:
Code volume complexity score (1-10)
"""
lines_of_code = self.codebase_stats.get('lines_of_code', 10000)
num_files = self.codebase_stats.get('num_files', 100)
num_components = self.codebase_stats.get('num_components', 50)
# Score based on lines of code (primary factor)
if lines_of_code < 5000:
base_score = 2
elif lines_of_code < 20000:
base_score = 4
elif lines_of_code < 50000:
base_score = 6
elif lines_of_code < 100000:
base_score = 8
else:
base_score = 10
# Adjust for component count
if num_components > 200:
base_score = min(10, base_score + 1)
elif num_components > 500:
base_score = min(10, base_score + 2)
return float(base_score)
def _score_architecture_changes(self) -> float:
"""
Score complexity based on architectural changes.
Returns:
Architecture complexity score (1-10)
"""
arch_change_level = self.codebase_stats.get('architecture_change_level', 'moderate')
scores = {
'minimal': 2, # Same patterns, just different framework
'moderate': 5, # Some pattern changes, similar concepts
'significant': 7, # Different patterns, major refactoring
'complete': 10 # Complete rewrite, different paradigm
}
return float(scores.get(arch_change_level, 5))
def _score_data_migration(self) -> float:
"""
Score complexity based on data migration requirements.
Returns:
Data migration complexity score (1-10)
"""
has_database = self.codebase_stats.get('has_database', True)
if not has_database:
return 1.0
database_size_gb = self.codebase_stats.get('database_size_gb', 10)
schema_changes = self.codebase_stats.get('schema_changes_required', 'minimal')
data_transformation = self.codebase_stats.get('data_transformation_required', False)
# Base score from database size
if database_size_gb < 1:
score = 2
elif database_size_gb < 10:
score = 3
elif database_size_gb < 100:
score = 5
elif database_size_gb < 1000:
score = 7
else:
score = 9
# Adjust for schema changes
schema_adjustments = {
'none': 0,
'minimal': 1,
'moderate': 2,
'significant': 3
}
score += schema_adjustments.get(schema_changes, 1)
# Adjust for data transformation
if data_transformation:
score += 2
return min(10.0, float(score))
def _score_api_compatibility(self) -> float:
"""
Score complexity based on API compatibility.
Returns:
API compatibility complexity score (1-10)
"""
breaking_api_changes = self.codebase_stats.get('breaking_api_changes', 'some')
scores = {
'none': 1, # Fully compatible
'minimal': 3, # Few breaking changes
'some': 5, # Moderate breaking changes
'many': 7, # Significant breaking changes
'complete': 10 # Complete API rewrite
}
return float(scores.get(breaking_api_changes, 5))
def _score_dependency_changes(self) -> float:
"""
Score complexity based on dependency changes.
Returns:
Dependency complexity score (1-10)
"""
num_dependencies = self.codebase_stats.get('num_dependencies', 20)
dependencies_to_replace = self.codebase_stats.get('dependencies_to_replace', 5)
# Score based on replacement percentage
if num_dependencies == 0:
return 1.0
replacement_pct = (dependencies_to_replace / num_dependencies) * 100
if replacement_pct < 10:
return 2.0
elif replacement_pct < 25:
return 4.0
elif replacement_pct < 50:
return 6.0
elif replacement_pct < 75:
return 8.0
else:
return 10.0
def _score_testing_requirements(self) -> float:
"""
Score complexity based on testing requirements.
Returns:
Testing complexity score (1-10)
"""
test_coverage = self.codebase_stats.get('current_test_coverage', 0.5) # 0-1 scale
num_tests = self.codebase_stats.get('num_tests', 100)
# If good test coverage, easier migration (can verify)
if test_coverage >= 0.8:
base_score = 3
elif test_coverage >= 0.6:
base_score = 5
elif test_coverage >= 0.4:
base_score = 7
else:
base_score = 9 # Poor coverage = hard to verify migration
# Large test suites need updates
if num_tests > 500:
base_score = min(10, base_score + 1)
return float(base_score)
def estimate_effort(self) -> Dict[str, Any]:
"""
Estimate migration effort in person-hours and timeline.
Returns:
Dictionary with effort estimates
"""
complexity = self.calculate_complexity_score()
overall_complexity = complexity['overall_complexity']
# Base hours estimation
lines_of_code = self.codebase_stats.get('lines_of_code', 10000)
base_hours = lines_of_code / 50 # 50 lines per hour baseline
# Complexity multiplier
complexity_multiplier = 1 + (overall_complexity / 10)
estimated_hours = base_hours * complexity_multiplier
# Break down by phase
phases = self._calculate_phase_breakdown(estimated_hours)
# Calculate timeline
team_size = self.team_info.get('team_size', 3)
hours_per_week_per_dev = self.team_info.get('hours_per_week', 30) # Account for other work
total_dev_weeks = estimated_hours / (team_size * hours_per_week_per_dev)
total_calendar_weeks = total_dev_weeks * 1.2 # Buffer for blockers
return {
'total_hours': estimated_hours,
'total_person_months': estimated_hours / 160, # 160 hours per person-month
'phases': phases,
'estimated_timeline': {
'dev_weeks': total_dev_weeks,
'calendar_weeks': total_calendar_weeks,
'calendar_months': total_calendar_weeks / 4.33
},
'team_assumptions': {
'team_size': team_size,
'hours_per_week_per_dev': hours_per_week_per_dev
}
}
def _calculate_phase_breakdown(self, total_hours: float) -> Dict[str, Dict[str, float]]:
"""
Calculate effort breakdown by migration phase.
Args:
total_hours: Total estimated hours
Returns:
Hours breakdown by phase
"""
# Standard phase percentages
phase_percentages = {
'planning_and_prototyping': 0.15,
'core_migration': 0.45,
'testing_and_validation': 0.25,
'deployment_and_monitoring': 0.10,
'buffer_and_contingency': 0.05
}
phases = {}
for phase, percentage in phase_percentages.items():
hours = total_hours * percentage
phases[phase] = {
'hours': hours,
'person_weeks': hours / 40,
'percentage': f"{percentage * 100:.0f}%"
}
return phases
def assess_risks(self) -> Dict[str, List[Dict[str, str]]]:
"""
Identify and assess migration risks.
Returns:
Categorized risks with mitigation strategies
"""
complexity = self.calculate_complexity_score()
risks = {
'technical_risks': self._identify_technical_risks(complexity),
'business_risks': self._identify_business_risks(),
'team_risks': self._identify_team_risks()
}
return risks
def _identify_technical_risks(self, complexity: Dict[str, float]) -> List[Dict[str, str]]:
"""
Identify technical risks.
Args:
complexity: Complexity scores
Returns:
List of technical risks with mitigations
"""
risks = []
# API compatibility risks
if complexity['api_compatibility'] >= 7:
risks.append({
'risk': 'Breaking API changes may cause integration failures',
'severity': 'High',
'mitigation': 'Create compatibility layer; implement feature flags for gradual rollout'
})
# Data migration risks
if complexity['data_migration'] >= 7:
risks.append({
'risk': 'Data migration could cause data loss or corruption',
'severity': 'Critical',
'mitigation': 'Implement robust backup strategy; run parallel systems during migration; extensive validation'
})
# Architecture risks
if complexity['architecture_changes'] >= 8:
risks.append({
'risk': 'Major architectural changes increase risk of performance regression',
'severity': 'High',
'mitigation': 'Extensive performance testing; staged rollout; monitoring and alerting'
})
# Testing risks
if complexity['testing_requirements'] >= 7:
risks.append({
'risk': 'Inadequate test coverage may miss critical bugs',
'severity': 'Medium',
'mitigation': 'Improve test coverage before migration; automated regression testing; user acceptance testing'
})
if not risks:
risks.append({
'risk': 'Standard technical risks (bugs, edge cases)',
'severity': 'Low',
'mitigation': 'Standard QA processes and staged rollout'
})
return risks
def _identify_business_risks(self) -> List[Dict[str, str]]:
"""
Identify business risks.
Returns:
List of business risks with mitigations
"""
risks = []
# Downtime risk
downtime_tolerance = self.constraints.get('downtime_tolerance', 'low')
if downtime_tolerance == 'none':
risks.append({
'risk': 'Zero-downtime migration increases complexity and risk',
'severity': 'High',
'mitigation': 'Blue-green deployment; feature flags; gradual traffic migration'
})
# Feature parity risk
risks.append({
'risk': 'New implementation may lack feature parity',
'severity': 'Medium',
'mitigation': 'Comprehensive feature audit; prioritized feature list; clear communication'
})
# Timeline risk
risks.append({
'risk': 'Migration may take longer than estimated',
'severity': 'Medium',
'mitigation': 'Build in 20% buffer; regular progress reviews; scope management'
})
return risks
def _identify_team_risks(self) -> List[Dict[str, str]]:
"""
Identify team-related risks.
Returns:
List of team risks with mitigations
"""
risks = []
# Learning curve
team_experience = self.team_info.get('target_tech_experience', 'low')
if team_experience in ['low', 'none']:
risks.append({
'risk': 'Team lacks experience with target technology',
'severity': 'High',
'mitigation': 'Training program; hire experienced developers; external consulting'
})
# Team size
team_size = self.team_info.get('team_size', 3)
if team_size < 3:
risks.append({
'risk': 'Small team size may extend timeline',
'severity': 'Medium',
'mitigation': 'Consider augmenting team; reduce scope; extend timeline'
})
# Knowledge retention
risks.append({
'risk': 'Loss of institutional knowledge during migration',
'severity': 'Medium',
'mitigation': 'Comprehensive documentation; knowledge sharing sessions; pair programming'
})
return risks
def generate_migration_plan(self) -> Dict[str, Any]:
    """Assemble the complete migration plan.

    Returns:
        Plan dictionary combining complexity analysis, effort estimation,
        risk assessment, the recommended approach, an overall
        recommendation, and success criteria.
    """
    complexity_profile = self.calculate_complexity_score()
    effort_estimate = self.estimate_effort()
    risk_report = self.assess_risks()
    return {
        'source_technology': self.source_tech,
        'target_technology': self.target_tech,
        'complexity_analysis': complexity_profile,
        'effort_estimation': effort_estimate,
        'risk_assessment': risk_report,
        # Approach selection keys off the single aggregate complexity score.
        'recommended_approach': self._recommend_migration_approach(
            complexity_profile['overall_complexity']
        ),
        'overall_recommendation': self._generate_migration_recommendation(
            complexity_profile, effort_estimate, risk_report
        ),
        'success_criteria': self._define_success_criteria(),
    }
def _recommend_migration_approach(self, complexity_score: float) -> Dict[str, Any]:
"""
Recommend migration approach based on complexity.
Args:
complexity_score: Overall complexity score
Returns:
Recommended approach details
"""
if complexity_score <= 3:
approach = 'direct_migration'
description = 'Direct migration - low complexity allows straightforward migration'
timeline_multiplier = 1.0
elif complexity_score <= 6:
approach = 'phased_migration'
description = 'Phased migration - migrate components incrementally to manage risk'
timeline_multiplier = 1.3
else:
approach = 'strangler_pattern'
description = 'Strangler pattern - gradually replace old system while running in parallel'
timeline_multiplier = 1.5
return {
'approach': approach,
'description': description,
'timeline_multiplier': timeline_multiplier,
'phases': self._generate_approach_phases(approach)
}
def _generate_approach_phases(self, approach: str) -> List[str]:
"""
Generate phase descriptions for migration approach.
Args:
approach: Migration approach type
Returns:
List of phase descriptions
"""
phases = {
'direct_migration': [
'Phase 1: Set up target environment and migrate configuration',
'Phase 2: Migrate codebase and dependencies',
'Phase 3: Migrate data with validation',
'Phase 4: Comprehensive testing',
'Phase 5: Cutover and monitoring'
],
'phased_migration': [
'Phase 1: Identify and prioritize components for migration',
'Phase 2: Migrate non-critical components first',
'Phase 3: Migrate core components with parallel running',
'Phase 4: Migrate critical components with rollback plan',
'Phase 5: Decommission old system'
],
'strangler_pattern': [
'Phase 1: Set up routing layer between old and new systems',
'Phase 2: Implement new features in target technology only',
'Phase 3: Gradually migrate existing features (lowest risk first)',
'Phase 4: Migrate high-risk components last with extensive testing',
'Phase 5: Complete migration and remove routing layer'
]
}
return phases.get(approach, phases['phased_migration'])
def _generate_migration_recommendation(
self,
complexity: Dict[str, float],
effort: Dict[str, Any],
risks: Dict[str, List[Dict[str, str]]]
) -> str:
"""
Generate overall migration recommendation.
Args:
complexity: Complexity analysis
effort: Effort estimation
risks: Risk assessment
Returns:
Recommendation string
"""
overall_complexity = complexity['overall_complexity']
timeline_months = effort['estimated_timeline']['calendar_months']
# Count high/critical severity risks
high_risk_count = sum(
1 for risk_list in risks.values()
for risk in risk_list
if risk['severity'] in ['High', 'Critical']
)
if overall_complexity <= 4 and high_risk_count <= 2:
return f"Recommended - Low complexity migration achievable in {timeline_months:.1f} months with manageable risks"
elif overall_complexity <= 7 and high_risk_count <= 4:
return f"Proceed with caution - Moderate complexity migration requiring {timeline_months:.1f} months and careful risk management"
else:
return f"High risk - Complex migration requiring {timeline_months:.1f} months. Consider: incremental approach, additional resources, or alternative solutions"
def _define_success_criteria(self) -> List[str]:
"""
Define success criteria for migration.
Returns:
List of success criteria
"""
return [
'Feature parity with current system',
'Performance equal or better than current system',
'Zero data loss or corruption',
'All tests passing (unit, integration, E2E)',
'Successful production deployment with <1% error rate',
'Team trained and comfortable with new technology',
'Documentation complete and up-to-date'
]

View File

@@ -0,0 +1,460 @@
"""
Report Generator - Context-aware report generation with progressive disclosure.
Generates reports adapted for Claude Desktop (rich markdown) or CLI (terminal-friendly),
with executive summaries and detailed breakdowns on demand.
"""
from typing import Dict, List, Any, Optional
import os
import platform
class ReportGenerator:
    """Generate context-aware technology evaluation reports.

    Renders evaluation data either as rich markdown (desktop context) or
    as plain ASCII text (CLI context); the context is chosen per instance,
    explicitly or by environment auto-detection.
    """

    def __init__(self, report_data: Dict[str, Any], output_context: Optional[str] = None):
        """
        Initialize report generator.

        Args:
            report_data: Complete evaluation data
            output_context: 'desktop', 'cli', or None for auto-detect
        """
        self.report_data = report_data
        # An explicit context always wins; only auto-detect when omitted.
        self.output_context = output_context or self._detect_context()

    def _detect_context(self) -> str:
        """
        Detect output context (Desktop vs CLI).

        Returns:
            Context type: 'desktop' or 'cli'
        """
        # Check for Claude Desktop environment variables or indicators.
        # This is a simplified detection - actual implementation would check
        # Claude Desktop-specific environment variables.
        if os.getenv('CLAUDE_DESKTOP'):
            return 'desktop'
        # Check if running in a terminal (fd 1 is stdout).
        if os.isatty(1):
            return 'cli'
        # Default to desktop for rich formatting
        return 'desktop'

    def generate_executive_summary(self, max_tokens: int = 300) -> str:
        """
        Generate executive summary (200-300 tokens).

        Args:
            max_tokens: Maximum tokens for summary.
                NOTE(review): currently advisory only - the value is never
                used to truncate output; confirm whether enforcement is
                intended.

        Returns:
            Executive summary markdown
        """
        summary_parts = []
        # Title: cap at the first three technologies to keep the header short.
        technologies = self.report_data.get('technologies', [])
        tech_names = ', '.join(technologies[:3])  # First 3
        summary_parts.append(f"# Technology Evaluation: {tech_names}\n")
        # Recommendation
        recommendation = self.report_data.get('recommendation', {})
        rec_text = recommendation.get('text', 'No recommendation available')
        confidence = recommendation.get('confidence', 0)
        summary_parts.append(f"## Recommendation\n")
        summary_parts.append(f"**{rec_text}**\n")
        summary_parts.append(f"*Confidence: {confidence:.0f}%*\n")
        # Top 3 Pros
        pros = recommendation.get('pros', [])[:3]
        if pros:
            summary_parts.append(f"\n### Top Strengths\n")
            for pro in pros:
                summary_parts.append(f"- {pro}\n")
        # Top 3 Cons
        cons = recommendation.get('cons', [])[:3]
        if cons:
            summary_parts.append(f"\n### Key Concerns\n")
            for con in cons:
                summary_parts.append(f"- {con}\n")
        # Key Decision Factors (top 3)
        decision_factors = self.report_data.get('decision_factors', [])[:3]
        if decision_factors:
            summary_parts.append(f"\n### Decision Factors\n")
            for factor in decision_factors:
                category = factor.get('category', 'Unknown')
                best = factor.get('best_performer', 'Unknown')
                summary_parts.append(f"- **{category.replace('_', ' ').title()}**: {best}\n")
        summary_parts.append(f"\n---\n")
        summary_parts.append(f"*For detailed analysis, request full report sections*\n")
        return ''.join(summary_parts)

    def generate_full_report(self, sections: Optional[List[str]] = None) -> str:
        """
        Generate complete report with selected sections.

        Args:
            sections: List of sections to include, or None for all
                available sections (see _get_available_sections).

        Returns:
            Complete report markdown
        """
        if sections is None:
            sections = self._get_available_sections()
        report_parts = []
        # Title and metadata
        report_parts.append(self._generate_title())
        # Generate each requested section; unknown names render nothing.
        for section in sections:
            section_content = self._generate_section(section)
            if section_content:
                report_parts.append(section_content)
        return '\n\n'.join(report_parts)

    def _get_available_sections(self) -> List[str]:
        """
        Get list of available report sections.

        A section is "available" only when its backing data is present
        in report_data; the executive summary is always offered.

        Returns:
            List of section names
        """
        sections = ['executive_summary']
        if 'comparison_matrix' in self.report_data:
            sections.append('comparison_matrix')
        if 'tco_analysis' in self.report_data:
            sections.append('tco_analysis')
        if 'ecosystem_health' in self.report_data:
            sections.append('ecosystem_health')
        if 'security_assessment' in self.report_data:
            sections.append('security_assessment')
        if 'migration_analysis' in self.report_data:
            sections.append('migration_analysis')
        if 'performance_benchmarks' in self.report_data:
            sections.append('performance_benchmarks')
        return sections

    def _generate_title(self) -> str:
        """Generate report title section, formatted per output context."""
        technologies = self.report_data.get('technologies', [])
        tech_names = ' vs '.join(technologies)
        use_case = self.report_data.get('use_case', 'General Purpose')
        if self.output_context == 'desktop':
            # Markdown header block; the string content is left-aligned on
            # purpose - it is literal text inside the triple-quoted string.
            return f"""# Technology Stack Evaluation Report
**Technologies**: {tech_names}
**Use Case**: {use_case}
**Generated**: {self._get_timestamp()}
---
"""
        else:  # CLI: plain-text banner framed by 80-char rules
            return f"""================================================================================
TECHNOLOGY STACK EVALUATION REPORT
================================================================================
Technologies: {tech_names}
Use Case: {use_case}
Generated: {self._get_timestamp()}
================================================================================
"""

    def _generate_section(self, section_name: str) -> Optional[str]:
        """
        Generate specific report section.

        Args:
            section_name: Name of section to generate

        Returns:
            Section markdown, or None for unknown section names
        """
        # Dispatch table maps section names to their renderers.
        generators = {
            'executive_summary': self._section_executive_summary,
            'comparison_matrix': self._section_comparison_matrix,
            'tco_analysis': self._section_tco_analysis,
            'ecosystem_health': self._section_ecosystem_health,
            'security_assessment': self._section_security_assessment,
            'migration_analysis': self._section_migration_analysis,
            'performance_benchmarks': self._section_performance_benchmarks
        }
        generator = generators.get(section_name)
        if generator:
            return generator()
        return None

    def _section_executive_summary(self) -> str:
        """Generate executive summary section (delegates to the public API)."""
        return self.generate_executive_summary()

    def _section_comparison_matrix(self) -> str:
        """Generate comparison matrix section in the active output context."""
        matrix_data = self.report_data.get('comparison_matrix', [])
        if not matrix_data:
            return ""
        if self.output_context == 'desktop':
            return self._render_matrix_desktop(matrix_data)
        else:
            return self._render_matrix_cli(matrix_data)

    def _render_matrix_desktop(self, matrix_data: List[Dict[str, Any]]) -> str:
        """Render comparison matrix for desktop (rich markdown table).

        Assumes every row's 'scores' dict shares the first row's keys
        (technology names) - TODO confirm against the producer.
        """
        parts = ["## Comparison Matrix\n"]
        if not matrix_data:
            return ""
        # Get technology names from first row
        tech_names = list(matrix_data[0].get('scores', {}).keys())
        # Build table header
        header = "| Category | Weight |"
        for tech in tech_names:
            header += f" {tech} |"
        parts.append(header)
        # Separator row required by markdown table syntax
        separator = "|----------|--------|"
        separator += "--------|" * len(tech_names)
        parts.append(separator)
        # Rows
        for row in matrix_data:
            category = row.get('category', '').replace('_', ' ').title()
            weight = row.get('weight', '')
            scores = row.get('scores', {})
            row_str = f"| {category} | {weight} |"
            for tech in tech_names:
                score = scores.get(tech, '0.0')
                row_str += f" {score} |"
            parts.append(row_str)
        return '\n'.join(parts)

    def _render_matrix_cli(self, matrix_data: List[Dict[str, Any]]) -> str:
        """Render comparison matrix for CLI (fixed-width ASCII table)."""
        parts = ["COMPARISON MATRIX", "=" * 80, ""]
        if not matrix_data:
            return ""
        # Get technology names
        tech_names = list(matrix_data[0].get('scores', {}).keys())
        # Fixed column widths; names longer than a column are truncated.
        category_width = 25
        weight_width = 8
        score_width = 10
        # Header
        header = f"{'Category':<{category_width}} {'Weight':<{weight_width}}"
        for tech in tech_names:
            header += f" {tech[:score_width-1]:<{score_width}}"
        parts.append(header)
        parts.append("-" * 80)
        # Rows
        for row in matrix_data:
            category = row.get('category', '').replace('_', ' ').title()[:category_width-1]
            weight = row.get('weight', '')
            scores = row.get('scores', {})
            row_str = f"{category:<{category_width}} {weight:<{weight_width}}"
            for tech in tech_names:
                score = scores.get(tech, '0.0')
                row_str += f" {score:<{score_width}}"
            parts.append(row_str)
        return '\n'.join(parts)

    def _section_tco_analysis(self) -> str:
        """Generate TCO (total cost of ownership) analysis section."""
        tco_data = self.report_data.get('tco_analysis', {})
        if not tco_data:
            return ""
        parts = ["## Total Cost of Ownership Analysis\n"]
        # Summary figures; timeline defaults to a 5-year horizon.
        total_tco = tco_data.get('total_tco', 0)
        timeline = tco_data.get('timeline_years', 5)
        avg_yearly = tco_data.get('average_yearly_cost', 0)
        parts.append(f"**{timeline}-Year Total**: ${total_tco:,.2f}")
        parts.append(f"**Average Yearly**: ${avg_yearly:,.2f}\n")
        # Cost breakdown
        initial = tco_data.get('initial_costs', {})
        parts.append(f"### Initial Costs: ${initial.get('total_initial', 0):,.2f}")
        # Operational costs, listed per year (1-based)
        operational = tco_data.get('operational_costs', {})
        if operational:
            parts.append(f"\n### Operational Costs (Yearly)")
            yearly_totals = operational.get('total_yearly', [])
            for year, cost in enumerate(yearly_totals, 1):
                parts.append(f"- Year {year}: ${cost:,.2f}")
        return '\n'.join(parts)

    def _section_ecosystem_health(self) -> str:
        """Generate ecosystem health section."""
        ecosystem_data = self.report_data.get('ecosystem_health', {})
        if not ecosystem_data:
            return ""
        parts = ["## Ecosystem Health Analysis\n"]
        # Overall score
        overall_score = ecosystem_data.get('overall_health', 0)
        parts.append(f"**Overall Health Score**: {overall_score:.1f}/100\n")
        # Component scores (skip the aggregate to avoid double-listing it)
        scores = ecosystem_data.get('health_scores', {})
        parts.append("### Health Metrics")
        for metric, score in scores.items():
            if metric != 'overall_health':
                metric_name = metric.replace('_', ' ').title()
                parts.append(f"- {metric_name}: {score:.1f}/100")
        # Viability assessment
        viability = ecosystem_data.get('viability_assessment', {})
        if viability:
            parts.append(f"\n### Viability: {viability.get('overall_viability', 'Unknown')}")
            parts.append(f"**Risk Level**: {viability.get('risk_level', 'Unknown')}")
        return '\n'.join(parts)

    def _section_security_assessment(self) -> str:
        """Generate security assessment section."""
        security_data = self.report_data.get('security_assessment', {})
        if not security_data:
            return ""
        parts = ["## Security & Compliance Assessment\n"]
        # Security score and letter grade
        security_score = security_data.get('security_score', {})
        overall = security_score.get('overall_security_score', 0)
        grade = security_score.get('security_grade', 'N/A')
        parts.append(f"**Security Score**: {overall:.1f}/100 (Grade: {grade})\n")
        # Compliance readiness per standard
        compliance = security_data.get('compliance_assessment', {})
        if compliance:
            parts.append("### Compliance Readiness")
            for standard, assessment in compliance.items():
                level = assessment.get('readiness_level', 'Unknown')
                pct = assessment.get('readiness_percentage', 0)
                parts.append(f"- **{standard}**: {level} ({pct:.0f}%)")
        return '\n'.join(parts)

    def _section_migration_analysis(self) -> str:
        """Generate migration analysis section."""
        migration_data = self.report_data.get('migration_analysis', {})
        if not migration_data:
            return ""
        parts = ["## Migration Path Analysis\n"]
        # Complexity (0-10 scale, per the migration analyzer)
        complexity = migration_data.get('complexity_analysis', {})
        overall_complexity = complexity.get('overall_complexity', 0)
        parts.append(f"**Migration Complexity**: {overall_complexity:.1f}/10\n")
        # Effort estimation
        effort = migration_data.get('effort_estimation', {})
        if effort:
            total_hours = effort.get('total_hours', 0)
            person_months = effort.get('total_person_months', 0)
            timeline = effort.get('estimated_timeline', {})
            calendar_months = timeline.get('calendar_months', 0)
            parts.append(f"### Effort Estimate")
            parts.append(f"- Total Effort: {person_months:.1f} person-months ({total_hours:.0f} hours)")
            parts.append(f"- Timeline: {calendar_months:.1f} calendar months")
        # Recommended approach
        approach = migration_data.get('recommended_approach', {})
        if approach:
            parts.append(f"\n### Recommended Approach: {approach.get('approach', 'Unknown').replace('_', ' ').title()}")
            parts.append(f"{approach.get('description', '')}")
        return '\n'.join(parts)

    def _section_performance_benchmarks(self) -> str:
        """Generate performance benchmarks section."""
        benchmark_data = self.report_data.get('performance_benchmarks', {})
        if not benchmark_data:
            return ""
        parts = ["## Performance Benchmarks\n"]
        # Throughput (requests/sec per technology)
        throughput = benchmark_data.get('throughput', {})
        if throughput:
            parts.append("### Throughput")
            for tech, rps in throughput.items():
                parts.append(f"- {tech}: {rps:,} requests/sec")
        # Latency (P95 milliseconds per technology)
        latency = benchmark_data.get('latency', {})
        if latency:
            parts.append("\n### Latency (P95)")
            for tech, ms in latency.items():
                parts.append(f"- {tech}: {ms}ms")
        return '\n'.join(parts)

    def _get_timestamp(self) -> str:
        """Get current timestamp as 'YYYY-MM-DD HH:MM' (local time)."""
        # Local import keeps the module's top-level imports minimal.
        from datetime import datetime
        return datetime.now().strftime("%Y-%m-%d %H:%M")

    def export_to_file(self, filename: str, sections: Optional[List[str]] = None) -> str:
        """
        Export report to file.

        Args:
            filename: Output filename
            sections: Sections to include (None for all available)

        Returns:
            Path to exported file (the filename as given)
        """
        report = self.generate_full_report(sections)
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(report)
        return filename

View File

@@ -0,0 +1,518 @@
"""
Security and Compliance Assessor.
Analyzes security vulnerabilities, compliance readiness (GDPR, SOC2, HIPAA),
and overall security posture of technology stacks.
"""
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
class SecurityAssessor:
    """Assess security and compliance readiness of technology stacks."""

    # Compliance standards mapping: each standard lists the security
    # features (keys of security_features) it requires.
    COMPLIANCE_STANDARDS = {
        'GDPR': ['data_privacy', 'consent_management', 'data_portability', 'right_to_deletion', 'audit_logging'],
        'SOC2': ['access_controls', 'encryption_at_rest', 'encryption_in_transit', 'audit_logging', 'backup_recovery'],
        'HIPAA': ['phi_protection', 'encryption_at_rest', 'encryption_in_transit', 'access_controls', 'audit_logging'],
        'PCI_DSS': ['payment_data_encryption', 'access_controls', 'network_security', 'vulnerability_management']
    }

    def __init__(self, security_data: Dict[str, Any]):
        """
        Initialize security assessor with security data.

        Args:
            security_data: Dictionary containing vulnerability and compliance data
        """
        self.technology = security_data.get('technology', 'Unknown')
        self.vulnerabilities = security_data.get('vulnerabilities', {})
        self.security_features = security_data.get('security_features', {})
        self.compliance_requirements = security_data.get('compliance_requirements', [])

    def calculate_security_score(self) -> Dict[str, Any]:
        """
        Calculate overall security score (0-100).

        Returns:
            Dictionary with security score components and letter grade
        """
        # Component scores
        vuln_score = self._score_vulnerabilities()
        patch_score = self._score_patch_responsiveness()
        features_score = self._score_security_features()
        track_record_score = self._score_track_record()
        # Weighted average; weights sum to 1.0.
        weights = {
            'vulnerability_score': 0.30,
            'patch_responsiveness': 0.25,
            'security_features': 0.30,
            'track_record': 0.15
        }
        overall = (
            vuln_score * weights['vulnerability_score'] +
            patch_score * weights['patch_responsiveness'] +
            features_score * weights['security_features'] +
            track_record_score * weights['track_record']
        )
        return {
            'overall_security_score': overall,
            'vulnerability_score': vuln_score,
            'patch_responsiveness': patch_score,
            'security_features_score': features_score,
            'track_record_score': track_record_score,
            'security_grade': self._calculate_grade(overall)
        }

    def _score_vulnerabilities(self) -> float:
        """
        Score based on vulnerability count and severity.

        Returns:
            Vulnerability score (0-100, higher is better)
        """
        # Get vulnerability counts by severity (last 12 months)
        critical = self.vulnerabilities.get('critical_last_12m', 0)
        high = self.vulnerabilities.get('high_last_12m', 0)
        medium = self.vulnerabilities.get('medium_last_12m', 0)
        low = self.vulnerabilities.get('low_last_12m', 0)
        # Calculate weighted vulnerability count (critical weighs 4x a medium)
        weighted_vulns = (critical * 4) + (high * 2) + (medium * 1) + (low * 0.5)
        # Tiered score based on weighted count (fewer is better)
        if weighted_vulns == 0:
            score = 100
        elif weighted_vulns <= 5:
            score = 90
        elif weighted_vulns <= 10:
            score = 80
        elif weighted_vulns <= 20:
            score = 70
        elif weighted_vulns <= 30:
            score = 60
        elif weighted_vulns <= 50:
            score = 50
        else:
            score = max(0, 50 - (weighted_vulns - 50) / 2)
        # Extra penalty: each critical vulnerability costs 10 points.
        if critical > 0:
            score = max(0, score - (critical * 10))
        return max(0.0, min(100.0, score))

    def _score_patch_responsiveness(self) -> float:
        """
        Score based on patch response time.

        Returns:
            Patch responsiveness score (0-100)
        """
        # Average days to patch, with pessimistic defaults when unknown.
        critical_patch_days = self.vulnerabilities.get('avg_critical_patch_days', 30)
        high_patch_days = self.vulnerabilities.get('avg_high_patch_days', 60)
        # Score critical patch time (most important, worth up to 50)
        if critical_patch_days <= 7:
            critical_score = 50
        elif critical_patch_days <= 14:
            critical_score = 40
        elif critical_patch_days <= 30:
            critical_score = 30
        elif critical_patch_days <= 60:
            critical_score = 20
        else:
            critical_score = 10
        # Score high severity patch time (worth up to 30)
        if high_patch_days <= 14:
            high_score = 30
        elif high_patch_days <= 30:
            high_score = 25
        elif high_patch_days <= 60:
            high_score = 20
        elif high_patch_days <= 90:
            high_score = 15
        else:
            high_score = 10
        # Having an active security team adds a flat 20 points.
        has_security_team = self.vulnerabilities.get('has_security_team', False)
        team_score = 20 if has_security_team else 0
        total_score = critical_score + high_score + team_score
        return min(100.0, total_score)

    def _score_security_features(self) -> float:
        """
        Score based on built-in security features.

        Returns:
            Security features score (0-100)
        """
        score = 0.0
        # Essential features (10 points each, 50 total)
        essential_features = [
            'encryption_at_rest',
            'encryption_in_transit',
            'authentication',
            'authorization',
            'input_validation'
        ]
        for feature in essential_features:
            if self.security_features.get(feature, False):
                score += 10
        # Advanced features (5 points each, 50 total)
        advanced_features = [
            'rate_limiting',
            'csrf_protection',
            'xss_protection',
            'sql_injection_protection',
            'audit_logging',
            'mfa_support',
            'rbac',
            'secrets_management',
            'security_headers',
            'cors_configuration'
        ]
        for feature in advanced_features:
            if self.security_features.get(feature, False):
                score += 5
        return min(100.0, score)

    def _score_track_record(self) -> float:
        """
        Score based on historical security track record.

        Returns:
            Track record score (0-100)
        """
        score = 50.0  # Start at neutral
        # Years since major security incident (bonus grows with time)
        years_since_major = self.vulnerabilities.get('years_since_major_incident', 5)
        if years_since_major >= 3:
            score += 30
        elif years_since_major >= 1:
            score += 15
        else:
            score -= 10
        # Security certifications
        has_certifications = self.vulnerabilities.get('has_security_certifications', False)
        if has_certifications:
            score += 20
        # Bug bounty program
        has_bug_bounty = self.vulnerabilities.get('has_bug_bounty_program', False)
        if has_bug_bounty:
            score += 10
        # Security audits: 10 points each, capped at 20.
        security_audits = self.vulnerabilities.get('security_audits_per_year', 0)
        score += min(20, security_audits * 10)
        return min(100.0, max(0.0, score))

    def _calculate_grade(self, score: float) -> str:
        """
        Convert score to letter grade.

        Args:
            score: Security score (0-100)

        Returns:
            Letter grade ('A' through 'F', 10-point bands)
        """
        if score >= 90:
            return "A"
        elif score >= 80:
            return "B"
        elif score >= 70:
            return "C"
        elif score >= 60:
            return "D"
        else:
            return "F"

    def assess_compliance(self, standards: Optional[List[str]] = None) -> Dict[str, Dict[str, Any]]:
        """
        Assess compliance readiness for specified standards.

        Args:
            standards: List of compliance standards to assess (defaults to
                the instance's compliance_requirements)

        Returns:
            Dictionary of compliance assessments by standard
        """
        if standards is None:
            standards = self.compliance_requirements
        results = {}
        for standard in standards:
            # Standards not in the mapping are reported, not silently dropped.
            if standard not in self.COMPLIANCE_STANDARDS:
                results[standard] = {
                    'readiness': 'Unknown',
                    'score': 0,
                    'status': 'Unknown standard'
                }
                continue
            readiness = self._assess_standard_readiness(standard)
            results[standard] = readiness
        return results

    def _assess_standard_readiness(self, standard: str) -> Dict[str, Any]:
        """
        Assess readiness for a specific compliance standard.

        Args:
            standard: Compliance standard name (must be a
                COMPLIANCE_STANDARDS key)

        Returns:
            Readiness assessment with level, percentage, gaps, and a
            recommendation
        """
        required_features = self.COMPLIANCE_STANDARDS[standard]
        met_count = 0
        total_count = len(required_features)
        missing_features = []
        for feature in required_features:
            if self.security_features.get(feature, False):
                met_count += 1
            else:
                missing_features.append(feature)
        # Calculate readiness percentage (guard against empty feature lists)
        readiness_pct = (met_count / total_count * 100) if total_count > 0 else 0
        # Determine readiness level by percentage band
        if readiness_pct >= 90:
            readiness_level = "Ready"
            status = "Compliant - meets all requirements"
        elif readiness_pct >= 70:
            readiness_level = "Mostly Ready"
            status = "Minor gaps - additional configuration needed"
        elif readiness_pct >= 50:
            readiness_level = "Partial"
            status = "Significant work required"
        else:
            readiness_level = "Not Ready"
            status = "Major gaps - extensive implementation needed"
        return {
            'readiness_level': readiness_level,
            'readiness_percentage': readiness_pct,
            'status': status,
            'features_met': met_count,
            'features_required': total_count,
            'missing_features': missing_features,
            'recommendation': self._generate_compliance_recommendation(readiness_level, missing_features)
        }

    def _generate_compliance_recommendation(self, readiness_level: str, missing_features: List[str]) -> str:
        """
        Generate compliance recommendation.

        Args:
            readiness_level: Current readiness level
            missing_features: List of missing features (at most the first
                three are mentioned in the recommendation text)

        Returns:
            Recommendation string
        """
        if readiness_level == "Ready":
            return "Proceed with compliance audit and certification"
        elif readiness_level == "Mostly Ready":
            return f"Implement missing features: {', '.join(missing_features[:3])}"
        elif readiness_level == "Partial":
            return f"Significant implementation needed. Start with: {', '.join(missing_features[:3])}"
        else:
            return "Not recommended without major security enhancements"

    def identify_vulnerabilities(self) -> Dict[str, Any]:
        """
        Identify and categorize vulnerabilities.

        Returns:
            Categorized vulnerability report with current/historical counts,
            common types, severity distribution, and trend analysis
        """
        # Current vulnerabilities (last 12 months)
        current = {
            'critical': self.vulnerabilities.get('critical_last_12m', 0),
            'high': self.vulnerabilities.get('high_last_12m', 0),
            'medium': self.vulnerabilities.get('medium_last_12m', 0),
            'low': self.vulnerabilities.get('low_last_12m', 0)
        }
        # Historical vulnerabilities (last 3 years)
        historical = {
            'critical': self.vulnerabilities.get('critical_last_3y', 0),
            'high': self.vulnerabilities.get('high_last_3y', 0),
            'medium': self.vulnerabilities.get('medium_last_3y', 0),
            'low': self.vulnerabilities.get('low_last_3y', 0)
        }
        # Common vulnerability types (generic fallback list when absent)
        common_types = self.vulnerabilities.get('common_vulnerability_types', [
            'SQL Injection',
            'XSS',
            'CSRF',
            'Authentication Issues'
        ])
        return {
            'current_vulnerabilities': current,
            'total_current': sum(current.values()),
            'historical_vulnerabilities': historical,
            'total_historical': sum(historical.values()),
            'common_types': common_types,
            'severity_distribution': self._calculate_severity_distribution(current),
            'trend': self._analyze_vulnerability_trend(current, historical)
        }

    def _calculate_severity_distribution(self, vulnerabilities: Dict[str, int]) -> Dict[str, str]:
        """
        Calculate percentage distribution of vulnerability severities.

        Args:
            vulnerabilities: Vulnerability counts by severity

        Returns:
            Percentage distribution as formatted strings (e.g. '25.0%')
        """
        total = sum(vulnerabilities.values())
        # Avoid division by zero when there are no vulnerabilities at all.
        if total == 0:
            return {k: "0%" for k in vulnerabilities.keys()}
        return {
            severity: f"{(count / total * 100):.1f}%"
            for severity, count in vulnerabilities.items()
        }

    def _analyze_vulnerability_trend(self, current: Dict[str, int], historical: Dict[str, int]) -> str:
        """
        Analyze vulnerability trend.

        Args:
            current: Current vulnerabilities (last 12 months)
            historical: Historical vulnerabilities (3-year totals)

        Returns:
            Trend description
        """
        current_total = sum(current.values())
        historical_avg = sum(historical.values()) / 3  # 3-year average
        # Bands: <70% of average = improving, <120% = stable, else concerning.
        if current_total < historical_avg * 0.7:
            return "Improving - fewer vulnerabilities than historical average"
        elif current_total < historical_avg * 1.2:
            return "Stable - consistent with historical average"
        else:
            return "Concerning - more vulnerabilities than historical average"

    def generate_security_report(self) -> Dict[str, Any]:
        """
        Generate comprehensive security assessment report.

        Returns:
            Complete security analysis combining score, compliance,
            vulnerabilities, recommendations, and an overall risk level
        """
        security_score = self.calculate_security_score()
        compliance = self.assess_compliance()
        vulnerabilities = self.identify_vulnerabilities()
        # Generate recommendations
        recommendations = self._generate_security_recommendations(
            security_score,
            compliance,
            vulnerabilities
        )
        return {
            'technology': self.technology,
            'security_score': security_score,
            'compliance_assessment': compliance,
            'vulnerability_analysis': vulnerabilities,
            'recommendations': recommendations,
            'overall_risk_level': self._determine_risk_level(security_score['overall_security_score'])
        }

    def _generate_security_recommendations(
        self,
        security_score: Dict[str, Any],
        compliance: Dict[str, Dict[str, Any]],
        vulnerabilities: Dict[str, Any]
    ) -> List[str]:
        """
        Generate security recommendations.

        Args:
            security_score: Security score data
            compliance: Compliance assessment
            vulnerabilities: Vulnerability analysis

        Returns:
            List of recommendations (never empty; a positive note is
            emitted when nothing needs attention)
        """
        recommendations = []
        # Overall score below 70 is treated as the acceptability floor.
        if security_score['overall_security_score'] < 70:
            recommendations.append("Improve overall security posture - score below acceptable threshold")
        # Any outstanding critical vulnerabilities demand immediate action.
        current_critical = vulnerabilities['current_vulnerabilities']['critical']
        if current_critical > 0:
            recommendations.append(f"Address {current_critical} critical vulnerabilities immediately")
        # Patch responsiveness
        if security_score['patch_responsiveness'] < 60:
            recommendations.append("Improve vulnerability patch response time")
        # Security features
        if security_score['security_features_score'] < 70:
            recommendations.append("Implement additional security features (MFA, audit logging, RBAC)")
        # Compliance: surface only the standards that are not ready.
        for standard, assessment in compliance.items():
            if assessment['readiness_level'] == "Not Ready":
                recommendations.append(f"{standard}: {assessment['recommendation']}")
        if not recommendations:
            recommendations.append("Security posture is strong - continue monitoring and maintenance")
        return recommendations

    def _determine_risk_level(self, security_score: float) -> str:
        """
        Determine overall risk level.

        Args:
            security_score: Overall security score (0-100)

        Returns:
            Risk level description (bands at 85, 70, and 55)
        """
        if security_score >= 85:
            return "Low Risk - Strong security posture"
        elif security_score >= 70:
            return "Medium Risk - Acceptable with monitoring"
        elif security_score >= 55:
            return "High Risk - Security improvements needed"
        else:
            return "Critical Risk - Not recommended for production use"

View File

@@ -0,0 +1,389 @@
"""
Technology Stack Comparator - Main comparison engine with weighted scoring.
Provides comprehensive technology comparison with customizable weighted criteria,
feature matrices, and intelligent recommendation generation.
"""
from typing import Dict, List, Any, Optional, Tuple
import json
class StackComparator:
    """Main comparison engine for technology stack evaluation.

    Scores candidate technologies across weighted feature categories,
    applies use-case specific adjustments, and produces a recommendation
    with a confidence level, decision factors, and a comparison matrix.
    """

    # Feature categories evaluated for every technology.
    FEATURE_CATEGORIES = [
        "performance",
        "scalability",
        "developer_experience",
        "ecosystem",
        "learning_curve",
        "documentation",
        "community_support",
        "enterprise_readiness"
    ]

    # Default category weights (sum to 100), used when not overridden.
    DEFAULT_WEIGHTS = {
        "performance": 15,
        "scalability": 15,
        "developer_experience": 20,
        "ecosystem": 15,
        "learning_curve": 10,
        "documentation": 10,
        "community_support": 10,
        "enterprise_readiness": 5
    }

    def __init__(self, comparison_data: Dict[str, Any]):
        """
        Initialize comparator with comparison data.

        Args:
            comparison_data: Dictionary describing the comparison.
                Recognized keys: 'technologies' (list of names),
                'use_case' (str), 'priorities' (dict), and 'weights'
                (dict of category -> relative weight).
        """
        self.technologies = comparison_data.get('technologies', [])
        self.use_case = comparison_data.get('use_case', 'general')
        self.priorities = comparison_data.get('priorities', {})
        self.weights = self._normalize_weights(comparison_data.get('weights', {}))
        self.scores = {}

    def _normalize_weights(self, custom_weights: Dict[str, float]) -> Dict[str, float]:
        """
        Normalize weights to sum to 100.

        Unknown category names in custom_weights are ignored: scoring only
        iterates FEATURE_CATEGORIES, so counting unknown keys in the total
        would silently dilute the weights of the categories actually used.

        Args:
            custom_weights: User-provided weights (category -> weight).

        Returns:
            Normalized weights dictionary summing to 100.
        """
        weights = self.DEFAULT_WEIGHTS.copy()
        # Only accept overrides for known categories.
        weights.update({
            category: value
            for category, value in custom_weights.items()
            if category in self.DEFAULT_WEIGHTS
        })
        total = sum(weights.values())
        if total == 0:
            # Return a copy so callers cannot mutate the class-level default.
            return self.DEFAULT_WEIGHTS.copy()
        return {k: (v / total) * 100 for k, v in weights.items()}

    def score_technology(self, tech_name: str, tech_data: Dict[str, Any]) -> Dict[str, float]:
        """
        Score a single technology across all criteria.

        Args:
            tech_name: Name of technology.
            tech_data: Feature/metric data; each category maps to a dict
                with a 'score' key (0-100). Missing categories default to
                a neutral 50.

        Returns:
            Dictionary of category scores clamped to the 0-100 scale.
        """
        scores = {}
        for category in self.FEATURE_CATEGORIES:
            raw_score = tech_data.get(category, {}).get('score', 50.0)
            # Use-case bonuses may push the score past 100; clamp after.
            adjusted_score = self._adjust_for_use_case(category, raw_score, tech_name)
            scores[category] = min(100.0, max(0.0, adjusted_score))
        return scores

    def _adjust_for_use_case(self, category: str, score: float, tech_name: str) -> float:
        """
        Apply use-case specific adjustments to scores.

        Args:
            category: Feature category.
            score: Raw score.
            tech_name: Technology name (reserved for future tech-specific
                adjustments; currently unused).

        Returns:
            Adjusted score (unclamped).
        """
        # Multiplicative bonuses for the categories that matter most
        # in each recognized use-case family.
        adjustments = {
            'real-time': {
                'performance': 1.1,  # 10% bonus for real-time use cases
                'scalability': 1.1
            },
            'enterprise': {
                'enterprise_readiness': 1.2,  # 20% bonus
                'documentation': 1.1
            },
            'startup': {
                'developer_experience': 1.15,
                'learning_curve': 1.1
            }
        }
        # Substring match, e.g. "enterprise SaaS" triggers 'enterprise';
        # the first matching family (in declaration order) applies.
        use_case_lower = self.use_case.lower()
        for use_case_type, bonus_map in adjustments.items():
            if use_case_type in use_case_lower:
                return score * bonus_map.get(category, 1.0)
        return score

    def calculate_weighted_score(self, category_scores: Dict[str, float]) -> float:
        """
        Calculate weighted total score.

        Args:
            category_scores: Dictionary of category scores (0-100).

        Returns:
            Weighted total score (0-100 scale).
        """
        # Weights are percentages; divide by 100 to use them as fractions.
        return sum(
            score * (self.weights.get(category, 0.0) / 100.0)
            for category, score in category_scores.items()
        )

    def compare_technologies(self, tech_data_list: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Compare multiple technologies and generate a recommendation.

        Args:
            tech_data_list: List of technology data dictionaries; each
                should carry a 'name' key plus per-category score data.

        Returns:
            Comparison results: per-technology scores, recommendation,
            confidence, decision factors, and a comparison matrix.
        """
        tech_scores: Dict[str, Dict[str, Any]] = {}
        for tech_data in tech_data_list:
            tech_name = tech_data.get('name', 'Unknown')
            category_scores = self.score_technology(tech_name, tech_data)
            tech_scores[tech_name] = {
                'category_scores': category_scores,
                'weighted_total': self.calculate_weighted_score(category_scores),
                'strengths': self._identify_strengths(category_scores),
                'weaknesses': self._identify_weaknesses(category_scores)
            }

        recommendation, confidence = self._generate_recommendation(tech_scores)
        return {
            'technologies': tech_scores,
            'recommendation': recommendation,
            'confidence': confidence,
            'decision_factors': self._extract_decision_factors(tech_scores),
            'comparison_matrix': self._build_comparison_matrix(tech_scores)
        }

    def _identify_strengths(self, category_scores: Dict[str, float], threshold: float = 75.0) -> List[str]:
        """
        Identify strength categories (scores at or above threshold).

        Args:
            category_scores: Category scores dictionary.
            threshold: Score threshold for strength identification.

        Returns:
            List of strength categories (in category iteration order).
        """
        return [
            category for category, score in category_scores.items()
            if score >= threshold
        ]

    def _identify_weaknesses(self, category_scores: Dict[str, float], threshold: float = 50.0) -> List[str]:
        """
        Identify weakness categories (scores below threshold).

        Args:
            category_scores: Category scores dictionary.
            threshold: Score threshold for weakness identification.

        Returns:
            List of weakness categories (in category iteration order).
        """
        return [
            category for category, score in category_scores.items()
            if score < threshold
        ]

    def _generate_recommendation(self, tech_scores: Dict[str, Dict[str, Any]]) -> Tuple[str, float]:
        """
        Generate recommendation and confidence level.

        Confidence grows with the weighted-score gap between the top two
        options: <5 points -> 40-50%, 5-15 -> 50-70%, 15+ -> 70-100%.

        Args:
            tech_scores: Technology scores dictionary.

        Returns:
            Tuple of (recommended_technology, confidence_score).
        """
        if not tech_scores:
            return "Insufficient data", 0.0

        ranked = sorted(
            tech_scores.items(),
            key=lambda item: item[1]['weighted_total'],
            reverse=True
        )
        best_name = ranked[0][0]
        best_score = ranked[0][1]['weighted_total']

        if len(ranked) == 1:
            # Only one candidate: nothing to compare against.
            return best_name, 100.0

        gap = best_score - ranked[1][1]['weighted_total']
        if gap < 5:
            confidence = 40.0 + gap * 2            # 40-50%
        elif gap < 15:
            confidence = 50.0 + (gap - 5) * 2      # 50-70%
        else:
            confidence = 70.0 + min(gap - 15, 30)  # 70-100%
        return best_name, min(100.0, confidence)

    def _extract_decision_factors(self, tech_scores: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Extract key decision factors from comparison.

        Args:
            tech_scores: Technology scores dictionary.

        Returns:
            One entry per top-3-weighted category, naming the best
            performer in that category.
        """
        factors = []
        # The three heaviest-weighted categories drive the decision.
        top_categories = sorted(
            self.weights.items(),
            key=lambda item: item[1],
            reverse=True
        )[:3]

        for category, weight in top_categories:
            best_tech, best_score = max(
                (
                    (tech, scores['category_scores'].get(category, 0.0))
                    for tech, scores in tech_scores.items()
                ),
                key=lambda pair: pair[1]
            )
            factors.append({
                'category': category,
                'importance': f"{weight:.1f}%",
                'best_performer': best_tech,
                'score': best_score
            })
        return factors

    def _build_comparison_matrix(self, tech_scores: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Build comparison matrix for display.

        Args:
            tech_scores: Technology scores dictionary.

        Returns:
            One row per category plus a final 'WEIGHTED TOTAL' row; all
            scores are pre-formatted strings.
        """
        matrix = [
            {
                'category': category,
                'weight': f"{self.weights.get(category, 0):.1f}%",
                'scores': {
                    tech_name: f"{scores['category_scores'].get(category, 0.0):.1f}"
                    for tech_name, scores in tech_scores.items()
                }
            }
            for category in self.FEATURE_CATEGORIES
        ]
        # Weighted totals always come last.
        matrix.append({
            'category': 'WEIGHTED TOTAL',
            'weight': '100%',
            'scores': {
                tech_name: f"{scores['weighted_total']:.1f}"
                for tech_name, scores in tech_scores.items()
            }
        })
        return matrix

    def generate_pros_cons(self, tech_name: str, tech_scores: Dict[str, Any]) -> Dict[str, List[str]]:
        """
        Generate pros and cons for a technology.

        Strengths and weaknesses are ranked by score so the listed entries
        really are the top three (previously the first three in arbitrary
        dict order were taken).

        Args:
            tech_name: Technology name (kept for interface compatibility).
            tech_scores: Per-technology scores dict with 'category_scores',
                'strengths' and 'weaknesses' keys.

        Returns:
            Dictionary with 'pros' and 'cons' lists.
        """
        category_scores = tech_scores['category_scores']
        # Highest-scoring strengths first; lowest-scoring weaknesses first.
        strengths = sorted(
            tech_scores['strengths'],
            key=lambda c: category_scores[c],
            reverse=True
        )
        weaknesses = sorted(
            tech_scores['weaknesses'],
            key=lambda c: category_scores[c]
        )

        pros = [
            f"Excellent {s.replace('_', ' ')} (score: {category_scores[s]:.1f}/100)"
            for s in strengths[:3]
        ]
        cons = [
            f"Weaker {w.replace('_', ' ')} (score: {category_scores[w]:.1f}/100)"
            for w in weaknesses[:3]
        ]

        # Fall back to generic statements when nothing stands out.
        if not pros:
            pros.append("Balanced performance across all categories")
        if not cons:
            cons.append("No significant weaknesses identified")
        return {'pros': pros, 'cons': cons}

View File

@@ -0,0 +1,458 @@
"""
Total Cost of Ownership (TCO) Calculator.
Calculates comprehensive TCO including licensing, hosting, developer productivity,
scaling costs, and hidden costs over multi-year projections.
"""
from typing import Dict, List, Any, Optional
import json
class TCOCalculator:
"""Calculate Total Cost of Ownership for technology stacks."""
def __init__(self, tco_data: Dict[str, Any]):
    """
    Initialize TCO calculator with cost parameters.

    Args:
        tco_data: Dictionary containing cost parameters and projections.
            Recognized keys (all optional): 'technology', 'team_size',
            'timeline_years', 'initial_costs', 'operational_costs',
            'scaling_params', 'productivity_factors'.
    """
    # Display name of the technology being costed.
    self.technology = tco_data.get('technology', 'Unknown')
    # Developer headcount; drives training and maintenance costs.
    self.team_size = tco_data.get('team_size', 5)
    # Projection horizon in years.
    self.timeline_years = tco_data.get('timeline_years', 5)
    # One-time costs (licensing, migration, setup, tooling, training inputs).
    self.initial_costs = tco_data.get('initial_costs', {})
    # Recurring costs (annual licensing/support, hosting, maintenance hours).
    self.operational_costs = tco_data.get('operational_costs', {})
    # Growth assumptions (users, servers, annual growth rate).
    self.scaling_params = tco_data.get('scaling_params', {})
    # Productivity and risk assumptions (multipliers, incident rates, churn).
    self.productivity_factors = tco_data.get('productivity_factors', {})
def calculate_initial_costs(self) -> Dict[str, float]:
"""
Calculate one-time initial costs.
Returns:
Dictionary of initial cost components
"""
costs = {
'licensing': self.initial_costs.get('licensing', 0.0),
'training': self._calculate_training_costs(),
'migration': self.initial_costs.get('migration', 0.0),
'setup': self.initial_costs.get('setup', 0.0),
'tooling': self.initial_costs.get('tooling', 0.0)
}
costs['total_initial'] = sum(costs.values())
return costs
def _calculate_training_costs(self) -> float:
"""
Calculate training costs based on team size and learning curve.
Returns:
Total training cost
"""
# Default training assumptions
hours_per_developer = self.initial_costs.get('training_hours_per_dev', 40)
avg_hourly_rate = self.initial_costs.get('developer_hourly_rate', 100)
training_materials = self.initial_costs.get('training_materials', 500)
total_hours = self.team_size * hours_per_developer
total_cost = (total_hours * avg_hourly_rate) + training_materials
return total_cost
def calculate_operational_costs(self) -> Dict[str, List[float]]:
"""
Calculate ongoing operational costs per year.
Returns:
Dictionary with yearly cost projections
"""
yearly_costs = {
'licensing': [],
'hosting': [],
'support': [],
'maintenance': [],
'total_yearly': []
}
for year in range(1, self.timeline_years + 1):
# Licensing costs (may include annual fees)
license_cost = self.operational_costs.get('annual_licensing', 0.0)
yearly_costs['licensing'].append(license_cost)
# Hosting costs (scale with growth)
hosting_cost = self._calculate_hosting_cost(year)
yearly_costs['hosting'].append(hosting_cost)
# Support costs
support_cost = self.operational_costs.get('annual_support', 0.0)
yearly_costs['support'].append(support_cost)
# Maintenance costs (developer time)
maintenance_cost = self._calculate_maintenance_cost(year)
yearly_costs['maintenance'].append(maintenance_cost)
# Total for year
year_total = (
license_cost + hosting_cost + support_cost + maintenance_cost
)
yearly_costs['total_yearly'].append(year_total)
return yearly_costs
def _calculate_hosting_cost(self, year: int) -> float:
"""
Calculate hosting costs with growth projection.
Args:
year: Year number (1-indexed)
Returns:
Hosting cost for the year
"""
base_cost = self.operational_costs.get('monthly_hosting', 1000.0) * 12
growth_rate = self.scaling_params.get('annual_growth_rate', 0.20) # 20% default
# Apply compound growth
year_cost = base_cost * ((1 + growth_rate) ** (year - 1))
return year_cost
def _calculate_maintenance_cost(self, year: int) -> float:
"""
Calculate maintenance costs (developer time).
Args:
year: Year number (1-indexed)
Returns:
Maintenance cost for the year
"""
hours_per_dev_per_month = self.operational_costs.get('maintenance_hours_per_dev_monthly', 20)
avg_hourly_rate = self.initial_costs.get('developer_hourly_rate', 100)
monthly_cost = self.team_size * hours_per_dev_per_month * avg_hourly_rate
yearly_cost = monthly_cost * 12
return yearly_cost
def calculate_scaling_costs(self) -> Dict[str, Any]:
"""
Calculate scaling-related costs and metrics.
Returns:
Dictionary with scaling cost analysis
"""
# Project user growth
initial_users = self.scaling_params.get('initial_users', 1000)
annual_growth_rate = self.scaling_params.get('annual_growth_rate', 0.20)
user_projections = []
for year in range(1, self.timeline_years + 1):
users = initial_users * ((1 + annual_growth_rate) ** year)
user_projections.append(int(users))
# Calculate cost per user
operational = self.calculate_operational_costs()
cost_per_user = []
for year_idx, year_cost in enumerate(operational['total_yearly']):
users = user_projections[year_idx]
cost_per_user.append(year_cost / users if users > 0 else 0)
# Infrastructure scaling costs
infra_scaling = self._calculate_infrastructure_scaling()
return {
'user_projections': user_projections,
'cost_per_user': cost_per_user,
'infrastructure_scaling': infra_scaling,
'scaling_efficiency': self._calculate_scaling_efficiency(cost_per_user)
}
def _calculate_infrastructure_scaling(self) -> Dict[str, List[float]]:
"""
Calculate infrastructure scaling costs.
Returns:
Infrastructure cost projections
"""
base_servers = self.scaling_params.get('initial_servers', 5)
cost_per_server_monthly = self.scaling_params.get('cost_per_server_monthly', 200)
growth_rate = self.scaling_params.get('annual_growth_rate', 0.20)
server_costs = []
for year in range(1, self.timeline_years + 1):
servers_needed = base_servers * ((1 + growth_rate) ** year)
yearly_cost = servers_needed * cost_per_server_monthly * 12
server_costs.append(yearly_cost)
return {
'yearly_infrastructure_costs': server_costs
}
def _calculate_scaling_efficiency(self, cost_per_user: List[float]) -> str:
"""
Assess scaling efficiency based on cost per user trend.
Args:
cost_per_user: List of yearly cost per user
Returns:
Efficiency assessment
"""
if len(cost_per_user) < 2:
return "Insufficient data"
# Compare first year to last year
initial = cost_per_user[0]
final = cost_per_user[-1]
if final < initial * 0.8:
return "Excellent - economies of scale achieved"
elif final < initial:
return "Good - improving efficiency over time"
elif final < initial * 1.2:
return "Moderate - costs growing with users"
else:
return "Poor - costs growing faster than users"
def calculate_productivity_impact(self) -> Dict[str, Any]:
"""
Calculate developer productivity impact.
Returns:
Productivity analysis
"""
# Productivity multiplier (1.0 = baseline)
productivity_multiplier = self.productivity_factors.get('productivity_multiplier', 1.0)
# Time to market impact (in days)
ttm_reduction = self.productivity_factors.get('time_to_market_reduction_days', 0)
# Calculate value of faster development
avg_feature_time_days = self.productivity_factors.get('avg_feature_time_days', 30)
features_per_year = 365 / avg_feature_time_days
faster_features_per_year = 365 / max(1, avg_feature_time_days - ttm_reduction)
additional_features = faster_features_per_year - features_per_year
feature_value = self.productivity_factors.get('avg_feature_value', 10000)
yearly_productivity_value = additional_features * feature_value
return {
'productivity_multiplier': productivity_multiplier,
'time_to_market_reduction_days': ttm_reduction,
'additional_features_per_year': additional_features,
'yearly_productivity_value': yearly_productivity_value,
'five_year_productivity_value': yearly_productivity_value * self.timeline_years
}
def calculate_hidden_costs(self) -> Dict[str, float]:
"""
Identify and calculate hidden costs.
Returns:
Dictionary of hidden cost components
"""
costs = {
'technical_debt': self._estimate_technical_debt(),
'vendor_lock_in_risk': self._estimate_vendor_lock_in_cost(),
'security_incidents': self._estimate_security_costs(),
'downtime_risk': self._estimate_downtime_costs(),
'developer_turnover': self._estimate_turnover_costs()
}
costs['total_hidden_costs'] = sum(costs.values())
return costs
def _estimate_technical_debt(self) -> float:
"""
Estimate technical debt accumulation costs.
Returns:
Estimated technical debt cost
"""
# Percentage of development time spent on debt
debt_percentage = self.productivity_factors.get('technical_debt_percentage', 0.15)
yearly_dev_cost = self._calculate_maintenance_cost(1) # Year 1 baseline
# Technical debt accumulates over time
total_debt_cost = 0
for year in range(1, self.timeline_years + 1):
year_debt = yearly_dev_cost * debt_percentage * year # Increases each year
total_debt_cost += year_debt
return total_debt_cost
def _estimate_vendor_lock_in_cost(self) -> float:
"""
Estimate cost of vendor lock-in.
Returns:
Estimated lock-in cost
"""
lock_in_risk = self.productivity_factors.get('vendor_lock_in_risk', 'low')
# Migration cost if switching vendors
migration_cost = self.initial_costs.get('migration', 10000)
risk_multipliers = {
'low': 0.1,
'medium': 0.3,
'high': 0.6
}
multiplier = risk_multipliers.get(lock_in_risk, 0.2)
return migration_cost * multiplier
def _estimate_security_costs(self) -> float:
"""
Estimate potential security incident costs.
Returns:
Estimated security cost
"""
incidents_per_year = self.productivity_factors.get('security_incidents_per_year', 0.5)
avg_incident_cost = self.productivity_factors.get('avg_security_incident_cost', 50000)
total_cost = incidents_per_year * avg_incident_cost * self.timeline_years
return total_cost
def _estimate_downtime_costs(self) -> float:
"""
Estimate downtime costs.
Returns:
Estimated downtime cost
"""
hours_downtime_per_year = self.productivity_factors.get('downtime_hours_per_year', 2)
cost_per_hour = self.productivity_factors.get('downtime_cost_per_hour', 5000)
total_cost = hours_downtime_per_year * cost_per_hour * self.timeline_years
return total_cost
def _estimate_turnover_costs(self) -> float:
"""
Estimate costs from developer turnover.
Returns:
Estimated turnover cost
"""
turnover_rate = self.productivity_factors.get('annual_turnover_rate', 0.15)
cost_per_hire = self.productivity_factors.get('cost_per_new_hire', 30000)
hires_per_year = self.team_size * turnover_rate
total_cost = hires_per_year * cost_per_hire * self.timeline_years
return total_cost
def calculate_total_tco(self) -> Dict[str, Any]:
"""
Calculate complete TCO over the timeline.
Returns:
Comprehensive TCO analysis
"""
initial = self.calculate_initial_costs()
operational = self.calculate_operational_costs()
scaling = self.calculate_scaling_costs()
productivity = self.calculate_productivity_impact()
hidden = self.calculate_hidden_costs()
# Calculate total costs
total_operational = sum(operational['total_yearly'])
total_cost = initial['total_initial'] + total_operational + hidden['total_hidden_costs']
# Adjust for productivity gains
net_cost = total_cost - productivity['five_year_productivity_value']
return {
'technology': self.technology,
'timeline_years': self.timeline_years,
'initial_costs': initial,
'operational_costs': operational,
'scaling_analysis': scaling,
'productivity_impact': productivity,
'hidden_costs': hidden,
'total_tco': total_cost,
'net_tco_after_productivity': net_cost,
'average_yearly_cost': total_cost / self.timeline_years
}
def generate_tco_summary(self) -> Dict[str, Any]:
"""
Generate executive summary of TCO.
Returns:
TCO summary for reporting
"""
tco = self.calculate_total_tco()
return {
'technology': self.technology,
'total_tco': f"${tco['total_tco']:,.2f}",
'net_tco': f"${tco['net_tco_after_productivity']:,.2f}",
'average_yearly': f"${tco['average_yearly_cost']:,.2f}",
'initial_investment': f"${tco['initial_costs']['total_initial']:,.2f}",
'key_cost_drivers': self._identify_cost_drivers(tco),
'cost_optimization_opportunities': self._identify_optimizations(tco)
}
def _identify_cost_drivers(self, tco: Dict[str, Any]) -> List[str]:
"""
Identify top cost drivers.
Args:
tco: Complete TCO analysis
Returns:
List of top cost drivers
"""
drivers = []
# Check operational costs
operational = tco['operational_costs']
total_hosting = sum(operational['hosting'])
total_maintenance = sum(operational['maintenance'])
if total_hosting > total_maintenance:
drivers.append(f"Infrastructure/hosting ({total_hosting:,.0f})")
else:
drivers.append(f"Developer maintenance time ({total_maintenance:,.0f})")
# Check hidden costs
hidden = tco['hidden_costs']
if hidden['technical_debt'] > 10000:
drivers.append(f"Technical debt ({hidden['technical_debt']:,.0f})")
return drivers[:3] # Top 3
def _identify_optimizations(self, tco: Dict[str, Any]) -> List[str]:
"""
Identify cost optimization opportunities.
Args:
tco: Complete TCO analysis
Returns:
List of optimization suggestions
"""
optimizations = []
# Check scaling efficiency
scaling = tco['scaling_analysis']
if scaling['scaling_efficiency'].startswith('Poor'):
optimizations.append("Improve scaling efficiency - costs growing too fast")
# Check hidden costs
hidden = tco['hidden_costs']
if hidden['technical_debt'] > 20000:
optimizations.append("Address technical debt accumulation")
if hidden['downtime_risk'] > 10000:
optimizations.append("Invest in reliability to reduce downtime costs")
return optimizations