fix(skill): restructure aws-solution-architect for better organization (#61) (#114)

Complete restructure based on AI Agent Skills Benchmark feedback (original score: 66/100):

## Directory Reorganization
- Moved Python scripts to scripts/ directory
- Moved sample files to assets/ directory
- Created references/ directory with extracted content
- Removed HOW_TO_USE.md (integrated into SKILL.md)
- Removed __pycache__

## New Reference Files (3 files)
- architecture_patterns.md: 6 AWS patterns (serverless, microservices, three-tier,
  data processing, GraphQL, multi-region) with diagrams, cost breakdowns, pros/cons
- service_selection.md: Decision matrices for compute, database, storage, messaging,
  networking, security services with code examples
- best_practices.md: Serverless design, cost optimization, security hardening,
  scalability patterns, common pitfalls

## SKILL.md Rewrite
- Reduced from 345 lines to 307 lines (moved patterns to references/)
- Added trigger phrases to description ("design serverless architecture",
  "create CloudFormation templates", "optimize AWS costs")
- Structured around 6-step workflow instead of encyclopedia format
- Added Quick Start examples (MVP, Scaling, Cost Optimization, IaC)
- Removed marketing language ("Expert", "comprehensive")
- Consistent imperative voice throughout

## Structure Changes
- scripts/: architecture_designer.py, cost_optimizer.py, serverless_stack.py
- references/: architecture_patterns.md, service_selection.md, best_practices.md
- assets/: sample_input.json, expected_output.json

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alireza Rezvani
2026-01-30 02:42:08 +01:00
committed by GitHub
parent c0989817bc
commit c7dc957823
13 changed files with 1930 additions and 626 deletions

View File

@@ -0,0 +1,808 @@
"""
AWS architecture design and service recommendation module.
Generates architecture patterns based on application requirements.
"""
from typing import Dict, List, Any, Optional
from enum import Enum
class ApplicationType(Enum):
    """Types of applications supported.

    Each member's string value matches an 'application_type' string accepted
    by ArchitectureDesigner (see recommend_architecture_pattern).
    """
    WEB_APP = "web_application"
    MOBILE_BACKEND = "mobile_backend"
    DATA_PIPELINE = "data_pipeline"
    MICROSERVICES = "microservices"
    SAAS_PLATFORM = "saas_platform"
    IOT_PLATFORM = "iot_platform"
class ArchitectureDesigner:
    """Design AWS architectures based on requirements."""

    def __init__(self, requirements: Dict[str, Any]):
        """
        Initialize with application requirements.

        Args:
            requirements: Dictionary containing app type, traffic, budget, etc.
                'application_type' may be either a raw string such as
                "web_application" or an ApplicationType enum member; enum
                members are normalized to their string value.
        """
        raw_app_type = requirements.get('application_type', 'web_application')
        # Accept ApplicationType members as well as plain strings. Using
        # getattr for the '.value' attribute keeps this decoupled from the
        # enum class itself (strings have no 'value' and pass through).
        self.app_type = getattr(raw_app_type, 'value', raw_app_type)
        self.expected_users = requirements.get('expected_users', 1000)
        self.requests_per_second = requirements.get('requests_per_second', 10)
        self.budget_monthly = requirements.get('budget_monthly_usd', 500)
        self.team_size = requirements.get('team_size', 3)
        self.aws_experience = requirements.get('aws_experience', 'beginner')
        self.compliance_needs = requirements.get('compliance', [])
        self.data_size_gb = requirements.get('data_size_gb', 10)
def recommend_architecture_pattern(self) -> Dict[str, Any]:
"""
Recommend architecture pattern based on requirements.
Returns:
Dictionary with recommended pattern and services
"""
# Determine pattern based on app type and scale
if self.app_type in ['web_application', 'saas_platform']:
if self.expected_users < 10000:
return self._serverless_web_architecture()
elif self.expected_users < 100000:
return self._modern_three_tier_architecture()
else:
return self._multi_region_architecture()
elif self.app_type == 'mobile_backend':
return self._serverless_mobile_backend()
elif self.app_type == 'data_pipeline':
return self._event_driven_data_pipeline()
elif self.app_type == 'microservices':
return self._event_driven_microservices()
elif self.app_type == 'iot_platform':
return self._iot_architecture()
else:
return self._serverless_web_architecture() # Default
def _serverless_web_architecture(self) -> Dict[str, Any]:
    """Serverless web application pattern.

    Returns a mostly static pattern description; only the API throttling
    limit and the monthly cost estimate are derived from the requirements.
    """
    return {
        'pattern_name': 'Serverless Web Application',
        'description': 'Fully serverless architecture with zero server management',
        'use_case': 'SaaS platforms, low to medium traffic websites, MVPs',
        'services': {
            'frontend': {
                'service': 'S3 + CloudFront',
                'purpose': 'Static website hosting with global CDN',
                'configuration': {
                    's3_bucket': 'website-bucket',
                    'cloudfront_distribution': 'HTTPS with custom domain',
                    'caching': 'Cache-Control headers, edge caching'
                }
            },
            'api': {
                'service': 'API Gateway + Lambda',
                'purpose': 'REST API backend with auto-scaling',
                'configuration': {
                    'api_type': 'REST API',
                    'authorization': 'Cognito User Pools or API Keys',
                    # Throttle at 10x the expected steady-state request rate.
                    'throttling': f'{self.requests_per_second * 10} requests/second',
                    'lambda_memory': '512 MB (optimize based on testing)',
                    'lambda_timeout': '10 seconds'
                }
            },
            'database': {
                'service': 'DynamoDB',
                'purpose': 'NoSQL database with pay-per-request pricing',
                'configuration': {
                    'billing_mode': 'PAY_PER_REQUEST',
                    'backup': 'Point-in-time recovery enabled',
                    'encryption': 'KMS encryption at rest'
                }
            },
            'authentication': {
                'service': 'Cognito',
                'purpose': 'User authentication and authorization',
                'configuration': {
                    'user_pools': 'Email/password + social providers',
                    'mfa': 'Optional MFA with SMS or TOTP',
                    'token_expiration': '1 hour access, 30 days refresh'
                }
            },
            'cicd': {
                'service': 'AWS Amplify or CodePipeline',
                'purpose': 'Automated deployment from Git',
                'configuration': {
                    'source': 'GitHub or CodeCommit',
                    'build': 'Automatic on commit',
                    'environments': 'dev, staging, production'
                }
            }
        },
        'estimated_cost': {
            # Derived from requests/users/data size, capped at the budget.
            'monthly_usd': self._calculate_serverless_cost(),
            'breakdown': {
                'CloudFront': '10-30 USD',
                'Lambda': '5-20 USD',
                'API Gateway': '10-40 USD',
                'DynamoDB': '5-30 USD',
                'Cognito': '0-10 USD (free tier: 50k MAU)',
                'S3': '1-5 USD'
            }
        },
        'pros': [
            'No server management',
            'Auto-scaling built-in',
            'Pay only for what you use',
            'Fast to deploy and iterate',
            'High availability by default'
        ],
        'cons': [
            'Cold start latency (100-500ms)',
            'Vendor lock-in to AWS',
            'Debugging distributed systems complex',
            'Learning curve for serverless patterns'
        ],
        'scaling_characteristics': {
            'users_supported': '1k - 100k',
            'requests_per_second': '100 - 10,000',
            'scaling_method': 'Automatic (Lambda concurrency)'
        }
    }
def _modern_three_tier_architecture(self) -> Dict[str, Any]:
    """Traditional three-tier with modern AWS services.

    Returns a pattern description; the Fargate auto-scaling ceiling and the
    monthly cost estimate are derived from the requirements.
    """
    return {
        'pattern_name': 'Modern Three-Tier Application',
        'description': 'Classic architecture with containers and managed services',
        'use_case': 'Traditional web apps, e-commerce, content management',
        'services': {
            'load_balancer': {
                'service': 'Application Load Balancer (ALB)',
                'purpose': 'Distribute traffic across instances',
                'configuration': {
                    'scheme': 'internet-facing',
                    'target_type': 'ECS tasks or EC2 instances',
                    'health_checks': '/health endpoint, 30s interval',
                    'ssl': 'ACM certificate for HTTPS'
                }
            },
            'compute': {
                'service': 'ECS Fargate or EC2 Auto Scaling',
                'purpose': 'Run containerized applications',
                'configuration': {
                    'container_platform': 'ECS Fargate (serverless containers)',
                    'task_definition': '512 MB memory, 0.25 vCPU (start small)',
                    # Scale ceiling: roughly one task per 5k expected users, minimum 4.
                    'auto_scaling': f'2-{max(4, self.expected_users // 5000)} tasks',
                    'deployment': 'Rolling update, 50% at a time'
                }
            },
            'database': {
                'service': 'RDS Aurora (MySQL/PostgreSQL)',
                'purpose': 'Managed relational database',
                'configuration': {
                    'instance_class': 'db.t3.medium or db.t4g.medium',
                    'multi_az': 'Yes (high availability)',
                    'read_replicas': '1-2 for read scaling',
                    'backup_retention': '7 days',
                    'encryption': 'KMS encryption enabled'
                }
            },
            'cache': {
                'service': 'ElastiCache Redis',
                'purpose': 'Session storage, application caching',
                'configuration': {
                    'node_type': 'cache.t3.micro or cache.t4g.micro',
                    'replication': 'Multi-AZ with automatic failover',
                    'eviction_policy': 'allkeys-lru'
                }
            },
            'cdn': {
                'service': 'CloudFront',
                'purpose': 'Cache static assets globally',
                'configuration': {
                    'origins': 'ALB (dynamic), S3 (static)',
                    'caching': 'Cache based on headers/cookies',
                    'compression': 'Gzip compression enabled'
                }
            },
            'storage': {
                'service': 'S3',
                'purpose': 'User uploads, backups, logs',
                'configuration': {
                    'storage_class': 'S3 Standard with lifecycle policies',
                    'versioning': 'Enabled for important buckets',
                    'lifecycle': 'Transition to IA after 30 days'
                }
            }
        },
        'estimated_cost': {
            'monthly_usd': self._calculate_three_tier_cost(),
            'breakdown': {
                'ALB': '20-30 USD',
                'ECS Fargate': '50-200 USD',
                'RDS Aurora': '100-300 USD',
                'ElastiCache': '30-80 USD',
                'CloudFront': '10-50 USD',
                'S3': '10-30 USD'
            }
        },
        'pros': [
            'Proven architecture pattern',
            'Easy to understand and debug',
            'Flexible scaling options',
            'Support for complex applications',
            'Managed services reduce operational burden'
        ],
        'cons': [
            'Higher baseline costs',
            'More complex than serverless',
            'Requires more operational knowledge',
            'Manual scaling configuration needed'
        ],
        'scaling_characteristics': {
            'users_supported': '10k - 500k',
            'requests_per_second': '1,000 - 50,000',
            'scaling_method': 'Auto Scaling based on CPU/memory/requests'
        }
    }
def _serverless_mobile_backend(self) -> Dict[str, Any]:
    """Serverless mobile backend with GraphQL.

    Returns a pattern description; only the monthly cost estimate is
    derived from the requirements (linear in expected users).
    """
    return {
        'pattern_name': 'Serverless Mobile Backend',
        'description': 'Mobile-first backend with GraphQL and real-time features',
        'use_case': 'Mobile apps, single-page apps, offline-first applications',
        'services': {
            'api': {
                'service': 'AppSync (GraphQL)',
                'purpose': 'Flexible GraphQL API with real-time subscriptions',
                'configuration': {
                    'api_type': 'GraphQL',
                    'authorization': 'Cognito User Pools + API Keys',
                    'resolvers': 'Direct DynamoDB or Lambda',
                    'subscriptions': 'WebSocket for real-time updates',
                    'caching': 'Server-side caching (1 hour TTL)'
                }
            },
            'database': {
                'service': 'DynamoDB',
                'purpose': 'Fast NoSQL database with global tables',
                'configuration': {
                    'billing_mode': 'PAY_PER_REQUEST (on-demand)',
                    'global_tables': 'Multi-region if needed',
                    'streams': 'Enabled for change data capture',
                    'ttl': 'Automatic expiration for temporary data'
                }
            },
            'file_storage': {
                'service': 'S3 + CloudFront',
                'purpose': 'User uploads (images, videos, documents)',
                'configuration': {
                    'access': 'Signed URLs or Cognito credentials',
                    'lifecycle': 'Intelligent-Tiering for cost optimization',
                    'cdn': 'CloudFront for fast global delivery'
                }
            },
            'authentication': {
                'service': 'Cognito',
                'purpose': 'User management and federation',
                'configuration': {
                    'identity_providers': 'Email, Google, Apple, Facebook',
                    'mfa': 'SMS or TOTP',
                    'groups': 'Admin, premium, free tiers',
                    'custom_attributes': 'User metadata storage'
                }
            },
            'push_notifications': {
                'service': 'SNS Mobile Push',
                'purpose': 'Push notifications to mobile devices',
                'configuration': {
                    'platforms': 'iOS (APNs), Android (FCM)',
                    'topics': 'Group notifications by topic',
                    'delivery_status': 'CloudWatch Logs for tracking'
                }
            },
            'analytics': {
                'service': 'Pinpoint',
                'purpose': 'User analytics and engagement',
                'configuration': {
                    'events': 'Custom events tracking',
                    'campaigns': 'Targeted messaging',
                    'segments': 'User segmentation'
                }
            }
        },
        'estimated_cost': {
            # $50 base plus half a cent per expected user per month.
            'monthly_usd': 50 + (self.expected_users * 0.005),
            'breakdown': {
                'AppSync': '5-40 USD',
                'DynamoDB': '10-50 USD',
                'Cognito': '0-15 USD',
                'S3 + CloudFront': '10-40 USD',
                'SNS': '1-10 USD',
                'Pinpoint': '10-30 USD'
            }
        },
        'pros': [
            'Single GraphQL endpoint',
            'Real-time subscriptions built-in',
            'Offline-first capabilities',
            'Auto-generated mobile SDK',
            'Flexible querying (no over/under fetching)'
        ],
        'cons': [
            'GraphQL learning curve',
            'Complex queries can be expensive',
            'Debugging subscriptions challenging',
            'Limited to AWS AppSync features'
        ],
        'scaling_characteristics': {
            'users_supported': '1k - 1M',
            'requests_per_second': '100 - 100,000',
            'scaling_method': 'Automatic (AppSync managed)'
        }
    }
def _event_driven_microservices(self) -> Dict[str, Any]:
    """Event-driven microservices architecture.

    Returns a pattern description; only the monthly cost estimate is
    derived from the requirements (linear in expected users).
    """
    return {
        'pattern_name': 'Event-Driven Microservices',
        'description': 'Loosely coupled services with event bus',
        'use_case': 'Complex business workflows, asynchronous processing',
        'services': {
            'event_bus': {
                'service': 'EventBridge',
                'purpose': 'Central event routing between services',
                'configuration': {
                    'bus_type': 'Custom event bus',
                    'rules': 'Route events by type/source',
                    'targets': 'Lambda, SQS, Step Functions',
                    'archive': 'Event replay capability'
                }
            },
            'compute': {
                'service': 'Lambda + ECS Fargate (hybrid)',
                'purpose': 'Service implementation',
                'configuration': {
                    'lambda': 'Lightweight services, event handlers',
                    'fargate': 'Long-running services, heavy processing',
                    'auto_scaling': 'Lambda (automatic), Fargate (target tracking)'
                }
            },
            'queues': {
                'service': 'SQS',
                'purpose': 'Decouple services, handle failures',
                'configuration': {
                    'queue_type': 'Standard (high throughput) or FIFO (ordering)',
                    'dlq': 'Dead letter queue after 3 retries',
                    'visibility_timeout': '30 seconds (adjust per service)',
                    'retention': '4 days'
                }
            },
            'orchestration': {
                'service': 'Step Functions',
                'purpose': 'Complex workflows, saga patterns',
                'configuration': {
                    'type': 'Standard (long-running) or Express (high volume)',
                    'error_handling': 'Retry, catch, rollback logic',
                    'timeouts': 'Per-state timeouts',
                    'logging': 'CloudWatch Logs integration'
                }
            },
            'database': {
                'service': 'DynamoDB (per service)',
                'purpose': 'Each microservice owns its data',
                'configuration': {
                    'pattern': 'Database per service',
                    'streams': 'DynamoDB Streams for change events',
                    'backup': 'Point-in-time recovery'
                }
            },
            'api_gateway': {
                'service': 'API Gateway',
                'purpose': 'Unified API facade',
                'configuration': {
                    'integration': 'Lambda proxy or HTTP proxy',
                    'authentication': 'Cognito or Lambda authorizer',
                    'rate_limiting': 'Per-client throttling'
                }
            }
        },
        'estimated_cost': {
            # $100 base plus one cent per expected user per month.
            'monthly_usd': 100 + (self.expected_users * 0.01),
            'breakdown': {
                'EventBridge': '5-20 USD',
                'Lambda': '20-100 USD',
                'SQS': '1-10 USD',
                'Step Functions': '10-50 USD',
                'DynamoDB': '30-150 USD',
                'API Gateway': '10-40 USD'
            }
        },
        'pros': [
            'Loose coupling between services',
            'Independent scaling and deployment',
            'Failure isolation',
            'Technology diversity possible',
            'Easy to test individual services'
        ],
        'cons': [
            'Operational complexity',
            'Distributed tracing required',
            'Eventual consistency challenges',
            'Network latency between services',
            'More moving parts to monitor'
        ],
        'scaling_characteristics': {
            'users_supported': '10k - 10M',
            'requests_per_second': '1,000 - 1,000,000',
            'scaling_method': 'Per-service auto-scaling'
        }
    }
def _event_driven_data_pipeline(self) -> Dict[str, Any]:
    """Real-time data processing pipeline.

    Returns a pattern description; the Kinesis shard count and the monthly
    cost estimate are derived from the requirements (data_size_gb, budget).
    """
    return {
        'pattern_name': 'Real-Time Data Pipeline',
        'description': 'Scalable data ingestion and processing',
        'use_case': 'Analytics, IoT data, log processing, ETL',
        'services': {
            'ingestion': {
                'service': 'Kinesis Data Streams',
                'purpose': 'Real-time data ingestion',
                'configuration': {
                    # Roughly one shard per 10 GB of data, minimum one.
                    'shards': f'{max(1, self.data_size_gb // 10)} shards',
                    'retention': '24 hours (extend to 7 days if needed)',
                    'encryption': 'KMS encryption'
                }
            },
            'processing': {
                'service': 'Lambda or Kinesis Analytics',
                'purpose': 'Transform and enrich data',
                'configuration': {
                    'lambda_concurrency': 'Match shard count',
                    'batch_size': '100-500 records per invocation',
                    'error_handling': 'DLQ for failed records'
                }
            },
            'storage': {
                'service': 'S3 Data Lake',
                'purpose': 'Long-term storage and analytics',
                'configuration': {
                    'format': 'Parquet (compressed, columnar)',
                    'partitioning': 'By date (year/month/day/hour)',
                    'lifecycle': 'Transition to Glacier after 90 days',
                    'catalog': 'AWS Glue Data Catalog'
                }
            },
            'analytics': {
                'service': 'Athena',
                'purpose': 'SQL queries on S3 data',
                'configuration': {
                    'query_results': 'Store in separate S3 bucket',
                    'workgroups': 'Separate dev and prod',
                    'cost_controls': 'Query limits per workgroup'
                }
            },
            'visualization': {
                'service': 'QuickSight',
                'purpose': 'Business intelligence dashboards',
                'configuration': {
                    'source': 'Athena or direct S3',
                    'refresh': 'Hourly or daily',
                    'sharing': 'Embedded dashboards or web access'
                }
            },
            'alerting': {
                'service': 'CloudWatch + SNS',
                'purpose': 'Monitor metrics and alerts',
                'configuration': {
                    'metrics': 'Custom metrics from processing',
                    'alarms': 'Threshold-based alerts',
                    'notifications': 'Email, Slack, PagerDuty'
                }
            }
        },
        'estimated_cost': {
            'monthly_usd': self._calculate_data_pipeline_cost(),
            'breakdown': {
                'Kinesis': '15-100 USD (per shard)',
                'Lambda': '10-50 USD',
                'S3': '10-50 USD',
                'Athena': '5-30 USD (per TB scanned)',
                'QuickSight': '9-18 USD per user',
                'Glue': '5-20 USD'
            }
        },
        'pros': [
            'Real-time processing capability',
            'Scales to millions of events',
            'Cost-effective long-term storage',
            'SQL analytics on raw data',
            'Serverless architecture'
        ],
        'cons': [
            'Kinesis shard management required',
            'Athena costs based on data scanned',
            'Schema evolution complexity',
            'Cold data queries can be slow'
        ],
        'scaling_characteristics': {
            'events_per_second': '1,000 - 1,000,000',
            'data_volume': '1 GB - 1 PB per day',
            'scaling_method': 'Add Kinesis shards, partition S3 data'
        }
    }
def _iot_architecture(self) -> Dict[str, Any]:
    """IoT platform architecture.

    Returns a pattern description; only the monthly cost estimate is
    derived from the requirements (expected_users is read as device count).
    """
    return {
        'pattern_name': 'IoT Platform',
        'description': 'Scalable IoT device management and data processing',
        'use_case': 'Connected devices, sensors, smart devices',
        'services': {
            'device_management': {
                'service': 'IoT Core',
                'purpose': 'Device connectivity and management',
                'configuration': {
                    'protocol': 'MQTT over TLS',
                    'thing_registry': 'Device metadata storage',
                    'device_shadow': 'Desired and reported state',
                    'rules_engine': 'Route messages to services'
                }
            },
            'device_provisioning': {
                'service': 'IoT Device Management',
                'purpose': 'Fleet provisioning and updates',
                'configuration': {
                    'fleet_indexing': 'Search devices',
                    'jobs': 'OTA firmware updates',
                    'bulk_operations': 'Manage device groups'
                }
            },
            'data_processing': {
                'service': 'IoT Analytics',
                'purpose': 'Process and analyze IoT data',
                'configuration': {
                    'channels': 'Ingest device data',
                    'pipelines': 'Transform and enrich',
                    'data_store': 'Time-series storage',
                    'notebooks': 'Jupyter notebooks for analysis'
                }
            },
            'time_series_db': {
                'service': 'Timestream',
                'purpose': 'Store time-series metrics',
                'configuration': {
                    'memory_store': 'Recent data (hours)',
                    'magnetic_store': 'Historical data (years)',
                    'retention': 'Auto-tier based on age'
                }
            },
            'real_time_alerts': {
                'service': 'IoT Events',
                'purpose': 'Detect and respond to events',
                'configuration': {
                    'detector_models': 'Define alert conditions',
                    'actions': 'SNS, Lambda, SQS',
                    'state_tracking': 'Per-device state machines'
                }
            }
        },
        'estimated_cost': {
            # For IoT workloads, expected_users doubles as the device count.
            'monthly_usd': 50 + (self.expected_users * 0.1),
            'breakdown': {
                'IoT Core': '10-100 USD (per million messages)',
                'IoT Analytics': '5-50 USD',
                'Timestream': '10-80 USD',
                'IoT Events': '1-20 USD',
                'Data transfer': '10-50 USD'
            }
        },
        'pros': [
            'Built for IoT scale',
            'Secure device connectivity',
            'Managed device lifecycle',
            'Time-series optimized',
            'Real-time event detection'
        ],
        'cons': [
            'IoT-specific pricing model',
            'MQTT protocol required',
            'Regional limitations',
            'Complexity for simple use cases'
        ],
        'scaling_characteristics': {
            'devices_supported': '100 - 10,000,000',
            'messages_per_second': '1,000 - 100,000',
            'scaling_method': 'Automatic (managed service)'
        }
    }
def _multi_region_architecture(self) -> Dict[str, Any]:
    """Multi-region high availability architecture.

    Returns a pattern description; the monthly cost estimate is modeled as
    1.8x the single-region three-tier estimate.
    """
    return {
        'pattern_name': 'Multi-Region High Availability',
        'description': 'Global deployment with disaster recovery',
        'use_case': 'Global applications, 99.99% uptime, compliance',
        'services': {
            'dns': {
                'service': 'Route 53',
                'purpose': 'Global traffic routing',
                'configuration': {
                    'routing_policy': 'Geolocation or latency-based',
                    'health_checks': 'Active monitoring with failover',
                    'failover': 'Automatic to secondary region'
                }
            },
            'cdn': {
                'service': 'CloudFront',
                'purpose': 'Edge caching and acceleration',
                'configuration': {
                    'origins': 'Multiple regions (primary + secondary)',
                    'origin_failover': 'Automatic failover',
                    'edge_locations': 'Global (400+ locations)'
                }
            },
            'compute': {
                'service': 'Multi-region Lambda or ECS',
                'purpose': 'Active-active deployment',
                'configuration': {
                    'regions': 'us-east-1 (primary), eu-west-1 (secondary)',
                    'deployment': 'Blue/Green in each region',
                    'traffic_split': '70/30 or 50/50'
                }
            },
            'database': {
                'service': 'DynamoDB Global Tables or Aurora Global',
                'purpose': 'Multi-region replication',
                'configuration': {
                    'replication': 'Sub-second replication lag',
                    'read_locality': 'Read from nearest region',
                    'write_forwarding': 'Aurora Global write forwarding',
                    'conflict_resolution': 'Last writer wins'
                }
            },
            'storage': {
                'service': 'S3 Cross-Region Replication',
                'purpose': 'Replicate data across regions',
                'configuration': {
                    'replication': 'Async replication to secondary',
                    'versioning': 'Required for CRR',
                    'replication_time_control': '15 minutes SLA'
                }
            }
        },
        'estimated_cost': {
            # Approximate multi-region overhead as 1.8x the single-region cost.
            'monthly_usd': self._calculate_three_tier_cost() * 1.8,
            'breakdown': {
                'Route 53': '10-30 USD',
                'CloudFront': '20-100 USD',
                'Compute (2 regions)': '100-500 USD',
                'Database (Global Tables)': '200-800 USD',
                'Data transfer (cross-region)': '50-200 USD'
            }
        },
        'pros': [
            'Global low latency',
            'High availability (99.99%+)',
            'Disaster recovery built-in',
            'Data sovereignty compliance',
            'Automatic failover'
        ],
        'cons': [
            '1.5-2x costs vs single region',
            'Complex deployment pipeline',
            'Data consistency challenges',
            'More operational overhead',
            'Cross-region data transfer costs'
        ],
        'scaling_characteristics': {
            'users_supported': '100k - 100M',
            'requests_per_second': '10,000 - 10,000,000',
            'scaling_method': 'Per-region auto-scaling + global routing'
        }
    }
def _calculate_serverless_cost(self) -> float:
"""Estimate serverless architecture cost."""
requests_per_month = self.requests_per_second * 2_592_000 # 30 days
lambda_cost = (requests_per_month / 1_000_000) * 0.20 # $0.20 per 1M requests
api_gateway_cost = (requests_per_month / 1_000_000) * 3.50 # $3.50 per 1M requests
dynamodb_cost = max(5, self.data_size_gb * 0.25) # $0.25 per GB/month
cloudfront_cost = max(10, self.expected_users * 0.01)
total = lambda_cost + api_gateway_cost + dynamodb_cost + cloudfront_cost
return min(total, self.budget_monthly) # Cap at budget
def _calculate_three_tier_cost(self) -> float:
"""Estimate three-tier architecture cost."""
fargate_tasks = max(2, self.expected_users // 5000)
fargate_cost = fargate_tasks * 30 # ~$30 per task/month
rds_cost = 150 # db.t3.medium baseline
elasticache_cost = 40 # cache.t3.micro
alb_cost = 25
total = fargate_cost + rds_cost + elasticache_cost + alb_cost
return min(total, self.budget_monthly)
def _calculate_data_pipeline_cost(self) -> float:
"""Estimate data pipeline cost."""
shards = max(1, self.data_size_gb // 10)
kinesis_cost = shards * 15 # $15 per shard/month
s3_cost = self.data_size_gb * 0.023 # $0.023 per GB/month
lambda_cost = 20 # Processing
athena_cost = 15 # Queries
total = kinesis_cost + s3_cost + lambda_cost + athena_cost
return min(total, self.budget_monthly)
def generate_service_checklist(self) -> List[Dict[str, Any]]:
"""Generate implementation checklist for recommended architecture."""
architecture = self.recommend_architecture_pattern()
checklist = [
{
'phase': 'Planning',
'tasks': [
'Review architecture pattern and services',
'Estimate costs using AWS Pricing Calculator',
'Define environment strategy (dev, staging, prod)',
'Set up AWS Organization and accounts',
'Define tagging strategy for resources'
]
},
{
'phase': 'Foundation',
'tasks': [
'Create VPC with public/private subnets',
'Configure NAT Gateway or VPC endpoints',
'Set up IAM roles and policies',
'Enable CloudTrail for audit logging',
'Configure AWS Config for compliance'
]
},
{
'phase': 'Core Services',
'tasks': [
f"Deploy {service['service']}"
for service in architecture['services'].values()
]
},
{
'phase': 'Security',
'tasks': [
'Configure security groups and NACLs',
'Enable encryption (KMS) for all services',
'Set up AWS WAF rules',
'Configure Secrets Manager',
'Enable GuardDuty for threat detection'
]
},
{
'phase': 'Monitoring',
'tasks': [
'Create CloudWatch dashboards',
'Set up alarms for critical metrics',
'Configure SNS topics for notifications',
'Enable X-Ray for distributed tracing',
'Set up log aggregation and retention'
]
},
{
'phase': 'CI/CD',
'tasks': [
'Set up CodePipeline or GitHub Actions',
'Configure automated testing',
'Implement blue/green deployment',
'Set up rollback procedures',
'Document deployment process'
]
}
]
return checklist

View File

@@ -0,0 +1,346 @@
"""
AWS cost optimization analyzer.
Provides cost-saving recommendations for startup budgets.
"""
from typing import Dict, List, Any, Optional
class CostOptimizer:
    """Analyze AWS costs and provide optimization recommendations."""
    def __init__(self, current_resources: Dict[str, Any], monthly_spend: float):
        """
        Initialize with current AWS resources and spending.

        Args:
            current_resources: Dictionary of current AWS resources
            monthly_spend: Current monthly AWS spend in USD
        """
        self.resources = current_resources
        self.monthly_spend = monthly_spend
        # Recommendation dicts appended by the _analyze_* helpers; reset on
        # each call to analyze_and_optimize().
        self.recommendations = []
def analyze_and_optimize(self) -> Dict[str, Any]:
    """
    Analyze current setup and generate cost optimization recommendations.

    Returns:
        Dictionary with recommendations and potential savings
    """
    self.recommendations = []
    # Each analyzer returns its estimated monthly savings and appends its
    # findings to self.recommendations as a side effect.
    analyzers = (
        self._analyze_compute,
        self._analyze_storage,
        self._analyze_database,
        self._analyze_networking,
        self._analyze_general_optimizations,
    )
    potential_savings = sum(analyze() for analyze in analyzers)
    if self.monthly_spend > 0:
        savings_pct = round((potential_savings / self.monthly_spend) * 100, 2)
    else:
        savings_pct = 0
    return {
        'current_monthly_spend': self.monthly_spend,
        'potential_monthly_savings': round(potential_savings, 2),
        'optimized_monthly_spend': round(self.monthly_spend - potential_savings, 2),
        'savings_percentage': savings_pct,
        'recommendations': self.recommendations,
        'priority_actions': self._prioritize_recommendations()
    }
def _analyze_compute(self) -> float:
"""Analyze compute resources (EC2, Lambda, Fargate)."""
savings = 0.0
ec2_instances = self.resources.get('ec2_instances', [])
if ec2_instances:
# Check for idle instances
idle_count = sum(1 for inst in ec2_instances if inst.get('cpu_utilization', 100) < 10)
if idle_count > 0:
idle_cost = idle_count * 50 # Assume $50/month per idle instance
savings += idle_cost
self.recommendations.append({
'service': 'EC2',
'type': 'Idle Resources',
'issue': f'{idle_count} EC2 instances with <10% CPU utilization',
'recommendation': 'Stop or terminate idle instances, or downsize to smaller instance types',
'potential_savings': idle_cost,
'priority': 'high'
})
# Check for Savings Plans / Reserved Instances
on_demand_count = sum(1 for inst in ec2_instances if inst.get('pricing', 'on-demand') == 'on-demand')
if on_demand_count >= 2:
ri_savings = on_demand_count * 50 * 0.30 # 30% savings with RIs
savings += ri_savings
self.recommendations.append({
'service': 'EC2',
'type': 'Pricing Optimization',
'issue': f'{on_demand_count} instances on On-Demand pricing',
'recommendation': 'Purchase Compute Savings Plan or Reserved Instances for predictable workloads (1-year commitment)',
'potential_savings': ri_savings,
'priority': 'medium'
})
# Lambda optimization
lambda_functions = self.resources.get('lambda_functions', [])
if lambda_functions:
oversized = sum(1 for fn in lambda_functions if fn.get('memory_mb', 128) > 512 and fn.get('avg_memory_used_mb', 0) < 256)
if oversized > 0:
lambda_savings = oversized * 5 # Assume $5/month per oversized function
savings += lambda_savings
self.recommendations.append({
'service': 'Lambda',
'type': 'Right-sizing',
'issue': f'{oversized} Lambda functions over-provisioned (memory too high)',
'recommendation': 'Use AWS Lambda Power Tuning tool to optimize memory settings',
'potential_savings': lambda_savings,
'priority': 'low'
})
return savings
def _analyze_storage(self) -> float:
"""Analyze S3 and other storage resources."""
savings = 0.0
s3_buckets = self.resources.get('s3_buckets', [])
for bucket in s3_buckets:
size_gb = bucket.get('size_gb', 0)
storage_class = bucket.get('storage_class', 'STANDARD')
# Check for lifecycle policies
if not bucket.get('has_lifecycle_policy', False) and size_gb > 100:
lifecycle_savings = size_gb * 0.015 # $0.015/GB savings with IA transition
savings += lifecycle_savings
self.recommendations.append({
'service': 'S3',
'type': 'Lifecycle Policy',
'issue': f'Bucket {bucket.get("name", "unknown")} ({size_gb} GB) has no lifecycle policy',
'recommendation': 'Implement lifecycle policy: Transition to IA after 30 days, Glacier after 90 days',
'potential_savings': lifecycle_savings,
'priority': 'medium'
})
# Check for Intelligent-Tiering
if storage_class == 'STANDARD' and size_gb > 500:
tiering_savings = size_gb * 0.005
savings += tiering_savings
self.recommendations.append({
'service': 'S3',
'type': 'Storage Class',
'issue': f'Large bucket ({size_gb} GB) using STANDARD storage',
'recommendation': 'Enable S3 Intelligent-Tiering for automatic cost optimization',
'potential_savings': tiering_savings,
'priority': 'high'
})
return savings
def _analyze_database(self) -> float:
"""Analyze RDS, DynamoDB, and other database costs."""
savings = 0.0
rds_instances = self.resources.get('rds_instances', [])
for db in rds_instances:
# Check for idle databases
if db.get('connections_per_day', 1000) < 10:
db_cost = db.get('monthly_cost', 100)
savings += db_cost * 0.8 # Can save 80% by stopping
self.recommendations.append({
'service': 'RDS',
'type': 'Idle Resource',
'issue': f'Database {db.get("name", "unknown")} has <10 connections/day',
'recommendation': 'Stop database if not needed, or take final snapshot and delete',
'potential_savings': db_cost * 0.8,
'priority': 'high'
})
# Check for Aurora Serverless opportunity
if db.get('engine', '').startswith('aurora') and db.get('utilization', 100) < 30:
serverless_savings = db.get('monthly_cost', 200) * 0.40
savings += serverless_savings
self.recommendations.append({
'service': 'RDS Aurora',
'type': 'Serverless Migration',
'issue': f'Aurora instance {db.get("name", "unknown")} has low utilization (<30%)',
'recommendation': 'Migrate to Aurora Serverless v2 for auto-scaling and pay-per-use',
'potential_savings': serverless_savings,
'priority': 'medium'
})
# DynamoDB optimization
dynamodb_tables = self.resources.get('dynamodb_tables', [])
for table in dynamodb_tables:
if table.get('billing_mode', 'PROVISIONED') == 'PROVISIONED':
read_capacity = table.get('read_capacity_units', 0)
write_capacity = table.get('write_capacity_units', 0)
utilization = table.get('utilization_percentage', 100)
if utilization < 20:
on_demand_savings = (read_capacity * 0.00013 + write_capacity * 0.00065) * 730 * 0.3
savings += on_demand_savings
self.recommendations.append({
'service': 'DynamoDB',
'type': 'Billing Mode',
'issue': f'Table {table.get("name", "unknown")} has low utilization with provisioned capacity',
'recommendation': 'Switch to On-Demand billing mode for variable workloads',
'potential_savings': on_demand_savings,
'priority': 'medium'
})
return savings
def _analyze_networking(self) -> float:
"""Analyze networking costs (data transfer, NAT Gateway, etc.)."""
savings = 0.0
nat_gateways = self.resources.get('nat_gateways', [])
if len(nat_gateways) > 1:
multi_az = self.resources.get('multi_az_required', False)
if not multi_az:
nat_savings = (len(nat_gateways) - 1) * 45 # $45/month per NAT Gateway
savings += nat_savings
self.recommendations.append({
'service': 'NAT Gateway',
'type': 'Resource Consolidation',
'issue': f'{len(nat_gateways)} NAT Gateways deployed (multi-AZ not required)',
'recommendation': 'Use single NAT Gateway in dev/staging, or consider VPC endpoints for AWS services',
'potential_savings': nat_savings,
'priority': 'high'
})
# Check for VPC endpoints opportunity
if not self.resources.get('vpc_endpoints', []):
s3_data_transfer = self.resources.get('s3_data_transfer_gb', 0)
if s3_data_transfer > 100:
endpoint_savings = s3_data_transfer * 0.09 * 0.5 # Save 50% of data transfer costs
savings += endpoint_savings
self.recommendations.append({
'service': 'VPC',
'type': 'VPC Endpoints',
'issue': 'High S3 data transfer without VPC endpoints',
'recommendation': 'Create VPC endpoints for S3 and DynamoDB to avoid NAT Gateway costs',
'potential_savings': endpoint_savings,
'priority': 'medium'
})
return savings
def _analyze_general_optimizations(self) -> float:
"""General AWS cost optimizations."""
savings = 0.0
# Check for CloudWatch Logs retention
log_groups = self.resources.get('cloudwatch_log_groups', [])
for log in log_groups:
if log.get('retention_days', 1) == -1: # Never expire
log_size_gb = log.get('size_gb', 1)
retention_savings = log_size_gb * 0.50 * 0.7 # 70% savings with 7-day retention
savings += retention_savings
self.recommendations.append({
'service': 'CloudWatch Logs',
'type': 'Retention Policy',
'issue': f'Log group {log.get("name", "unknown")} has infinite retention',
'recommendation': 'Set retention to 7 days for non-compliance logs, 30 days for production',
'potential_savings': retention_savings,
'priority': 'low'
})
# Check for unused Elastic IPs
elastic_ips = self.resources.get('elastic_ips', [])
unattached = sum(1 for eip in elastic_ips if not eip.get('attached', True))
if unattached > 0:
eip_savings = unattached * 3.65 # $0.005/hour = $3.65/month
savings += eip_savings
self.recommendations.append({
'service': 'EC2',
'type': 'Unused Resources',
'issue': f'{unattached} unattached Elastic IPs',
'recommendation': 'Release unused Elastic IPs to avoid hourly charges',
'potential_savings': eip_savings,
'priority': 'high'
})
# Budget alerts
if not self.resources.get('has_budget_alerts', False):
self.recommendations.append({
'service': 'AWS Budgets',
'type': 'Cost Monitoring',
'issue': 'No budget alerts configured',
'recommendation': 'Set up AWS Budgets with alerts at 50%, 80%, 100% of monthly budget',
'potential_savings': 0,
'priority': 'high'
})
# Cost Explorer recommendations
if not self.resources.get('has_cost_explorer', False):
self.recommendations.append({
'service': 'Cost Management',
'type': 'Visibility',
'issue': 'Cost Explorer not enabled',
'recommendation': 'Enable AWS Cost Explorer to track spending patterns and identify anomalies',
'potential_savings': 0,
'priority': 'medium'
})
return savings
def _prioritize_recommendations(self) -> List[Dict[str, Any]]:
"""Get top priority recommendations."""
high_priority = [r for r in self.recommendations if r['priority'] == 'high']
high_priority.sort(key=lambda x: x.get('potential_savings', 0), reverse=True)
return high_priority[:5] # Top 5 high-priority recommendations
def generate_optimization_checklist(self) -> List[Dict[str, Any]]:
    """Generate actionable checklist for cost optimization."""
    # Static playbook ordered by urgency: same-day wins first, then
    # weekly/monthly projects, then recurring hygiene tasks.
    plan = [
        ('Immediate Actions (Today)', [
            'Release unattached Elastic IPs',
            'Stop idle EC2 instances',
            'Delete unused EBS volumes',
            'Set up budget alerts',
        ]),
        ('This Week', [
            'Implement S3 lifecycle policies',
            'Consolidate NAT Gateways in non-prod',
            'Set CloudWatch Logs retention to 7 days',
            'Review and rightsize EC2/RDS instances',
        ]),
        ('This Month', [
            'Evaluate Savings Plans or Reserved Instances',
            'Migrate to Aurora Serverless where applicable',
            'Implement VPC endpoints for S3/DynamoDB',
            'Switch DynamoDB tables to On-Demand if variable load',
        ]),
        ('Ongoing', [
            'Review Cost Explorer weekly',
            'Tag all resources for cost allocation',
            'Monitor Trusted Advisor recommendations',
            'Conduct monthly cost review meetings',
        ]),
    ]
    return [{'category': category, 'items': items} for category, items in plan]

# ===== scripts/serverless_stack.py (new file) =====
"""
Serverless stack generator for AWS.
Creates CloudFormation/CDK templates for serverless applications.
"""
from typing import Dict, List, Any, Optional
class ServerlessStackGenerator:
"""Generate serverless application stacks."""
def __init__(self, app_name: str, requirements: Dict[str, Any]) -> None:
    """
    Initialize with application requirements.

    Args:
        app_name: Application name (used for resource naming)
        requirements: Dictionary with API, database, auth requirements
    """
    # Normalize to a lowercase, hyphenated slug safe for AWS resource names.
    self.app_name = app_name.lower().replace(' ', '-')
    self.requirements = requirements
    # Region falls back to us-east-1 when the requirements omit it.
    self.region = requirements.get('region', 'us-east-1')
def generate_cloudformation_template(self) -> str:
    """
    Generate CloudFormation template for serverless stack.

    Builds a SAM (AWS::Serverless-2016-10-31) template containing a
    DynamoDB table, a Lambda API handler with a scoped IAM role, an API
    Gateway protected by a Cognito authorizer, the Cognito user pool and
    client, and a CloudWatch log group with short retention.

    Returns:
        YAML CloudFormation template as string
    """
    # CloudFormation logical IDs cannot contain hyphens, so strip them
    # from the slug wherever it is used as a logical resource name.
    resource_id = self.app_name.replace('-', '')
    template = f"""AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Serverless stack for {self.app_name}

Parameters:
  Environment:
    Type: String
    Default: dev
    AllowedValues:
      - dev
      - staging
      - production
    Description: Deployment environment
  CorsAllowedOrigins:
    Type: String
    Default: '*'
    Description: CORS allowed origins for API Gateway

Resources:
  # DynamoDB Table
  {resource_id}Table:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub '${{Environment}}-{self.app_name}-data'
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: PK
          AttributeType: S
        - AttributeName: SK
          AttributeType: S
      KeySchema:
        - AttributeName: PK
          KeyType: HASH
        - AttributeName: SK
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      SSESpecification:
        SSEEnabled: true
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      Tags:
        - Key: Environment
          Value: !Ref Environment
        - Key: Application
          Value: {self.app_name}

  # Lambda Execution Role
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: DynamoDBAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:GetItem
                  - dynamodb:PutItem
                  - dynamodb:UpdateItem
                  - dynamodb:DeleteItem
                  - dynamodb:Query
                  - dynamodb:Scan
                Resource: !GetAtt {resource_id}Table.Arn

  # Lambda Function
  ApiFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub '${{Environment}}-{self.app_name}-api'
      Handler: index.handler
      Runtime: nodejs18.x
      CodeUri: ./src
      MemorySize: 512
      Timeout: 10
      Role: !GetAtt LambdaExecutionRole.Arn
      Environment:
        Variables:
          TABLE_NAME: !Ref {resource_id}Table
          ENVIRONMENT: !Ref Environment
      Events:
        ApiEvent:
          Type: Api
          Properties:
            Path: /{{proxy+}}
            Method: ANY
            RestApiId: !Ref ApiGateway
      Tags:
        Environment: !Ref Environment
        Application: {self.app_name}

  # API Gateway
  ApiGateway:
    Type: AWS::Serverless::Api
    Properties:
      Name: !Sub '${{Environment}}-{self.app_name}-api'
      StageName: !Ref Environment
      Cors:
        AllowMethods: "'GET,POST,PUT,DELETE,OPTIONS'"
        AllowHeaders: "'Content-Type,Authorization,X-Amz-Date,X-Api-Key,X-Amz-Security-Token'"
        AllowOrigin: !Sub "'${{CorsAllowedOrigins}}'"
      Auth:
        DefaultAuthorizer: CognitoAuthorizer
        Authorizers:
          CognitoAuthorizer:
            UserPoolArn: !GetAtt UserPool.Arn
      ThrottleSettings:
        BurstLimit: 200
        RateLimit: 100
      Tags:
        Environment: !Ref Environment
        Application: {self.app_name}

  # Cognito User Pool
  UserPool:
    Type: AWS::Cognito::UserPool
    Properties:
      UserPoolName: !Sub '${{Environment}}-{self.app_name}-users'
      UsernameAttributes:
        - email
      AutoVerifiedAttributes:
        - email
      Policies:
        PasswordPolicy:
          MinimumLength: 8
          RequireUppercase: true
          RequireLowercase: true
          RequireNumbers: true
          RequireSymbols: false
      MfaConfiguration: OPTIONAL
      EnabledMfas:
        - SOFTWARE_TOKEN_MFA
      UserAttributeUpdateSettings:
        AttributesRequireVerificationBeforeUpdate:
          - email
      Schema:
        - Name: email
          Required: true
          Mutable: true

  # Cognito User Pool Client
  UserPoolClient:
    Type: AWS::Cognito::UserPoolClient
    Properties:
      ClientName: !Sub '${{Environment}}-{self.app_name}-client'
      UserPoolId: !Ref UserPool
      GenerateSecret: false
      RefreshTokenValidity: 30
      AccessTokenValidity: 1
      IdTokenValidity: 1
      TokenValidityUnits:
        RefreshToken: days
        AccessToken: hours
        IdToken: hours
      ExplicitAuthFlows:
        - ALLOW_USER_SRP_AUTH
        - ALLOW_REFRESH_TOKEN_AUTH

  # CloudWatch Log Group
  ApiLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${{Environment}}-{self.app_name}-api'
      RetentionInDays: 7

Outputs:
  ApiUrl:
    Description: API Gateway endpoint URL
    Value: !Sub 'https://${{ApiGateway}}.execute-api.${{AWS::Region}}.amazonaws.com/${{Environment}}'
    Export:
      Name: !Sub '${{Environment}}-{self.app_name}-ApiUrl'
  UserPoolId:
    Description: Cognito User Pool ID
    Value: !Ref UserPool
    Export:
      Name: !Sub '${{Environment}}-{self.app_name}-UserPoolId'
  UserPoolClientId:
    Description: Cognito User Pool Client ID
    Value: !Ref UserPoolClient
    Export:
      Name: !Sub '${{Environment}}-{self.app_name}-UserPoolClientId'
  TableName:
    Description: DynamoDB Table Name
    Value: !Ref {resource_id}Table
    Export:
      Name: !Sub '${{Environment}}-{self.app_name}-TableName'
"""
    return template
def generate_cdk_stack(self) -> str:
    """
    Generate AWS CDK stack in TypeScript.

    Mirrors the CloudFormation template: DynamoDB table, Cognito user
    pool and client, a Lambda API handler, and an API Gateway REST API
    guarded by a Cognito authorizer, plus stack outputs for each.

    Returns:
        CDK stack code as string
    """
    # TypeScript class names cannot contain hyphens, so the stack class
    # uses the stripped, title-cased app name (e.g. 'my-app' -> 'Myapp').
    stack = f"""import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as cognito from 'aws-cdk-lib/aws-cognito';
import {{ Construct }} from 'constructs';

export class {self.app_name.replace('-', '').title()}Stack extends cdk.Stack {{
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {{
    super(scope, id, props);

    // DynamoDB Table
    const table = new dynamodb.Table(this, '{self.app_name}Table', {{
      tableName: `${{cdk.Stack.of(this).stackName}}-data`,
      partitionKey: {{ name: 'PK', type: dynamodb.AttributeType.STRING }},
      sortKey: {{ name: 'SK', type: dynamodb.AttributeType.STRING }},
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      encryption: dynamodb.TableEncryption.AWS_MANAGED,
      pointInTimeRecovery: true,
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
      removalPolicy: cdk.RemovalPolicy.RETAIN,
    }});

    // Cognito User Pool
    const userPool = new cognito.UserPool(this, '{self.app_name}UserPool', {{
      userPoolName: `${{cdk.Stack.of(this).stackName}}-users`,
      selfSignUpEnabled: true,
      signInAliases: {{ email: true }},
      autoVerify: {{ email: true }},
      passwordPolicy: {{
        minLength: 8,
        requireLowercase: true,
        requireUppercase: true,
        requireDigits: true,
        requireSymbols: false,
      }},
      mfa: cognito.Mfa.OPTIONAL,
      mfaSecondFactor: {{
        sms: false,
        otp: true,
      }},
      removalPolicy: cdk.RemovalPolicy.RETAIN,
    }});

    const userPoolClient = userPool.addClient('{self.app_name}Client', {{
      authFlows: {{
        userSrp: true,
      }},
      accessTokenValidity: cdk.Duration.hours(1),
      refreshTokenValidity: cdk.Duration.days(30),
    }});

    // Lambda Function
    const apiFunction = new lambda.Function(this, '{self.app_name}ApiFunction', {{
      functionName: `${{cdk.Stack.of(this).stackName}}-api`,
      runtime: lambda.Runtime.NODEJS_18_X,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('./src'),
      memorySize: 512,
      timeout: cdk.Duration.seconds(10),
      environment: {{
        TABLE_NAME: table.tableName,
        USER_POOL_ID: userPool.userPoolId,
      }},
      logRetention: 7, // days
    }});

    // Grant Lambda permissions to DynamoDB
    table.grantReadWriteData(apiFunction);

    // API Gateway
    const api = new apigateway.RestApi(this, '{self.app_name}Api', {{
      restApiName: `${{cdk.Stack.of(this).stackName}}-api`,
      description: 'API for {self.app_name}',
      defaultCorsPreflightOptions: {{
        allowOrigins: apigateway.Cors.ALL_ORIGINS,
        allowMethods: apigateway.Cors.ALL_METHODS,
        allowHeaders: ['Content-Type', 'Authorization'],
      }},
      deployOptions: {{
        stageName: 'prod',
        throttlingRateLimit: 100,
        throttlingBurstLimit: 200,
        metricsEnabled: true,
        loggingLevel: apigateway.MethodLoggingLevel.INFO,
      }},
    }});

    // Cognito Authorizer
    const authorizer = new apigateway.CognitoUserPoolsAuthorizer(this, 'ApiAuthorizer', {{
      cognitoUserPools: [userPool],
    }});

    // API Integration
    const integration = new apigateway.LambdaIntegration(apiFunction);

    // Add proxy resource (/{{proxy+}})
    const proxyResource = api.root.addProxy({{
      defaultIntegration: integration,
      anyMethod: true,
      defaultMethodOptions: {{
        authorizer: authorizer,
        authorizationType: apigateway.AuthorizationType.COGNITO,
      }},
    }});

    // Outputs
    new cdk.CfnOutput(this, 'ApiUrl', {{
      value: api.url,
      description: 'API Gateway URL',
    }});

    new cdk.CfnOutput(this, 'UserPoolId', {{
      value: userPool.userPoolId,
      description: 'Cognito User Pool ID',
    }});

    new cdk.CfnOutput(this, 'UserPoolClientId', {{
      value: userPoolClient.userPoolClientId,
      description: 'Cognito User Pool Client ID',
    }});

    new cdk.CfnOutput(this, 'TableName', {{
      value: table.tableName,
      description: 'DynamoDB Table Name',
    }});
  }}
}}
"""
    return stack
def generate_terraform_configuration(self) -> str:
    """
    Generate Terraform configuration for serverless stack.

    Emits HCL for the same stack as the CloudFormation/CDK variants:
    DynamoDB table, Cognito user pool and client, IAM role and policy
    for Lambda, the Lambda function, an API Gateway REST API with a
    Cognito authorizer and proxy integration, plus outputs.

    Returns:
        Terraform HCL configuration as string
    """
    terraform = f"""terraform {{
  required_version = ">= 1.0"

  required_providers {{
    aws = {{
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }}
  }}
}}

provider "aws" {{
  region = var.aws_region
}}

variable "aws_region" {{
  description = "AWS region"
  type        = string
  default     = "{self.region}"
}}

variable "environment" {{
  description = "Environment name"
  type        = string
  default     = "dev"
}}

variable "app_name" {{
  description = "Application name"
  type        = string
  default     = "{self.app_name}"
}}

# DynamoDB Table
resource "aws_dynamodb_table" "main" {{
  name         = "${{var.environment}}-${{var.app_name}}-data"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "PK"
  range_key    = "SK"

  attribute {{
    name = "PK"
    type = "S"
  }}

  attribute {{
    name = "SK"
    type = "S"
  }}

  server_side_encryption {{
    enabled = true
  }}

  point_in_time_recovery {{
    enabled = true
  }}

  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"

  tags = {{
    Environment = var.environment
    Application = var.app_name
  }}
}}

# Cognito User Pool
resource "aws_cognito_user_pool" "main" {{
  name = "${{var.environment}}-${{var.app_name}}-users"

  username_attributes      = ["email"]
  auto_verified_attributes = ["email"]

  password_policy {{
    minimum_length    = 8
    require_lowercase = true
    require_numbers   = true
    require_uppercase = true
    require_symbols   = false
  }}

  mfa_configuration = "OPTIONAL"

  software_token_mfa_configuration {{
    enabled = true
  }}

  schema {{
    name                = "email"
    attribute_data_type = "String"
    required            = true
    mutable             = true
  }}

  tags = {{
    Environment = var.environment
    Application = var.app_name
  }}
}}

resource "aws_cognito_user_pool_client" "main" {{
  name         = "${{var.environment}}-${{var.app_name}}-client"
  user_pool_id = aws_cognito_user_pool.main.id

  generate_secret = false

  explicit_auth_flows = [
    "ALLOW_USER_SRP_AUTH",
    "ALLOW_REFRESH_TOKEN_AUTH"
  ]

  refresh_token_validity = 30
  access_token_validity  = 1
  id_token_validity      = 1

  token_validity_units {{
    refresh_token = "days"
    access_token  = "hours"
    id_token      = "hours"
  }}
}}

# IAM Role for Lambda
resource "aws_iam_role" "lambda" {{
  name = "${{var.environment}}-${{var.app_name}}-lambda-role"

  assume_role_policy = jsonencode({{
    Version = "2012-10-17"
    Statement = [{{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {{
        Service = "lambda.amazonaws.com"
      }}
    }}]
  }})

  tags = {{
    Environment = var.environment
    Application = var.app_name
  }}
}}

resource "aws_iam_role_policy_attachment" "lambda_basic" {{
  role       = aws_iam_role.lambda.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}}

resource "aws_iam_role_policy" "dynamodb" {{
  name = "dynamodb-access"
  role = aws_iam_role.lambda.id

  policy = jsonencode({{
    Version = "2012-10-17"
    Statement = [{{
      Effect = "Allow"
      Action = [
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Query",
        "dynamodb:Scan"
      ]
      Resource = aws_dynamodb_table.main.arn
    }}]
  }})
}}

# Lambda Function
resource "aws_lambda_function" "api" {{
  filename      = "lambda.zip"
  function_name = "${{var.environment}}-${{var.app_name}}-api"
  role          = aws_iam_role.lambda.arn
  handler       = "index.handler"
  runtime       = "nodejs18.x"
  memory_size   = 512
  timeout       = 10

  environment {{
    variables = {{
      TABLE_NAME   = aws_dynamodb_table.main.name
      USER_POOL_ID = aws_cognito_user_pool.main.id
      ENVIRONMENT  = var.environment
    }}
  }}

  tags = {{
    Environment = var.environment
    Application = var.app_name
  }}
}}

# CloudWatch Log Group
resource "aws_cloudwatch_log_group" "lambda" {{
  name              = "/aws/lambda/${{aws_lambda_function.api.function_name}}"
  retention_in_days = 7

  tags = {{
    Environment = var.environment
    Application = var.app_name
  }}
}}

# API Gateway
resource "aws_api_gateway_rest_api" "main" {{
  name        = "${{var.environment}}-${{var.app_name}}-api"
  description = "API for ${{var.app_name}}"

  tags = {{
    Environment = var.environment
    Application = var.app_name
  }}
}}

resource "aws_api_gateway_authorizer" "cognito" {{
  name          = "cognito-authorizer"
  rest_api_id   = aws_api_gateway_rest_api.main.id
  type          = "COGNITO_USER_POOLS"
  provider_arns = [aws_cognito_user_pool.main.arn]
}}

resource "aws_api_gateway_resource" "proxy" {{
  rest_api_id = aws_api_gateway_rest_api.main.id
  parent_id   = aws_api_gateway_rest_api.main.root_resource_id
  path_part   = "{{proxy+}}"
}}

resource "aws_api_gateway_method" "proxy" {{
  rest_api_id   = aws_api_gateway_rest_api.main.id
  resource_id   = aws_api_gateway_resource.proxy.id
  http_method   = "ANY"
  authorization = "COGNITO_USER_POOLS"
  authorizer_id = aws_api_gateway_authorizer.cognito.id
}}

resource "aws_api_gateway_integration" "lambda" {{
  rest_api_id             = aws_api_gateway_rest_api.main.id
  resource_id             = aws_api_gateway_resource.proxy.id
  http_method             = aws_api_gateway_method.proxy.http_method
  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = aws_lambda_function.api.invoke_arn
}}

resource "aws_lambda_permission" "apigw" {{
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.api.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${{aws_api_gateway_rest_api.main.execution_arn}}/*/*"
}}

resource "aws_api_gateway_deployment" "main" {{
  depends_on = [
    aws_api_gateway_integration.lambda
  ]

  rest_api_id = aws_api_gateway_rest_api.main.id
  stage_name  = var.environment
}}

# Outputs
output "api_url" {{
  description = "API Gateway URL"
  value       = aws_api_gateway_deployment.main.invoke_url
}}

output "user_pool_id" {{
  description = "Cognito User Pool ID"
  value       = aws_cognito_user_pool.main.id
}}

output "user_pool_client_id" {{
  description = "Cognito User Pool Client ID"
  value       = aws_cognito_user_pool_client.main.id
}}

output "table_name" {{
  description = "DynamoDB Table Name"
  value       = aws_dynamodb_table.main.name
}}
"""
    return terraform