Files
Reza Rezvani 2056ba251f feat(engineering-team): add azure-cloud-architect, security-pen-testing; extend terraform-patterns
azure-cloud-architect (451-line SKILL.md, 3 scripts, 3 references):
- 6-step workflow mirroring aws-solution-architect for Azure
- Bicep/ARM templates, AKS, Functions, Cosmos DB, cost optimization
- architecture_designer.py, cost_optimizer.py, bicep_generator.py

security-pen-testing (850-line SKILL.md, 3 scripts, 3 references):
- OWASP Top 10 systematic audit, offensive security testing
- XSS/SQLi/SSRF/IDOR detection, secret scanning, API security
- vulnerability_scanner.py, dependency_auditor.py, pentest_report_generator.py
- Responsible disclosure workflow included

terraform-patterns extended (487 → 740 lines):
- Multi-cloud provider configuration
- OpenTofu compatibility notes
- Infracost integration for PR cost estimation
- Import existing infrastructure patterns
- Terragrunt DRY multi-environment patterns

Updated engineering-team plugin.json (26 → 28 skills).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 13:32:22 +01:00

493 lines
21 KiB
Python

#!/usr/bin/env python3
"""
Azure cost optimization analyzer.
Analyzes Azure resource configurations and provides cost-saving recommendations.
Usage:
python cost_optimizer.py --config resources.json
python cost_optimizer.py --config resources.json --json
python cost_optimizer.py --help
Expected JSON config format:
{
"virtual_machines": [
{"name": "vm-web-01", "size": "Standard_D4s_v5", "cpu_utilization": 12, "pricing": "on-demand", "monthly_cost": 140}
],
"sql_databases": [
{"name": "sqldb-main", "tier": "GeneralPurpose", "vcores": 8, "utilization": 25, "monthly_cost": 400}
],
"storage_accounts": [
{"name": "stmyapp", "size_gb": 500, "tier": "Hot", "has_lifecycle_policy": false}
],
"aks_clusters": [
{"name": "aks-prod", "node_count": 6, "node_size": "Standard_D4s_v5", "avg_cpu_utilization": 35, "monthly_cost": 800}
],
"cosmos_db": [
{"name": "cosmos-orders", "ru_provisioned": 10000, "ru_used_avg": 2000, "monthly_cost": 580}
],
"public_ips": [
{"name": "pip-unused", "attached": false}
],
"app_services": [
{"name": "app-web", "tier": "PremiumV3", "instance_count": 3, "cpu_utilization": 15, "monthly_cost": 300}
],
"has_budget_alerts": false,
"has_advisor_enabled": false
}
"""
import argparse
import json
import sys
from typing import Dict, List, Any
class AzureCostOptimizer:
"""Analyze Azure resource configurations and recommend cost savings."""
def __init__(self, resources: Dict[str, Any]):
self.resources = resources
self.recommendations: List[Dict[str, Any]] = []
def analyze(self) -> Dict[str, Any]:
"""Run all analysis passes and return full report."""
self.recommendations = []
total_savings = 0.0
total_savings += self._analyze_virtual_machines()
total_savings += self._analyze_sql_databases()
total_savings += self._analyze_storage()
total_savings += self._analyze_aks()
total_savings += self._analyze_cosmos_db()
total_savings += self._analyze_app_services()
total_savings += self._analyze_networking()
total_savings += self._analyze_general()
current_spend = self._estimate_current_spend()
return {
"current_monthly_usd": round(current_spend, 2),
"potential_monthly_savings_usd": round(total_savings, 2),
"optimized_monthly_usd": round(current_spend - total_savings, 2),
"savings_percentage": round((total_savings / current_spend) * 100, 2) if current_spend > 0 else 0,
"recommendations": self.recommendations,
"priority_actions": self._top_priority(),
}
# ------------------------------------------------------------------
# Analysis passes
# ------------------------------------------------------------------
def _analyze_virtual_machines(self) -> float:
savings = 0.0
vms = self.resources.get("virtual_machines", [])
for vm in vms:
cost = vm.get("monthly_cost", 140)
cpu = vm.get("cpu_utilization", 100)
pricing = vm.get("pricing", "on-demand")
# Idle VMs
if cpu < 5:
savings += cost * 0.9
self.recommendations.append({
"service": "Virtual Machines",
"type": "Idle Resource",
"issue": f"VM {vm.get('name', '?')} has <5% CPU utilization",
"recommendation": "Deallocate or delete the VM. Use Azure Automation auto-shutdown for dev/test VMs.",
"potential_savings_usd": round(cost * 0.9, 2),
"priority": "high",
})
elif cpu < 20:
savings += cost * 0.4
self.recommendations.append({
"service": "Virtual Machines",
"type": "Right-sizing",
"issue": f"VM {vm.get('name', '?')} is under-utilized ({cpu}% CPU)",
"recommendation": "Downsize to a smaller SKU. Use Azure Advisor right-sizing recommendations.",
"potential_savings_usd": round(cost * 0.4, 2),
"priority": "high",
})
# Reserved Instances
if pricing == "on-demand" and cpu >= 20:
ri_savings = cost * 0.35
savings += ri_savings
self.recommendations.append({
"service": "Virtual Machines",
"type": "Reserved Instances",
"issue": f"VM {vm.get('name', '?')} runs on-demand with steady utilization",
"recommendation": "Purchase 1-year Reserved Instance (up to 35% savings) or 3-year (up to 55% savings).",
"potential_savings_usd": round(ri_savings, 2),
"priority": "medium",
})
# Spot VMs for batch/fault-tolerant workloads
spot_candidates = [vm for vm in vms if vm.get("workload_type") in ("batch", "dev", "test")]
if spot_candidates:
spot_savings = sum(vm.get("monthly_cost", 100) * 0.6 for vm in spot_candidates)
savings += spot_savings
self.recommendations.append({
"service": "Virtual Machines",
"type": "Spot VMs",
"issue": f"{len(spot_candidates)} VMs running batch/dev/test workloads on regular instances",
"recommendation": "Switch to Azure Spot VMs for up to 90% savings on interruptible workloads.",
"potential_savings_usd": round(spot_savings, 2),
"priority": "medium",
})
return savings
def _analyze_sql_databases(self) -> float:
savings = 0.0
dbs = self.resources.get("sql_databases", [])
for db in dbs:
cost = db.get("monthly_cost", 200)
utilization = db.get("utilization", 100)
vcores = db.get("vcores", 2)
tier = db.get("tier", "GeneralPurpose")
# Idle databases
if db.get("connections_per_day", 1000) < 10:
savings += cost * 0.8
self.recommendations.append({
"service": "Azure SQL",
"type": "Idle Resource",
"issue": f"Database {db.get('name', '?')} has <10 connections/day",
"recommendation": "Delete unused database or switch to serverless tier with auto-pause.",
"potential_savings_usd": round(cost * 0.8, 2),
"priority": "high",
})
# Serverless opportunity
elif utilization < 30 and tier == "GeneralPurpose":
serverless_savings = cost * 0.45
savings += serverless_savings
self.recommendations.append({
"service": "Azure SQL",
"type": "Serverless Migration",
"issue": f"Database {db.get('name', '?')} has low utilization ({utilization}%) on provisioned tier",
"recommendation": "Switch to Azure SQL Serverless tier with auto-pause (60-min delay). Pay only for active compute.",
"potential_savings_usd": round(serverless_savings, 2),
"priority": "high",
})
# Right-sizing
elif utilization < 50 and vcores > 2:
right_size_savings = cost * 0.3
savings += right_size_savings
self.recommendations.append({
"service": "Azure SQL",
"type": "Right-sizing",
"issue": f"Database {db.get('name', '?')} uses {vcores} vCores at {utilization}% utilization",
"recommendation": f"Reduce to {max(2, vcores // 2)} vCores. Monitor DTU/vCore usage after change.",
"potential_savings_usd": round(right_size_savings, 2),
"priority": "medium",
})
return savings
def _analyze_storage(self) -> float:
savings = 0.0
accounts = self.resources.get("storage_accounts", [])
for acct in accounts:
size_gb = acct.get("size_gb", 0)
tier = acct.get("tier", "Hot")
# Lifecycle policy missing
if not acct.get("has_lifecycle_policy", False) and size_gb > 50:
lifecycle_savings = size_gb * 0.01 # ~$0.01/GB moving hot to cool
savings += lifecycle_savings
self.recommendations.append({
"service": "Blob Storage",
"type": "Lifecycle Policy",
"issue": f"Account {acct.get('name', '?')} ({size_gb} GB) has no lifecycle policy",
"recommendation": "Add lifecycle management: move to Cool after 30 days, Archive after 90 days.",
"potential_savings_usd": round(lifecycle_savings, 2),
"priority": "medium",
})
# Hot tier for large, infrequently accessed data
if tier == "Hot" and size_gb > 500:
tier_savings = size_gb * 0.008
savings += tier_savings
self.recommendations.append({
"service": "Blob Storage",
"type": "Storage Tier",
"issue": f"Account {acct.get('name', '?')} ({size_gb} GB) on Hot tier",
"recommendation": "Evaluate Cool or Cold tier for infrequently accessed data. Hot=$0.018/GB, Cool=$0.01/GB, Cold=$0.0036/GB.",
"potential_savings_usd": round(tier_savings, 2),
"priority": "high",
})
return savings
def _analyze_aks(self) -> float:
savings = 0.0
clusters = self.resources.get("aks_clusters", [])
for cluster in clusters:
cost = cluster.get("monthly_cost", 500)
cpu = cluster.get("avg_cpu_utilization", 100)
node_count = cluster.get("node_count", 3)
# Over-provisioned cluster
if cpu < 30 and node_count > 3:
aks_savings = cost * 0.3
savings += aks_savings
self.recommendations.append({
"service": "AKS",
"type": "Right-sizing",
"issue": f"Cluster {cluster.get('name', '?')} has {node_count} nodes at {cpu}% CPU",
"recommendation": "Enable cluster autoscaler. Set min nodes to 2 (or 1 for dev). Use node auto-provisioning.",
"potential_savings_usd": round(aks_savings, 2),
"priority": "high",
})
# Spot node pools for non-critical workloads
if not cluster.get("has_spot_pool", False):
spot_savings = cost * 0.15
savings += spot_savings
self.recommendations.append({
"service": "AKS",
"type": "Spot Node Pools",
"issue": f"Cluster {cluster.get('name', '?')} has no spot node pools",
"recommendation": "Add a spot node pool for batch jobs, CI runners, and dev workloads (up to 90% savings).",
"potential_savings_usd": round(spot_savings, 2),
"priority": "medium",
})
return savings
def _analyze_cosmos_db(self) -> float:
savings = 0.0
dbs = self.resources.get("cosmos_db", [])
for db in dbs:
cost = db.get("monthly_cost", 200)
ru_provisioned = db.get("ru_provisioned", 400)
ru_used = db.get("ru_used_avg", 400)
# Massive over-provisioning
if ru_provisioned > 0 and ru_used / ru_provisioned < 0.2:
cosmos_savings = cost * 0.5
savings += cosmos_savings
self.recommendations.append({
"service": "Cosmos DB",
"type": "Right-sizing",
"issue": f"Container {db.get('name', '?')} uses {ru_used}/{ru_provisioned} RU/s ({int(ru_used/ru_provisioned*100)}% utilization)",
"recommendation": "Switch to autoscale throughput or serverless mode. Autoscale adjusts RU/s between 10%-100% of max.",
"potential_savings_usd": round(cosmos_savings, 2),
"priority": "high",
})
elif ru_provisioned > 0 and ru_used / ru_provisioned < 0.5:
cosmos_savings = cost * 0.25
savings += cosmos_savings
self.recommendations.append({
"service": "Cosmos DB",
"type": "Autoscale",
"issue": f"Container {db.get('name', '?')} uses {ru_used}/{ru_provisioned} RU/s — variable workload",
"recommendation": "Enable autoscale throughput. Set max RU/s to current provisioned value.",
"potential_savings_usd": round(cosmos_savings, 2),
"priority": "medium",
})
return savings
def _analyze_app_services(self) -> float:
savings = 0.0
apps = self.resources.get("app_services", [])
for app in apps:
cost = app.get("monthly_cost", 100)
cpu = app.get("cpu_utilization", 100)
instances = app.get("instance_count", 1)
tier = app.get("tier", "Basic")
# Over-provisioned instances
if cpu < 20 and instances > 1:
app_savings = cost * 0.4
savings += app_savings
self.recommendations.append({
"service": "App Service",
"type": "Right-sizing",
"issue": f"App {app.get('name', '?')} runs {instances} instances at {cpu}% CPU",
"recommendation": "Reduce instance count or enable autoscale with min=1. Consider downgrading plan tier.",
"potential_savings_usd": round(app_savings, 2),
"priority": "high",
})
# Premium tier for dev/test
if tier in ("PremiumV3", "PremiumV2") and app.get("environment") in ("dev", "test"):
tier_savings = cost * 0.5
savings += tier_savings
self.recommendations.append({
"service": "App Service",
"type": "Plan Tier",
"issue": f"App {app.get('name', '?')} uses {tier} in {app.get('environment', 'unknown')} environment",
"recommendation": "Use Basic (B1) or Free tier for dev/test environments.",
"potential_savings_usd": round(tier_savings, 2),
"priority": "high",
})
return savings
def _analyze_networking(self) -> float:
savings = 0.0
# Unattached public IPs
pips = self.resources.get("public_ips", [])
unattached = [p for p in pips if not p.get("attached", True)]
if unattached:
pip_savings = len(unattached) * 3.65 # ~$0.005/hr = $3.65/month
savings += pip_savings
self.recommendations.append({
"service": "Public IP",
"type": "Unused Resource",
"issue": f"{len(unattached)} unattached public IPs incurring hourly charges",
"recommendation": "Delete unused public IPs. Unattached Standard SKU IPs cost ~$3.65/month each.",
"potential_savings_usd": round(pip_savings, 2),
"priority": "high",
})
# NAT Gateway in dev environments
nat_gateways = self.resources.get("nat_gateways", [])
dev_nats = [n for n in nat_gateways if n.get("environment") in ("dev", "test")]
if dev_nats:
nat_savings = len(dev_nats) * 32 # ~$32/month per NAT Gateway
savings += nat_savings
self.recommendations.append({
"service": "NAT Gateway",
"type": "Environment Optimization",
"issue": f"{len(dev_nats)} NAT Gateways in dev/test environments",
"recommendation": "Remove NAT Gateways in dev/test. Use Azure Firewall or service tags for outbound instead.",
"potential_savings_usd": round(nat_savings, 2),
"priority": "medium",
})
return savings
def _analyze_general(self) -> float:
savings = 0.0
if not self.resources.get("has_budget_alerts", False):
self.recommendations.append({
"service": "Cost Management",
"type": "Budget Alerts",
"issue": "No budget alerts configured",
"recommendation": "Create Azure Budget with alerts at 50%, 80%, and 100% of monthly target.",
"potential_savings_usd": 0,
"priority": "high",
})
if not self.resources.get("has_advisor_enabled", True):
self.recommendations.append({
"service": "Azure Advisor",
"type": "Visibility",
"issue": "Azure Advisor cost recommendations not reviewed",
"recommendation": "Review Azure Advisor cost recommendations weekly. Enable Advisor alerts for new findings.",
"potential_savings_usd": 0,
"priority": "medium",
})
return savings
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _estimate_current_spend(self) -> float:
total = 0.0
for key in ("virtual_machines", "sql_databases", "aks_clusters", "cosmos_db", "app_services"):
for item in self.resources.get(key, []):
total += item.get("monthly_cost", 0)
# Storage estimate
for acct in self.resources.get("storage_accounts", []):
total += acct.get("size_gb", 0) * 0.018 # Hot tier default
# Public IPs
for pip in self.resources.get("public_ips", []):
total += 3.65
return total if total > 0 else 1000 # Default if no cost data
def _top_priority(self) -> List[Dict[str, Any]]:
high = [r for r in self.recommendations if r["priority"] == "high"]
high.sort(key=lambda x: x.get("potential_savings_usd", 0), reverse=True)
return high[:5]
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _format_text(report: Dict[str, Any]) -> str:
lines = []
lines.append(f"Current Monthly Spend: ${report['current_monthly_usd']}")
lines.append(f"Potential Savings: ${report['potential_monthly_savings_usd']} ({report['savings_percentage']}%)")
lines.append(f"Optimized Spend: ${report['optimized_monthly_usd']}")
lines.append("")
lines.append("=== Priority Actions ===")
for i, action in enumerate(report.get("priority_actions", []), 1):
lines.append(f" {i}. [{action['service']}] {action['recommendation']}")
lines.append(f" Savings: ${action.get('potential_savings_usd', 0)}")
lines.append("")
lines.append("=== All Recommendations ===")
for rec in report.get("recommendations", []):
lines.append(f" [{rec['priority'].upper()}] {rec['service']}{rec['type']}")
lines.append(f" Issue: {rec['issue']}")
lines.append(f" Action: {rec['recommendation']}")
savings = rec.get("potential_savings_usd", 0)
if savings:
lines.append(f" Savings: ${savings}")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Azure Cost Optimizer — analyze Azure resources and recommend cost savings.",
epilog="Examples:\n"
" python cost_optimizer.py --config resources.json\n"
" python cost_optimizer.py --config resources.json --json",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--config",
required=True,
help="Path to JSON file with current Azure resource inventory",
)
parser.add_argument(
"--json",
action="store_true",
dest="json_output",
help="Output as JSON instead of human-readable text",
)
args = parser.parse_args()
try:
with open(args.config, "r") as f:
resources = json.load(f)
except FileNotFoundError:
print(f"Error: file not found: {args.config}", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as exc:
print(f"Error: invalid JSON in {args.config}: {exc}", file=sys.stderr)
sys.exit(1)
optimizer = AzureCostOptimizer(resources)
report = optimizer.analyze()
if args.json_output:
print(json.dumps(report, indent=2))
else:
print(_format_text(report))
if __name__ == "__main__":
main()