Complete restructure based on AI Agent Skills Benchmark feedback (original score: 66/100):
## Directory Reorganization
- Moved Python scripts to scripts/ directory
- Moved sample files to assets/ directory
- Created references/ directory with extracted content
- Removed HOW_TO_USE.md (integrated into SKILL.md)
- Removed __pycache__
## New Reference Files (3 files)
- architecture_patterns.md: 6 AWS patterns (serverless, microservices, three-tier,
data processing, GraphQL, multi-region) with diagrams, cost breakdowns, pros/cons
- service_selection.md: Decision matrices for compute, database, storage, messaging,
networking, security services with code examples
- best_practices.md: Serverless design, cost optimization, security hardening,
scalability patterns, common pitfalls
## SKILL.md Rewrite
- Reduced from 345 lines to 307 lines (moved patterns to references/)
- Added trigger phrases to description ("design serverless architecture",
"create CloudFormation templates", "optimize AWS costs")
- Structured around 6-step workflow instead of encyclopedia format
- Added Quick Start examples (MVP, Scaling, Cost Optimization, IaC)
- Removed marketing language ("Expert", "comprehensive")
- Consistent imperative voice throughout
## Structure Changes
- scripts/: architecture_designer.py, cost_optimizer.py, serverless_stack.py
- references/: architecture_patterns.md, service_selection.md, best_practices.md
- assets/: sample_input.json, expected_output.json
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
347 lines
15 KiB
Python
347 lines
15 KiB
Python
"""
|
|
AWS cost optimization analyzer.
|
|
Provides cost-saving recommendations for startup budgets.
|
|
"""
|
|
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
|
|
class CostOptimizer:
|
|
"""Analyze AWS costs and provide optimization recommendations."""
|
|
|
|
def __init__(self, current_resources: Dict[str, Any], monthly_spend: float):
|
|
"""
|
|
Initialize with current AWS resources and spending.
|
|
|
|
Args:
|
|
current_resources: Dictionary of current AWS resources
|
|
monthly_spend: Current monthly AWS spend in USD
|
|
"""
|
|
self.resources = current_resources
|
|
self.monthly_spend = monthly_spend
|
|
self.recommendations = []
|
|
|
|
def analyze_and_optimize(self) -> Dict[str, Any]:
|
|
"""
|
|
Analyze current setup and generate cost optimization recommendations.
|
|
|
|
Returns:
|
|
Dictionary with recommendations and potential savings
|
|
"""
|
|
self.recommendations = []
|
|
potential_savings = 0.0
|
|
|
|
# Analyze compute resources
|
|
compute_savings = self._analyze_compute()
|
|
potential_savings += compute_savings
|
|
|
|
# Analyze storage
|
|
storage_savings = self._analyze_storage()
|
|
potential_savings += storage_savings
|
|
|
|
# Analyze database
|
|
database_savings = self._analyze_database()
|
|
potential_savings += database_savings
|
|
|
|
# Analyze networking
|
|
network_savings = self._analyze_networking()
|
|
potential_savings += network_savings
|
|
|
|
# General AWS optimizations
|
|
general_savings = self._analyze_general_optimizations()
|
|
potential_savings += general_savings
|
|
|
|
return {
|
|
'current_monthly_spend': self.monthly_spend,
|
|
'potential_monthly_savings': round(potential_savings, 2),
|
|
'optimized_monthly_spend': round(self.monthly_spend - potential_savings, 2),
|
|
'savings_percentage': round((potential_savings / self.monthly_spend) * 100, 2) if self.monthly_spend > 0 else 0,
|
|
'recommendations': self.recommendations,
|
|
'priority_actions': self._prioritize_recommendations()
|
|
}
|
|
|
|
def _analyze_compute(self) -> float:
|
|
"""Analyze compute resources (EC2, Lambda, Fargate)."""
|
|
savings = 0.0
|
|
|
|
ec2_instances = self.resources.get('ec2_instances', [])
|
|
if ec2_instances:
|
|
# Check for idle instances
|
|
idle_count = sum(1 for inst in ec2_instances if inst.get('cpu_utilization', 100) < 10)
|
|
if idle_count > 0:
|
|
idle_cost = idle_count * 50 # Assume $50/month per idle instance
|
|
savings += idle_cost
|
|
self.recommendations.append({
|
|
'service': 'EC2',
|
|
'type': 'Idle Resources',
|
|
'issue': f'{idle_count} EC2 instances with <10% CPU utilization',
|
|
'recommendation': 'Stop or terminate idle instances, or downsize to smaller instance types',
|
|
'potential_savings': idle_cost,
|
|
'priority': 'high'
|
|
})
|
|
|
|
# Check for Savings Plans / Reserved Instances
|
|
on_demand_count = sum(1 for inst in ec2_instances if inst.get('pricing', 'on-demand') == 'on-demand')
|
|
if on_demand_count >= 2:
|
|
ri_savings = on_demand_count * 50 * 0.30 # 30% savings with RIs
|
|
savings += ri_savings
|
|
self.recommendations.append({
|
|
'service': 'EC2',
|
|
'type': 'Pricing Optimization',
|
|
'issue': f'{on_demand_count} instances on On-Demand pricing',
|
|
'recommendation': 'Purchase Compute Savings Plan or Reserved Instances for predictable workloads (1-year commitment)',
|
|
'potential_savings': ri_savings,
|
|
'priority': 'medium'
|
|
})
|
|
|
|
# Lambda optimization
|
|
lambda_functions = self.resources.get('lambda_functions', [])
|
|
if lambda_functions:
|
|
oversized = sum(1 for fn in lambda_functions if fn.get('memory_mb', 128) > 512 and fn.get('avg_memory_used_mb', 0) < 256)
|
|
if oversized > 0:
|
|
lambda_savings = oversized * 5 # Assume $5/month per oversized function
|
|
savings += lambda_savings
|
|
self.recommendations.append({
|
|
'service': 'Lambda',
|
|
'type': 'Right-sizing',
|
|
'issue': f'{oversized} Lambda functions over-provisioned (memory too high)',
|
|
'recommendation': 'Use AWS Lambda Power Tuning tool to optimize memory settings',
|
|
'potential_savings': lambda_savings,
|
|
'priority': 'low'
|
|
})
|
|
|
|
return savings
|
|
|
|
def _analyze_storage(self) -> float:
|
|
"""Analyze S3 and other storage resources."""
|
|
savings = 0.0
|
|
|
|
s3_buckets = self.resources.get('s3_buckets', [])
|
|
for bucket in s3_buckets:
|
|
size_gb = bucket.get('size_gb', 0)
|
|
storage_class = bucket.get('storage_class', 'STANDARD')
|
|
|
|
# Check for lifecycle policies
|
|
if not bucket.get('has_lifecycle_policy', False) and size_gb > 100:
|
|
lifecycle_savings = size_gb * 0.015 # $0.015/GB savings with IA transition
|
|
savings += lifecycle_savings
|
|
self.recommendations.append({
|
|
'service': 'S3',
|
|
'type': 'Lifecycle Policy',
|
|
'issue': f'Bucket {bucket.get("name", "unknown")} ({size_gb} GB) has no lifecycle policy',
|
|
'recommendation': 'Implement lifecycle policy: Transition to IA after 30 days, Glacier after 90 days',
|
|
'potential_savings': lifecycle_savings,
|
|
'priority': 'medium'
|
|
})
|
|
|
|
# Check for Intelligent-Tiering
|
|
if storage_class == 'STANDARD' and size_gb > 500:
|
|
tiering_savings = size_gb * 0.005
|
|
savings += tiering_savings
|
|
self.recommendations.append({
|
|
'service': 'S3',
|
|
'type': 'Storage Class',
|
|
'issue': f'Large bucket ({size_gb} GB) using STANDARD storage',
|
|
'recommendation': 'Enable S3 Intelligent-Tiering for automatic cost optimization',
|
|
'potential_savings': tiering_savings,
|
|
'priority': 'high'
|
|
})
|
|
|
|
return savings
|
|
|
|
def _analyze_database(self) -> float:
|
|
"""Analyze RDS, DynamoDB, and other database costs."""
|
|
savings = 0.0
|
|
|
|
rds_instances = self.resources.get('rds_instances', [])
|
|
for db in rds_instances:
|
|
# Check for idle databases
|
|
if db.get('connections_per_day', 1000) < 10:
|
|
db_cost = db.get('monthly_cost', 100)
|
|
savings += db_cost * 0.8 # Can save 80% by stopping
|
|
self.recommendations.append({
|
|
'service': 'RDS',
|
|
'type': 'Idle Resource',
|
|
'issue': f'Database {db.get("name", "unknown")} has <10 connections/day',
|
|
'recommendation': 'Stop database if not needed, or take final snapshot and delete',
|
|
'potential_savings': db_cost * 0.8,
|
|
'priority': 'high'
|
|
})
|
|
|
|
# Check for Aurora Serverless opportunity
|
|
if db.get('engine', '').startswith('aurora') and db.get('utilization', 100) < 30:
|
|
serverless_savings = db.get('monthly_cost', 200) * 0.40
|
|
savings += serverless_savings
|
|
self.recommendations.append({
|
|
'service': 'RDS Aurora',
|
|
'type': 'Serverless Migration',
|
|
'issue': f'Aurora instance {db.get("name", "unknown")} has low utilization (<30%)',
|
|
'recommendation': 'Migrate to Aurora Serverless v2 for auto-scaling and pay-per-use',
|
|
'potential_savings': serverless_savings,
|
|
'priority': 'medium'
|
|
})
|
|
|
|
# DynamoDB optimization
|
|
dynamodb_tables = self.resources.get('dynamodb_tables', [])
|
|
for table in dynamodb_tables:
|
|
if table.get('billing_mode', 'PROVISIONED') == 'PROVISIONED':
|
|
read_capacity = table.get('read_capacity_units', 0)
|
|
write_capacity = table.get('write_capacity_units', 0)
|
|
utilization = table.get('utilization_percentage', 100)
|
|
|
|
if utilization < 20:
|
|
on_demand_savings = (read_capacity * 0.00013 + write_capacity * 0.00065) * 730 * 0.3
|
|
savings += on_demand_savings
|
|
self.recommendations.append({
|
|
'service': 'DynamoDB',
|
|
'type': 'Billing Mode',
|
|
'issue': f'Table {table.get("name", "unknown")} has low utilization with provisioned capacity',
|
|
'recommendation': 'Switch to On-Demand billing mode for variable workloads',
|
|
'potential_savings': on_demand_savings,
|
|
'priority': 'medium'
|
|
})
|
|
|
|
return savings
|
|
|
|
def _analyze_networking(self) -> float:
|
|
"""Analyze networking costs (data transfer, NAT Gateway, etc.)."""
|
|
savings = 0.0
|
|
|
|
nat_gateways = self.resources.get('nat_gateways', [])
|
|
if len(nat_gateways) > 1:
|
|
multi_az = self.resources.get('multi_az_required', False)
|
|
if not multi_az:
|
|
nat_savings = (len(nat_gateways) - 1) * 45 # $45/month per NAT Gateway
|
|
savings += nat_savings
|
|
self.recommendations.append({
|
|
'service': 'NAT Gateway',
|
|
'type': 'Resource Consolidation',
|
|
'issue': f'{len(nat_gateways)} NAT Gateways deployed (multi-AZ not required)',
|
|
'recommendation': 'Use single NAT Gateway in dev/staging, or consider VPC endpoints for AWS services',
|
|
'potential_savings': nat_savings,
|
|
'priority': 'high'
|
|
})
|
|
|
|
# Check for VPC endpoints opportunity
|
|
if not self.resources.get('vpc_endpoints', []):
|
|
s3_data_transfer = self.resources.get('s3_data_transfer_gb', 0)
|
|
if s3_data_transfer > 100:
|
|
endpoint_savings = s3_data_transfer * 0.09 * 0.5 # Save 50% of data transfer costs
|
|
savings += endpoint_savings
|
|
self.recommendations.append({
|
|
'service': 'VPC',
|
|
'type': 'VPC Endpoints',
|
|
'issue': 'High S3 data transfer without VPC endpoints',
|
|
'recommendation': 'Create VPC endpoints for S3 and DynamoDB to avoid NAT Gateway costs',
|
|
'potential_savings': endpoint_savings,
|
|
'priority': 'medium'
|
|
})
|
|
|
|
return savings
|
|
|
|
def _analyze_general_optimizations(self) -> float:
|
|
"""General AWS cost optimizations."""
|
|
savings = 0.0
|
|
|
|
# Check for CloudWatch Logs retention
|
|
log_groups = self.resources.get('cloudwatch_log_groups', [])
|
|
for log in log_groups:
|
|
if log.get('retention_days', 1) == -1: # Never expire
|
|
log_size_gb = log.get('size_gb', 1)
|
|
retention_savings = log_size_gb * 0.50 * 0.7 # 70% savings with 7-day retention
|
|
savings += retention_savings
|
|
self.recommendations.append({
|
|
'service': 'CloudWatch Logs',
|
|
'type': 'Retention Policy',
|
|
'issue': f'Log group {log.get("name", "unknown")} has infinite retention',
|
|
'recommendation': 'Set retention to 7 days for non-compliance logs, 30 days for production',
|
|
'potential_savings': retention_savings,
|
|
'priority': 'low'
|
|
})
|
|
|
|
# Check for unused Elastic IPs
|
|
elastic_ips = self.resources.get('elastic_ips', [])
|
|
unattached = sum(1 for eip in elastic_ips if not eip.get('attached', True))
|
|
if unattached > 0:
|
|
eip_savings = unattached * 3.65 # $0.005/hour = $3.65/month
|
|
savings += eip_savings
|
|
self.recommendations.append({
|
|
'service': 'EC2',
|
|
'type': 'Unused Resources',
|
|
'issue': f'{unattached} unattached Elastic IPs',
|
|
'recommendation': 'Release unused Elastic IPs to avoid hourly charges',
|
|
'potential_savings': eip_savings,
|
|
'priority': 'high'
|
|
})
|
|
|
|
# Budget alerts
|
|
if not self.resources.get('has_budget_alerts', False):
|
|
self.recommendations.append({
|
|
'service': 'AWS Budgets',
|
|
'type': 'Cost Monitoring',
|
|
'issue': 'No budget alerts configured',
|
|
'recommendation': 'Set up AWS Budgets with alerts at 50%, 80%, 100% of monthly budget',
|
|
'potential_savings': 0,
|
|
'priority': 'high'
|
|
})
|
|
|
|
# Cost Explorer recommendations
|
|
if not self.resources.get('has_cost_explorer', False):
|
|
self.recommendations.append({
|
|
'service': 'Cost Management',
|
|
'type': 'Visibility',
|
|
'issue': 'Cost Explorer not enabled',
|
|
'recommendation': 'Enable AWS Cost Explorer to track spending patterns and identify anomalies',
|
|
'potential_savings': 0,
|
|
'priority': 'medium'
|
|
})
|
|
|
|
return savings
|
|
|
|
def _prioritize_recommendations(self) -> List[Dict[str, Any]]:
|
|
"""Get top priority recommendations."""
|
|
high_priority = [r for r in self.recommendations if r['priority'] == 'high']
|
|
high_priority.sort(key=lambda x: x.get('potential_savings', 0), reverse=True)
|
|
return high_priority[:5] # Top 5 high-priority recommendations
|
|
|
|
def generate_optimization_checklist(self) -> List[Dict[str, Any]]:
|
|
"""Generate actionable checklist for cost optimization."""
|
|
return [
|
|
{
|
|
'category': 'Immediate Actions (Today)',
|
|
'items': [
|
|
'Release unattached Elastic IPs',
|
|
'Stop idle EC2 instances',
|
|
'Delete unused EBS volumes',
|
|
'Set up budget alerts'
|
|
]
|
|
},
|
|
{
|
|
'category': 'This Week',
|
|
'items': [
|
|
'Implement S3 lifecycle policies',
|
|
'Consolidate NAT Gateways in non-prod',
|
|
'Set CloudWatch Logs retention to 7 days',
|
|
'Review and rightsize EC2/RDS instances'
|
|
]
|
|
},
|
|
{
|
|
'category': 'This Month',
|
|
'items': [
|
|
'Evaluate Savings Plans or Reserved Instances',
|
|
'Migrate to Aurora Serverless where applicable',
|
|
'Implement VPC endpoints for S3/DynamoDB',
|
|
'Switch DynamoDB tables to On-Demand if variable load'
|
|
]
|
|
},
|
|
{
|
|
'category': 'Ongoing',
|
|
'items': [
|
|
'Review Cost Explorer weekly',
|
|
'Tag all resources for cost allocation',
|
|
'Monitor Trusted Advisor recommendations',
|
|
'Conduct monthly cost review meetings'
|
|
]
|
|
}
|
|
]
|