feat(skills): add terraform-patterns agent skill

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Leo
2026-03-15 23:29:01 +01:00
parent 0c31067556
commit dac49ee9f9
6 changed files with 2419 additions and 0 deletions

View File

@@ -0,0 +1,461 @@
#!/usr/bin/env python3
"""
terraform-patterns: Terraform Module Analyzer
Analyze a Terraform directory structure for module quality, resource counts,
naming conventions, and structural best practices. Reports variable/output
coverage, file organization, and actionable recommendations.
Usage:
python scripts/tf_module_analyzer.py ./terraform
python scripts/tf_module_analyzer.py ./terraform --output json
python scripts/tf_module_analyzer.py ./modules/vpc
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
# --- Demo Terraform Files ---
DEMO_FILES = {
"main.tf": """
resource "aws_instance" "web_server" {
ami = var.ami_id
instance_type = var.instance_type
tags = {
Name = "web-server"
}
}
resource "aws_s3_bucket" "data" {
bucket = "my-data-bucket-12345"
}
resource "aws_security_group" "web" {
name = "web-sg"
ingress {
from_port = 80
to_port = 80
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
ingress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
}
data "aws_ami" "ubuntu" {
most_recent = true
owners = ["099720109477"]
}
module "vpc" {
source = "./modules/vpc"
cidr = var.vpc_cidr
}
""",
"variables.tf": """
variable "ami_id" {
type = string
}
variable "instance_type" {
default = "t3.micro"
}
variable "vpc_cidr" {
description = "CIDR block for the VPC"
type = string
default = "10.0.0.0/16"
}
variable "environment" {
description = "Deployment environment"
type = string
validation {
condition = contains(["dev", "staging", "prod"], var.environment)
error_message = "Environment must be dev, staging, or prod."
}
}
""",
"outputs.tf": """
output "instance_id" {
value = aws_instance.web_server.id
}
output "bucket_arn" {
value = aws_s3_bucket.data.arn
description = "ARN of the data S3 bucket"
}
""",
}
# --- Naming convention patterns ---
# Terraform resource naming: lowercase, underscores, alphanumeric
VALID_RESOURCE_NAME = re.compile(r'^[a-z][a-z0-9_]*$')
# Expected files in a well-structured module
EXPECTED_FILES = {
"main.tf": "Primary resources",
"variables.tf": "Input variables",
"outputs.tf": "Output values",
"versions.tf": "Provider and Terraform version requirements",
}
OPTIONAL_FILES = {
"locals.tf": "Computed local values",
"data.tf": "Data sources",
"backend.tf": "Remote state backend configuration",
"providers.tf": "Provider configuration",
"README.md": "Module documentation",
}
def find_tf_files(directory):
"""Find all .tf files in a directory (non-recursive)."""
tf_files = {}
for entry in sorted(os.listdir(directory)):
if entry.endswith(".tf"):
filepath = os.path.join(directory, entry)
with open(filepath, encoding="utf-8") as f:
tf_files[entry] = f.read()
return tf_files
def parse_resources(content):
"""Extract resource declarations from HCL content."""
resources = []
for match in re.finditer(
r'^resource\s+"([^"]+)"\s+"([^"]+)"', content, re.MULTILINE
):
resources.append({
"type": match.group(1),
"name": match.group(2),
"provider": match.group(1).split("_")[0],
})
return resources
def parse_data_sources(content):
"""Extract data source declarations."""
sources = []
for match in re.finditer(
r'^data\s+"([^"]+)"\s+"([^"]+)"', content, re.MULTILINE
):
sources.append({"type": match.group(1), "name": match.group(2)})
return sources
def parse_variables(content):
"""Extract variable declarations with metadata."""
variables = []
# Match variable blocks
for match in re.finditer(
r'^variable\s+"([^"]+)"\s*\{(.*?)\n\}',
content,
re.MULTILINE | re.DOTALL,
):
name = match.group(1)
body = match.group(2)
var = {
"name": name,
"has_description": "description" in body,
"has_type": bool(re.search(r'\btype\s*=', body)),
"has_default": bool(re.search(r'\bdefault\s*=', body)),
"has_validation": "validation" in body,
"is_sensitive": "sensitive" in body and bool(
re.search(r'\bsensitive\s*=\s*true', body)
),
}
variables.append(var)
return variables
def parse_outputs(content):
"""Extract output declarations with metadata."""
outputs = []
for match in re.finditer(
r'^output\s+"([^"]+)"\s*\{(.*?)\n\}',
content,
re.MULTILINE | re.DOTALL,
):
name = match.group(1)
body = match.group(2)
out = {
"name": name,
"has_description": "description" in body,
"is_sensitive": "sensitive" in body and bool(
re.search(r'\bsensitive\s*=\s*true', body)
),
}
outputs.append(out)
return outputs
def parse_modules(content):
"""Extract module calls."""
modules = []
for match in re.finditer(
r'^module\s+"([^"]+)"\s*\{(.*?)\n\}',
content,
re.MULTILINE | re.DOTALL,
):
name = match.group(1)
body = match.group(2)
source_match = re.search(r'source\s*=\s*"([^"]+)"', body)
source = source_match.group(1) if source_match else "unknown"
modules.append({"name": name, "source": source})
return modules
def check_naming(resources, data_sources):
"""Check naming conventions."""
issues = []
for r in resources:
if not VALID_RESOURCE_NAME.match(r["name"]):
issues.append({
"severity": "medium",
"message": f"Resource '{r['type']}.{r['name']}' uses non-standard naming — use lowercase with underscores",
})
if r["name"].startswith(r["provider"] + "_"):
issues.append({
"severity": "low",
"message": f"Resource '{r['type']}.{r['name']}' name repeats the provider prefix — redundant",
})
for d in data_sources:
if not VALID_RESOURCE_NAME.match(d["name"]):
issues.append({
"severity": "medium",
"message": f"Data source '{d['type']}.{d['name']}' uses non-standard naming",
})
return issues
def check_variables(variables):
"""Check variable quality."""
issues = []
for v in variables:
if not v["has_description"]:
issues.append({
"severity": "medium",
"message": f"Variable '{v['name']}' missing description — consumers won't know what to provide",
})
if not v["has_type"]:
issues.append({
"severity": "high",
"message": f"Variable '{v['name']}' missing type constraint — accepts any value",
})
# Check if name suggests a secret
secret_patterns = ["password", "secret", "token", "key", "api_key", "credentials"]
name_lower = v["name"].lower()
if any(p in name_lower for p in secret_patterns) and not v["is_sensitive"]:
issues.append({
"severity": "high",
"message": f"Variable '{v['name']}' looks like a secret but is not marked sensitive = true",
})
return issues
def check_outputs(outputs):
"""Check output quality."""
issues = []
for o in outputs:
if not o["has_description"]:
issues.append({
"severity": "low",
"message": f"Output '{o['name']}' missing description",
})
return issues
def check_file_structure(tf_files):
"""Check if expected files are present."""
issues = []
filenames = set(tf_files.keys())
for expected, purpose in EXPECTED_FILES.items():
if expected not in filenames:
issues.append({
"severity": "medium" if expected != "versions.tf" else "high",
"message": f"Missing '{expected}'{purpose}",
})
return issues
def analyze_directory(tf_files):
"""Run full analysis on a set of .tf files."""
all_content = "\n".join(tf_files.values())
resources = parse_resources(all_content)
data_sources = parse_data_sources(all_content)
variables = parse_variables(all_content)
outputs = parse_outputs(all_content)
modules = parse_modules(all_content)
# Collect findings
findings = []
findings.extend(check_file_structure(tf_files))
findings.extend(check_naming(resources, data_sources))
findings.extend(check_variables(variables))
findings.extend(check_outputs(outputs))
# Check for backend configuration
has_backend = any(
re.search(r'\bbackend\s+"', content)
for content in tf_files.values()
)
if not has_backend:
findings.append({
"severity": "high",
"message": "No remote backend configured — state is stored locally",
})
# Check for terraform required_version
has_tf_version = any(
re.search(r'required_version\s*=', content)
for content in tf_files.values()
)
if not has_tf_version:
findings.append({
"severity": "medium",
"message": "No required_version constraint — any Terraform version can be used",
})
# Providers in child modules check
for filename, content in tf_files.items():
if filename not in ("providers.tf", "versions.tf", "backend.tf"):
if re.search(r'^provider\s+"', content, re.MULTILINE):
findings.append({
"severity": "medium",
"message": f"Provider configuration found in '{filename}' — keep providers in root module only",
})
# Sort findings
severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
findings.sort(key=lambda f: severity_order.get(f["severity"], 4))
# Unique providers
providers = sorted(set(r["provider"] for r in resources))
return {
"files": sorted(tf_files.keys()),
"file_count": len(tf_files),
"resources": resources,
"resource_count": len(resources),
"data_sources": data_sources,
"data_source_count": len(data_sources),
"variables": variables,
"variable_count": len(variables),
"outputs": outputs,
"output_count": len(outputs),
"modules": modules,
"module_count": len(modules),
"providers": providers,
"findings": findings,
}
def generate_report(analysis, output_format="text"):
"""Generate analysis report."""
findings = analysis["findings"]
# Score
deductions = {"critical": 25, "high": 15, "medium": 5, "low": 2}
score = max(0, 100 - sum(deductions.get(f["severity"], 0) for f in findings))
counts = {
"critical": sum(1 for f in findings if f["severity"] == "critical"),
"high": sum(1 for f in findings if f["severity"] == "high"),
"medium": sum(1 for f in findings if f["severity"] == "medium"),
"low": sum(1 for f in findings if f["severity"] == "low"),
}
result = {
"score": score,
"files": analysis["files"],
"resource_count": analysis["resource_count"],
"data_source_count": analysis["data_source_count"],
"variable_count": analysis["variable_count"],
"output_count": analysis["output_count"],
"module_count": analysis["module_count"],
"providers": analysis["providers"],
"findings": findings,
"finding_counts": counts,
}
if output_format == "json":
print(json.dumps(result, indent=2))
return result
# Text output
print(f"\n{'=' * 60}")
print(f" Terraform Module Analysis Report")
print(f"{'=' * 60}")
print(f" Score: {score}/100")
print(f" Files: {', '.join(analysis['files'])}")
print(f" Providers: {', '.join(analysis['providers']) if analysis['providers'] else 'none detected'}")
print()
print(f" Resources: {analysis['resource_count']} | Data Sources: {analysis['data_source_count']}")
print(f" Variables: {analysis['variable_count']} | Outputs: {analysis['output_count']} | Modules: {analysis['module_count']}")
print()
print(f" Findings: {counts['critical']} critical | {counts['high']} high | {counts['medium']} medium | {counts['low']} low")
print(f"{'' * 60}")
for f in findings:
icon = {"critical": "!!!", "high": "!!", "medium": "!", "low": "~"}.get(f["severity"], "?")
print(f"\n {icon} {f['severity'].upper()}")
print(f" {f['message']}")
if not findings:
print("\n No issues found. Module structure looks good.")
print(f"\n{'=' * 60}\n")
return result
def main():
parser = argparse.ArgumentParser(
description="terraform-patterns: Terraform module analyzer"
)
parser.add_argument(
"directory", nargs="?",
help="Path to Terraform directory (omit for demo)",
)
parser.add_argument(
"--output", "-o",
choices=["text", "json"],
default="text",
help="Output format (default: text)",
)
args = parser.parse_args()
if args.directory:
dirpath = Path(args.directory)
if not dirpath.is_dir():
print(f"Error: Not a directory: {args.directory}", file=sys.stderr)
sys.exit(1)
tf_files = find_tf_files(str(dirpath))
if not tf_files:
print(f"Error: No .tf files found in {args.directory}", file=sys.stderr)
sys.exit(1)
else:
print("No directory provided. Running demo analysis...\n")
tf_files = DEMO_FILES
analysis = analyze_directory(tf_files)
generate_report(analysis, args.output)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,577 @@
#!/usr/bin/env python3
"""
terraform-patterns: Terraform Security Scanner
Scan .tf files for common security issues including hardcoded secrets,
overly permissive IAM policies, open security groups, missing encryption,
and sensitive variable misuse.
Usage:
python scripts/tf_security_scanner.py ./terraform
python scripts/tf_security_scanner.py ./terraform --output json
python scripts/tf_security_scanner.py ./terraform --strict
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
# --- Demo Terraform File ---
DEMO_TF = """
provider "aws" {
region = "us-east-1"
access_key = "AKIAIOSFODNN7EXAMPLE"
secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
}
variable "db_password" {
type = string
default = "supersecret123"
}
resource "aws_instance" "web" {
ami = "ami-12345678"
instance_type = "t3.micro"
tags = {
Name = "web-server"
}
}
resource "aws_security_group" "web" {
name = "web-sg"
ingress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
ingress {
from_port = 0
to_port = 65535
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
resource "aws_iam_policy" "admin" {
name = "admin-policy"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = "*"
Resource = "*"
}
]
})
}
resource "aws_s3_bucket" "data" {
bucket = "my-data-bucket"
}
resource "aws_db_instance" "main" {
engine = "mysql"
instance_class = "db.t3.micro"
password = "hardcoded-password"
publicly_accessible = true
skip_final_snapshot = true
}
"""
# --- Security Rules ---
SECRET_PATTERNS = [
{
"id": "SEC001",
"name": "aws_access_key",
"severity": "critical",
"pattern": r'(?:access_key|aws_access_key_id)\s*=\s*"(AKIA[A-Z0-9]{16})"',
"message": "AWS access key hardcoded in configuration",
"fix": "Use environment variables, AWS profiles, or IAM roles instead",
},
{
"id": "SEC002",
"name": "aws_secret_key",
"severity": "critical",
"pattern": r'(?:secret_key|aws_secret_access_key)\s*=\s*"[A-Za-z0-9/+=]{40}"',
"message": "AWS secret key hardcoded in configuration",
"fix": "Use environment variables, AWS profiles, or IAM roles instead",
},
{
"id": "SEC003",
"name": "generic_password",
"severity": "critical",
"pattern": r'(?:password|passwd)\s*=\s*"[^"]{4,}"',
"message": "Password hardcoded in resource or provider configuration",
"fix": "Use a variable with sensitive = true, or fetch from Vault/SSM/Secrets Manager",
},
{
"id": "SEC004",
"name": "generic_secret",
"severity": "critical",
"pattern": r'(?:secret|token|api_key)\s*=\s*"[^"]{8,}"',
"message": "Secret or token hardcoded in configuration",
"fix": "Use a sensitive variable or secrets manager",
},
{
"id": "SEC005",
"name": "private_key",
"severity": "critical",
"pattern": r'-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----',
"message": "Private key embedded in Terraform configuration",
"fix": "Reference key file with file() function or use secrets manager",
},
]
IAM_PATTERNS = [
{
"id": "SEC010",
"name": "iam_wildcard_action",
"severity": "critical",
"pattern": r'Action\s*=\s*"\*"',
"message": "IAM policy with wildcard Action = \"*\" — grants all permissions",
"fix": "Scope Action to specific services and operations",
},
{
"id": "SEC011",
"name": "iam_wildcard_resource",
"severity": "high",
"pattern": r'Resource\s*=\s*"\*"',
"message": "IAM policy with wildcard Resource = \"*\" — applies to all resources",
"fix": "Scope Resource to specific ARN patterns",
},
{
"id": "SEC012",
"name": "iam_star_star",
"severity": "critical",
"pattern": r'Action\s*=\s*"\*"[^}]*Resource\s*=\s*"\*"',
"message": "IAM policy with Action=* AND Resource=* — effectively admin access",
"fix": "Follow least-privilege: grant only the specific actions and resources needed",
},
]
NETWORK_PATTERNS = [
{
"id": "SEC020",
"name": "sg_ssh_open",
"severity": "critical",
"pattern": None, # Custom check
"message": "Security group allows SSH (port 22) from 0.0.0.0/0",
"fix": "Restrict to known CIDR blocks, or use SSM Session Manager instead",
},
{
"id": "SEC021",
"name": "sg_rdp_open",
"severity": "critical",
"pattern": None, # Custom check
"message": "Security group allows RDP (port 3389) from 0.0.0.0/0",
"fix": "Restrict to known CIDR blocks, or use a bastion host",
},
{
"id": "SEC022",
"name": "sg_all_ports",
"severity": "critical",
"pattern": None, # Custom check
"message": "Security group allows all ports (0-65535) from 0.0.0.0/0",
"fix": "Open only the specific ports your application needs",
},
]
ENCRYPTION_PATTERNS = [
{
"id": "SEC030",
"name": "s3_no_encryption",
"severity": "high",
"pattern": None, # Custom check
"message": "S3 bucket without server-side encryption configuration",
"fix": "Add aws_s3_bucket_server_side_encryption_configuration resource",
},
{
"id": "SEC031",
"name": "rds_no_encryption",
"severity": "high",
"pattern": None, # Custom check
"message": "RDS instance without storage encryption",
"fix": "Set storage_encrypted = true on aws_db_instance",
},
{
"id": "SEC032",
"name": "ebs_no_encryption",
"severity": "medium",
"pattern": None, # Custom check
"message": "EBS volume without encryption",
"fix": "Set encrypted = true on aws_ebs_volume or enable account-level default encryption",
},
]
ACCESS_PATTERNS = [
{
"id": "SEC040",
"name": "rds_public",
"severity": "high",
"pattern": r'publicly_accessible\s*=\s*true',
"message": "RDS instance is publicly accessible",
"fix": "Set publicly_accessible = false and access via VPC/bastion",
},
{
"id": "SEC041",
"name": "s3_public_acl",
"severity": "high",
"pattern": r'acl\s*=\s*"public-read(?:-write)?"',
"message": "S3 bucket with public ACL",
"fix": "Remove public ACL and add aws_s3_bucket_public_access_block",
},
]
def find_tf_files(directory):
"""Find all .tf files in a directory (non-recursive)."""
tf_files = {}
for entry in sorted(os.listdir(directory)):
if entry.endswith(".tf"):
filepath = os.path.join(directory, entry)
with open(filepath, encoding="utf-8") as f:
tf_files[entry] = f.read()
return tf_files
def check_regex_rules(content, rules):
"""Run regex-based security rules against content."""
findings = []
for rule in rules:
if rule["pattern"] is None:
continue
for match in re.finditer(rule["pattern"], content, re.MULTILINE | re.IGNORECASE):
findings.append({
"id": rule["id"],
"severity": rule["severity"],
"message": rule["message"],
"fix": rule["fix"],
"line": match.group(0).strip()[:80],
})
return findings
def check_security_groups(content):
"""Custom check for open security groups."""
findings = []
# Parse ingress blocks within security group resources
sg_blocks = re.finditer(
r'resource\s+"aws_security_group"[^{]*\{(.*?)\n\}',
content,
re.DOTALL,
)
for sg_match in sg_blocks:
sg_body = sg_match.group(1)
ingress_blocks = re.finditer(
r'ingress\s*\{(.*?)\}', sg_body, re.DOTALL
)
for ingress in ingress_blocks:
block = ingress.group(1)
has_open_cidr = '0.0.0.0/0' in block or '::/0' in block
if not has_open_cidr:
continue
from_port_match = re.search(r'from_port\s*=\s*(\d+)', block)
to_port_match = re.search(r'to_port\s*=\s*(\d+)', block)
if from_port_match and to_port_match:
from_port = int(from_port_match.group(1))
to_port = int(to_port_match.group(1))
# SSH open
if from_port <= 22 <= to_port:
rule = next(r for r in NETWORK_PATTERNS if r["id"] == "SEC020")
findings.append({
"id": rule["id"],
"severity": rule["severity"],
"message": rule["message"],
"fix": rule["fix"],
"line": f"ingress port 22, cidr 0.0.0.0/0",
})
# RDP open
if from_port <= 3389 <= to_port:
rule = next(r for r in NETWORK_PATTERNS if r["id"] == "SEC021")
findings.append({
"id": rule["id"],
"severity": rule["severity"],
"message": rule["message"],
"fix": rule["fix"],
"line": f"ingress port 3389, cidr 0.0.0.0/0",
})
# All ports open
if from_port == 0 and to_port >= 65535:
rule = next(r for r in NETWORK_PATTERNS if r["id"] == "SEC022")
findings.append({
"id": rule["id"],
"severity": rule["severity"],
"message": rule["message"],
"fix": rule["fix"],
"line": f"ingress ports 0-65535, cidr 0.0.0.0/0",
})
return findings
def check_encryption(content):
"""Custom check for missing encryption on storage resources."""
findings = []
# S3 buckets without encryption
s3_buckets = re.findall(
r'resource\s+"aws_s3_bucket"\s+"([^"]+)"', content
)
s3_encryption = re.findall(
r'resource\s+"aws_s3_bucket_server_side_encryption_configuration"', content
)
# Also check inline encryption (older format)
inline_encryption = re.findall(
r'server_side_encryption_configuration', content
)
if s3_buckets and not s3_encryption and not inline_encryption:
rule = next(r for r in ENCRYPTION_PATTERNS if r["id"] == "SEC030")
for bucket in s3_buckets:
findings.append({
"id": rule["id"],
"severity": rule["severity"],
"message": f"{rule['message']} (bucket: {bucket})",
"fix": rule["fix"],
"line": f'aws_s3_bucket.{bucket}',
})
# RDS without encryption
rds_blocks = re.finditer(
r'resource\s+"aws_db_instance"\s+"([^"]+)"\s*\{(.*?)\n\}',
content,
re.DOTALL,
)
for rds_match in rds_blocks:
name = rds_match.group(1)
body = rds_match.group(2)
if 'storage_encrypted' not in body or re.search(
r'storage_encrypted\s*=\s*false', body
):
rule = next(r for r in ENCRYPTION_PATTERNS if r["id"] == "SEC031")
findings.append({
"id": rule["id"],
"severity": rule["severity"],
"message": f"{rule['message']} (instance: {name})",
"fix": rule["fix"],
"line": f'aws_db_instance.{name}',
})
# EBS volumes without encryption
ebs_blocks = re.finditer(
r'resource\s+"aws_ebs_volume"\s+"([^"]+)"\s*\{(.*?)\n\}',
content,
re.DOTALL,
)
for ebs_match in ebs_blocks:
name = ebs_match.group(1)
body = ebs_match.group(2)
if 'encrypted' not in body or re.search(
r'encrypted\s*=\s*false', body
):
rule = next(r for r in ENCRYPTION_PATTERNS if r["id"] == "SEC032")
findings.append({
"id": rule["id"],
"severity": rule["severity"],
"message": f"{rule['message']} (volume: {name})",
"fix": rule["fix"],
"line": f'aws_ebs_volume.{name}',
})
return findings
def check_sensitive_variables(content):
"""Check if variables that look like secrets are marked sensitive."""
findings = []
var_blocks = re.finditer(
r'variable\s+"([^"]+)"\s*\{(.*?)\n\}',
content,
re.DOTALL,
)
secret_names = ["password", "secret", "token", "api_key", "private_key", "credentials"]
for var_match in var_blocks:
name = var_match.group(1)
body = var_match.group(2)
name_lower = name.lower()
if any(s in name_lower for s in secret_names):
if not re.search(r'sensitive\s*=\s*true', body):
findings.append({
"id": "SEC050",
"severity": "medium",
"message": f"Variable '{name}' appears to be a secret but is not marked sensitive = true",
"fix": "Add sensitive = true to prevent the value from appearing in logs and plan output",
"line": f'variable "{name}"',
})
# Check for hardcoded default
default_match = re.search(r'default\s*=\s*"([^"]+)"', body)
if default_match and len(default_match.group(1)) > 0:
findings.append({
"id": "SEC051",
"severity": "critical",
"message": f"Variable '{name}' has a hardcoded default value for a secret",
"fix": "Remove the default value — require it to be passed at runtime via tfvars or env",
"line": f'variable "{name}" default = "{default_match.group(1)[:20]}..."',
})
return findings
def scan_content(content, strict=False):
"""Run all security checks on content."""
findings = []
findings.extend(check_regex_rules(content, SECRET_PATTERNS))
findings.extend(check_regex_rules(content, IAM_PATTERNS))
findings.extend(check_regex_rules(content, ACCESS_PATTERNS))
findings.extend(check_security_groups(content))
findings.extend(check_encryption(content))
findings.extend(check_sensitive_variables(content))
if strict:
for f in findings:
if f["severity"] == "medium":
f["severity"] = "high"
elif f["severity"] == "low":
f["severity"] = "medium"
# Deduplicate by (id, line)
seen = set()
unique = []
for f in findings:
key = (f["id"], f.get("line", ""))
if key not in seen:
seen.add(key)
unique.append(f)
findings = unique
# Sort by severity
severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
findings.sort(key=lambda f: severity_order.get(f["severity"], 4))
return findings
def generate_report(content, output_format="text", strict=False):
"""Generate security scan report."""
findings = scan_content(content, strict)
# Score
deductions = {"critical": 25, "high": 15, "medium": 5, "low": 2}
score = max(0, 100 - sum(deductions.get(f["severity"], 0) for f in findings))
counts = {
"critical": sum(1 for f in findings if f["severity"] == "critical"),
"high": sum(1 for f in findings if f["severity"] == "high"),
"medium": sum(1 for f in findings if f["severity"] == "medium"),
"low": sum(1 for f in findings if f["severity"] == "low"),
}
result = {
"score": score,
"findings": findings,
"finding_counts": counts,
"total_findings": len(findings),
}
if output_format == "json":
print(json.dumps(result, indent=2))
return result
# Text output
print(f"\n{'=' * 60}")
print(f" Terraform Security Scan Report")
print(f"{'=' * 60}")
print(f" Score: {score}/100")
print()
print(f" Findings: {counts['critical']} critical | {counts['high']} high | {counts['medium']} medium | {counts['low']} low")
print(f"{'' * 60}")
for f in findings:
icon = {"critical": "!!!", "high": "!!", "medium": "!", "low": "~"}.get(f["severity"], "?")
print(f"\n [{f['id']}] {icon} {f['severity'].upper()}")
print(f" {f['message']}")
if f.get("line"):
print(f" Match: {f['line']}")
print(f" Fix: {f['fix']}")
if not findings:
print("\n No security issues found. Configuration looks clean.")
print(f"\n{'=' * 60}\n")
return result
def main():
parser = argparse.ArgumentParser(
description="terraform-patterns: Terraform security scanner"
)
parser.add_argument(
"target", nargs="?",
help="Path to Terraform directory or .tf file (omit for demo)",
)
parser.add_argument(
"--output", "-o",
choices=["text", "json"],
default="text",
help="Output format (default: text)",
)
parser.add_argument(
"--strict",
action="store_true",
help="Strict mode — elevate warnings to higher severity",
)
args = parser.parse_args()
if args.target:
target = Path(args.target)
if target.is_dir():
tf_files = find_tf_files(str(target))
if not tf_files:
print(f"Error: No .tf files found in {args.target}", file=sys.stderr)
sys.exit(1)
content = "\n".join(tf_files.values())
elif target.is_file() and target.suffix == ".tf":
content = target.read_text(encoding="utf-8")
else:
print(f"Error: {args.target} is not a directory or .tf file", file=sys.stderr)
sys.exit(1)
else:
print("No target provided. Running demo scan...\n")
content = DEMO_TF
generate_report(content, args.output, args.strict)
if __name__ == "__main__":
main()