Files
claude-skills-reference/engineering-team/cloud-security/scripts/cloud_posture_check.py
Jaskarn Singh d2da9d3dad feat(engineering-team): add 5 consolidated security skills
Adds threat-detection, incident-response, cloud-security, red-team, and ai-security skills to engineering-team. Each includes SKILL.md, references, and Python scripts (stdlib-only). Consolidation of 66 individual skills into 5 production-ready packages.
2026-03-30 21:07:43 +02:00

1181 lines
42 KiB
Python

#!/usr/bin/env python3
"""
cloud_posture_check.py — Cloud Security Posture Check
Analyses IAM policies and cloud resource configurations for privilege
escalation paths, data exfiltration risks, public exposure, S3 bucket
misconfigurations, and Security Group dangerous inbound rules.
Supports AWS (full), with Azure/GCP stubs for future expansion.
Usage:
python3 cloud_posture_check.py policy.json
python3 cloud_posture_check.py policy.json --check privilege-escalation --json
python3 cloud_posture_check.py sg.json --check sg --provider aws --json
python3 cloud_posture_check.py bucket.json --check s3 --severity-modifier internet-facing
Exit codes:
0 No findings or informational only
1 High-severity findings present
2 Critical findings present
"""
import argparse
import json
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# IAM Analysis Constants (from analyze_iam_policy.py base)
# ---------------------------------------------------------------------------
# Actions that can be abused — alone or chained — to escalate privileges in an
# AWS account. Membership is compared case-insensitively by analyze_statement().
PRIVILEGE_ESCALATION_ACTIONS: List[str] = [
    # Direct IAM policy / identity manipulation
    "iam:CreatePolicyVersion",
    "iam:SetDefaultPolicyVersion",
    "iam:PassRole",
    "iam:CreateAccessKey",
    "iam:CreateLoginProfile",
    "iam:UpdateLoginProfile",
    "iam:AttachUserPolicy",
    "iam:AttachGroupPolicy",
    "iam:AttachRolePolicy",
    "iam:PutUserPolicy",
    "iam:PutGroupPolicy",
    "iam:PutRolePolicy",
    "iam:AddUserToGroup",
    "iam:UpdateAssumeRolePolicy",
    "sts:AssumeRole",
    "iam:CreateRole",
    "iam:DeletePolicyVersion",
    "iam:CreateUser",
    "iam:UpdateAccessKey",
    # MFA tampering
    "iam:DeactivateMFADevice",
    "iam:DeleteVirtualMFADevice",
    "iam:ResyncMFADevice",
    "iam:EnableMFADevice",
    # Permissions-boundary removal
    "iam:DeleteUserPermissionsBoundary",
    "iam:DeleteRolePermissionsBoundary",
    # Indirect escalation via services that can assume or be passed a role
    "lambda:CreateFunction",
    "lambda:InvokeFunction",
    "lambda:UpdateFunctionCode",
    "lambda:AddPermission",
    "ec2:RunInstances",
    "ec2:AssociateIamInstanceProfile",
    "ec2:ReplaceIamInstanceProfileAssociation",
    "cloudformation:CreateStack",
    "cloudformation:UpdateStack",
    "datapipeline:CreatePipeline",
    "datapipeline:PutPipelineDefinition",
    "glue:CreateDevEndpoint",
    "glue:UpdateDevEndpoint",
    "codestar:CreateProject",
    "codecommit:CreateRepository",
    "ssm:SendCommand",
    "ssm:StartSession",
]
# Known multi-action escalation chains. Each entry carries the human-readable
# combo name, the action set that must all appear in one Allow statement, a
# description of the attack, and the base severity assigned when matched.
ESCALATION_COMBOS: List[Dict[str, Any]] = [
    {"name": combo_name, "actions": combo_actions,
     "description": combo_description, "severity": combo_severity}
    for combo_name, combo_actions, combo_description, combo_severity in [
        (
            "PassRole + Lambda Invoke",
            ["iam:PassRole", "lambda:InvokeFunction"],
            "Attacker can pass a privileged role to a Lambda function and invoke it",
            "critical",
        ),
        (
            "PassRole + EC2 RunInstances",
            ["iam:PassRole", "ec2:RunInstances"],
            "Attacker can launch an EC2 instance with a privileged IAM role",
            "critical",
        ),
        (
            "CreatePolicyVersion + SetDefaultPolicyVersion",
            ["iam:CreatePolicyVersion", "iam:SetDefaultPolicyVersion"],
            "Attacker can create and activate a new policy version granting full access",
            "critical",
        ),
        (
            "AttachUserPolicy + AdministratorAccess",
            ["iam:AttachUserPolicy"],
            "Can attach any managed policy including AdministratorAccess to users",
            "high",
        ),
        (
            "PutUserPolicy + Wildcard",
            ["iam:PutUserPolicy"],
            "Can inject inline policies with wildcard permissions",
            "high",
        ),
        (
            "CloudFormation Stack Manipulation",
            ["cloudformation:CreateStack", "iam:PassRole"],
            "Attacker can deploy a CloudFormation stack with a privileged role",
            "critical",
        ),
        (
            "SSM Session Start",
            ["ssm:StartSession"],
            "Can start interactive sessions on EC2 instances without SSH",
            "high",
        ),
        (
            "Glue Dev Endpoint",
            ["glue:CreateDevEndpoint", "iam:PassRole"],
            "Can create a Glue dev endpoint with a privileged role for code execution",
            "critical",
        ),
    ]
]
# Read/copy/describe actions that can stage or enable data exfiltration, plus
# logging-teardown actions used to cover tracks. Compared case-insensitively
# by analyze_statement().
DATA_EXFILTRATION_ACTIONS: List[str] = [
    # S3 object/bucket read and ACL/policy tampering
    "s3:GetObject",
    "s3:ListBucket",
    "s3:GetBucketAcl",
    "s3:GetObjectAcl",
    "s3:GetBucketPolicy",
    "s3:PutBucketPolicy",
    "s3:PutBucketAcl",
    "s3:PutObjectAcl",
    "s3:CopyObject",
    "s3:HeadObject",
    # RDS data and snapshot access
    "rds:DescribeDBInstances",
    "rds:DownloadDBLogFilePortion",
    "rds:DescribeDBSnapshots",
    "rds:RestoreDBInstanceFromDBSnapshot",
    # DynamoDB bulk reads
    "dynamodb:Scan",
    "dynamodb:Query",
    "dynamodb:GetItem",
    "dynamodb:BatchGetItem",
    # EC2 snapshot exfiltration (ModifySnapshotAttribute can share snapshots)
    "ec2:DescribeInstances",
    "ec2:DescribeSnapshots",
    "ec2:CreateSnapshot",
    "ec2:ModifySnapshotAttribute",
    # Container image pulls
    "ecr:GetDownloadUrlForLayer",
    "ecr:BatchGetImage",
    # Secrets / parameters / key material
    "secretsmanager:GetSecretValue",
    "secretsmanager:ListSecrets",
    "ssm:GetParameter",
    "ssm:GetParameters",
    "ssm:GetParametersByPath",
    "kms:Decrypt",
    "kms:GenerateDataKey",
    # Source and function code reads
    "lambda:GetFunction",
    "codecommit:GitPull",
    # Defence-evasion: disabling audit/logging (treated as critical downstream)
    "cloudtrail:StopLogging",
    "cloudtrail:DeleteTrail",
    "guardduty:DeleteDetector",
    "logs:DeleteLogGroup",
    "logs:DeleteLogStream",
]
# ---------------------------------------------------------------------------
# Data Classes
# ---------------------------------------------------------------------------
@dataclass
class IAMFinding:
    """Represents a single IAM or cloud posture finding."""
    # Unique ID within one run, e.g. "S3-001" or "<PREFIX>-S01-PRIVESC-001".
    finding_id: str
    category: str  # privilege-escalation | data-exfil | public-exposure | s3 | sg
    severity: str  # critical | high | medium | low | informational
    title: str
    description: str
    # Actions from the analysed statement that triggered this finding.
    affected_actions: List[str] = field(default_factory=list)
    # Display string for the affected resource(s); "*" when unscoped.
    affected_resource: str = "*"
    # Remediation guidance shown in reports.
    recommendation: str = ""
    # MITRE ATT&CK technique ID (e.g. "T1098"); empty when not mapped.
    mitre_technique: str = ""
@dataclass
class IAMAnalysisResult:
    """Aggregated result of an IAM / posture analysis run."""
    source: str             # input file path / display name
    check_mode: str         # which check produced this result (or "all")
    provider: str           # aws | azure | gcp
    severity_modifier: str  # internet-facing | regulated-data | none
    findings: List[IAMFinding] = field(default_factory=list)
    summary: Dict[str, Any] = field(default_factory=dict)
    timestamp_utc: str = ""

    def __post_init__(self) -> None:
        # Stamp the run time once unless the caller supplied a timestamp.
        if not self.timestamp_utc:
            self.timestamp_utc = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    @property
    def critical_count(self) -> int:
        """Number of findings with severity 'critical' (drives exit code 2)."""
        return sum(1 for f in self.findings if f.severity == "critical")

    @property
    def high_count(self) -> int:
        """Number of findings with severity 'high' (drives exit code 1)."""
        return sum(1 for f in self.findings if f.severity == "high")

    @property
    def medium_count(self) -> int:
        """Number of findings with severity 'medium'."""
        return sum(1 for f in self.findings if f.severity == "medium")

    @property
    def low_count(self) -> int:
        """Number of findings with severity 'low'."""
        return sum(1 for f in self.findings if f.severity == "low")
# ---------------------------------------------------------------------------
# Severity Bump Utility
# ---------------------------------------------------------------------------
_SEV_LADDER = ["informational", "low", "medium", "high", "critical"]
def _bump_severity(severity: str, modifier: str) -> str:
"""
Bump severity up one band when modifier is internet-facing or regulated-data.
low -> medium -> high -> critical (caps at critical).
"""
if modifier not in ("internet-facing", "regulated-data"):
return severity
try:
idx = _SEV_LADDER.index(severity.lower())
return _SEV_LADDER[min(idx + 1, len(_SEV_LADDER) - 1)]
except ValueError:
return severity
# ---------------------------------------------------------------------------
# Core IAM Analysis Functions (from analyze_iam_policy.py base)
# ---------------------------------------------------------------------------
def _extract_actions(statement: dict) -> List[str]:
"""Normalise Action field to a list of lowercase strings."""
action_field = statement.get("Action") or statement.get("action") or []
if isinstance(action_field, str):
return [action_field.lower()]
return [str(a).lower() for a in action_field]
def _extract_resources(statement: dict) -> List[str]:
"""Normalise Resource field to a list of strings."""
resource_field = (
statement.get("Resource")
or statement.get("resource")
or ["*"]
)
if isinstance(resource_field, str):
return [resource_field]
return [str(r) for r in resource_field]
def _extract_principal(statement: dict) -> str:
"""Return a string representation of the Principal."""
principal = statement.get("Principal") or statement.get("principal") or "N/A"
if isinstance(principal, dict):
parts = []
for k, v in principal.items():
if isinstance(v, list):
parts.append(f"{k}:{','.join(v)}")
else:
parts.append(f"{k}:{v}")
return " | ".join(parts)
return str(principal)
def _is_allow(statement: dict) -> bool:
effect = str(statement.get("Effect") or statement.get("effect") or "Allow")
return effect.strip().lower() == "allow"
def analyze_statement(
    statement: dict,
    check_mode: str,
    finding_prefix: str,
    severity_modifier: str,
) -> List[IAMFinding]:
    """
    Analyse a single IAM policy statement for risks.

    Args:
        statement: Parsed IAM statement dict.
        check_mode: One of privilege-escalation | data-exfil | public-exposure.
        finding_prefix: Short string used to prefix finding IDs.
        severity_modifier: internet-facing | regulated-data | none.

    Returns:
        List of IAMFinding objects (may be empty).
    """
    findings: List[IAMFinding] = []
    # Deny statements never grant privileges, so they produce no findings.
    if not _is_allow(statement):
        return findings
    actions = _extract_actions(statement)  # lowercased by the extractor
    resources = _extract_resources(statement)
    principal = _extract_principal(statement)
    resource_str = ", ".join(resources[:3]) + ("..." if len(resources) > 3 else "")
    wildcard_resource = any(r in ("*", "arn:aws:*") for r in resources)
    wildcard_action = any(a in ("*", "iam:*", "s3:*", "ec2:*") for a in actions)
    if check_mode == "privilege-escalation":
        # Hoisted lowercase set: the original rebuilt the lowered list once per
        # action (O(n*m)); a set built once gives O(1) membership tests.
        privesc_lower = {p.lower() for p in PRIVILEGE_ESCALATION_ACTIONS}
        matched_privesc = [a for a in actions if a in privesc_lower]
        if matched_privesc:
            # A wildcard resource makes the escalation path account-wide.
            severity = "high" if not wildcard_resource else "critical"
            severity = _bump_severity(severity, severity_modifier)
            findings.append(IAMFinding(
                finding_id=f"{finding_prefix}-PRIVESC-{len(findings) + 1:03d}",
                category="privilege-escalation",
                severity=severity,
                title="Privilege Escalation Actions Detected",
                description=(
                    f"Statement grants {len(matched_privesc)} privilege escalation "
                    f"action(s) to principal '{principal}' on resources: {resource_str}."
                ),
                affected_actions=matched_privesc,
                affected_resource=resource_str,
                recommendation=(
                    "Apply least-privilege: restrict IAM mutation actions to specific "
                    "resource ARNs and add Condition constraints. Consider permission boundaries."
                ),
                mitre_technique="T1098",
            ))
        # Check dangerous combos (e.g. PassRole + a compute service).
        for combo in ESCALATION_COMBOS:
            combo_actions_lower = [c.lower() for c in combo["actions"]]
            if all(ca in actions for ca in combo_actions_lower):
                combo_sev = _bump_severity(combo["severity"], severity_modifier)
                findings.append(IAMFinding(
                    finding_id=f"{finding_prefix}-COMBO-{len(findings) + 1:03d}",
                    category="privilege-escalation",
                    severity=combo_sev,
                    title=f"Escalation Combo: {combo['name']}",
                    description=combo["description"],
                    affected_actions=combo["actions"],
                    affected_resource=resource_str,
                    recommendation=(
                        f"Remove or scope one of the combo actions. "
                        f"Separate {combo['name']} permissions across different roles."
                    ),
                    mitre_technique="T1548",
                ))
        # Allow + wildcard action is effectively unrestricted access.
        if wildcard_action:
            sev = _bump_severity("critical", severity_modifier)
            wild_actions = [a for a in actions if "*" in a]
            findings.append(IAMFinding(
                finding_id=f"{finding_prefix}-WILD-{len(findings) + 1:03d}",
                category="privilege-escalation",
                severity=sev,
                title="Wildcard Action Grant",
                description=(
                    f"Statement uses wildcard action(s) {wild_actions} "
                    f"for principal '{principal}'. This grants unrestricted access."
                ),
                affected_actions=wild_actions,
                affected_resource=resource_str,
                recommendation="Replace wildcard actions with an explicit allowlist of required actions.",
                mitre_technique="T1078.004",
            ))
    elif check_mode == "data-exfil":
        # Same hoisting as above for the exfiltration action list.
        exfil_lower = {d.lower() for d in DATA_EXFILTRATION_ACTIONS}
        matched_exfil = [a for a in actions if a in exfil_lower]
        if matched_exfil:
            severity = "medium"
            if wildcard_resource:
                severity = "high"
            # Particularly dangerous: log deletion or trail stopping
            disruptive = [
                a for a in matched_exfil
                if any(x in a for x in ["stoplog", "deletelog", "deletetrail", "deletedetector"])
            ]
            if disruptive:
                severity = "critical"
            severity = _bump_severity(severity, severity_modifier)
            findings.append(IAMFinding(
                finding_id=f"{finding_prefix}-EXFIL-{len(findings) + 1:03d}",
                category="data-exfil",
                severity=severity,
                title="Data Exfiltration Risk Actions",
                description=(
                    f"Statement grants {len(matched_exfil)} potential exfiltration "
                    f"action(s) to principal '{principal}': {', '.join(matched_exfil[:5])}."
                ),
                affected_actions=matched_exfil,
                affected_resource=resource_str,
                recommendation=(
                    "Scope data-read actions to specific resource ARNs. "
                    "Add VPC endpoint conditions and restrict cross-account access. "
                    "Enable GuardDuty and CloudTrail for all regions."
                ),
                mitre_technique="T1530",
            ))
    elif check_mode == "public-exposure":
        # Reuse the principal extracted above (the original re-extracted it).
        is_public = any(p in principal for p in ["*", "AWS:*", '"*"'])
        if is_public:
            sev = _bump_severity("high", severity_modifier)
            if wildcard_action:
                sev = _bump_severity("critical", severity_modifier)
            findings.append(IAMFinding(
                finding_id=f"{finding_prefix}-PUB-{len(findings) + 1:03d}",
                category="public-exposure",
                severity=sev,
                title="Public Principal Detected",
                description=(
                    f"Statement uses Principal '*' allowing any AWS account or "
                    f"unauthenticated entity to perform: {', '.join(actions[:5])}."
                ),
                affected_actions=actions[:10],
                affected_resource=resource_str,
                recommendation=(
                    "Replace Principal '*' with specific account ARNs, "
                    "organisation units, or role ARNs. Use Condition keys "
                    "like aws:PrincipalOrgID to limit to your AWS Org."
                ),
                mitre_technique="T1190",
            ))
    return findings
def analyze_policy(
    policy: dict,
    check_mode: str,
    source: str,
    severity_modifier: str,
    provider: str = "aws",
) -> IAMAnalysisResult:
    """
    Analyse a full IAM policy document for findings.

    Walks every Statement in *policy*, delegating per-statement logic to
    analyze_statement(), then fills in the summary counters.

    Args:
        policy: Parsed IAM policy JSON dict.
        check_mode: privilege-escalation | data-exfil | public-exposure.
        source: Display name / file path for the policy.
        severity_modifier: internet-facing | regulated-data | none.
        provider: aws | azure | gcp (currently only aws fully supported).

    Returns:
        IAMAnalysisResult with all findings populated.
    """
    result = IAMAnalysisResult(
        source=source,
        check_mode=check_mode,
        provider=provider,
        severity_modifier=severity_modifier,
    )
    raw_statements = policy.get("Statement") or policy.get("statement") or []
    statements = raw_statements if isinstance(raw_statements, list) else [raw_statements]
    # Finding IDs begin with a sanitised, truncated version of the source name.
    id_prefix = source.replace(" ", "_").replace("/", "_")[:12].upper()
    for position, stmt in enumerate(statements, start=1):
        result.findings.extend(analyze_statement(
            statement=stmt,
            check_mode=check_mode,
            finding_prefix=f"{id_prefix}-S{position:02d}",
            severity_modifier=severity_modifier,
        ))
    result.summary = {
        "total_statements": len(statements),
        "total_findings": len(result.findings),
        "critical": result.critical_count,
        "high": result.high_count,
        "medium": result.medium_count,
        "low": result.low_count,
        "check_mode": check_mode,
        "provider": provider,
        "severity_modifier": severity_modifier,
    }
    return result
# ---------------------------------------------------------------------------
# S3 Posture Check (new)
# ---------------------------------------------------------------------------
def check_s3_policy(
    policy: dict,
    source: str,
    severity_modifier: str,
) -> IAMAnalysisResult:
    """
    Check S3 bucket policy or Terraform aws_s3_bucket block for misconfigurations.

    Checks performed:
    1. Principal "*" in bucket policy -> Critical
    2. Block Public Access settings missing or false -> Critical
    3. server_side_encryption absent or not AES256/aws:kms -> High
    4. versioning disabled -> Medium
    5. access logging disabled -> High

    Args:
        policy: Parsed S3 policy / Terraform block dict.
        source: Display name / file path.
        severity_modifier: internet-facing | regulated-data | none.

    Returns:
        IAMAnalysisResult populated with S3 findings.
    """
    result = IAMAnalysisResult(
        source=source,
        check_mode="s3",
        provider="aws",
        severity_modifier=severity_modifier,
    )
    findings: List[IAMFinding] = []
    fid = 0

    def _next_id() -> str:
        # Sequential S3-NNN finding IDs.
        nonlocal fid
        fid += 1
        return f"S3-{fid:03d}"

    # --- Check 1: Public principal in bucket policy ---
    statements = policy.get("Statement") or policy.get("statement") or []
    if isinstance(statements, list):
        for stmt in statements:
            if not isinstance(stmt, dict):
                continue  # tolerate malformed entries instead of crashing
            if not _is_allow(stmt):
                continue
            principal = _extract_principal(stmt)
            if "*" in principal or '"*"' in principal:
                severity = _bump_severity("critical", severity_modifier)
                findings.append(IAMFinding(
                    finding_id=_next_id(),
                    category="s3",
                    severity=severity,
                    title="S3 Bucket Policy: Public Principal",
                    description=(
                        "Bucket policy contains Principal '*' which grants public "
                        "access to any AWS account or unauthenticated user."
                    ),
                    affected_actions=_extract_actions(stmt),
                    affected_resource=source,
                    recommendation=(
                        "Remove Principal '*'. Restrict to specific account ARNs or "
                        "use aws:PrincipalOrgID condition to limit to your AWS Org."
                    ),
                    mitre_technique="T1530",
                ))
    # --- Check 2: Block Public Access settings missing or false ---
    # Terraform resource format: aws_s3_bucket_public_access_block.
    # All four settings are checked (the recommendation names all four).
    nested_pab = policy.get("public_access_block")
    nested_block_acls = (
        nested_pab.get("block_public_acls") if isinstance(nested_pab, dict) else None
    )
    block_fields = {
        "block_public_acls": (
            policy.get("block_public_acls")
            or policy.get("BlockPublicAcls")
            or nested_block_acls
        ),
        "ignore_public_acls": (
            policy.get("ignore_public_acls")
            or policy.get("IgnorePublicAcls")
        ),
        "restrict_public_buckets": (
            policy.get("restrict_public_buckets")
            or policy.get("RestrictPublicBuckets")
        ),
        "block_public_policy": (
            policy.get("block_public_policy")
            or policy.get("BlockPublicPolicy")
        ),
    }
    # If any of these are explicitly False or absent, flag it
    missing_blocks = [k for k, v in block_fields.items() if v is None or v is False]
    if missing_blocks:
        severity = _bump_severity("critical", severity_modifier)
        findings.append(IAMFinding(
            finding_id=_next_id(),
            category="s3",
            severity=severity,
            title="S3 Public Access Block Not Fully Enabled",
            description=(
                f"Public access block settings are missing or disabled: "
                f"{', '.join(missing_blocks)}. This may allow public ACL or policy access."
            ),
            affected_resource=source,
            recommendation=(
                "Enable all four S3 Block Public Access settings: "
                "BlockPublicAcls, BlockPublicPolicy, IgnorePublicAcls, RestrictPublicBuckets."
            ),
            mitre_technique="T1530",
        ))
    # --- Check 3: Server-side encryption ---
    sse_config = (
        policy.get("server_side_encryption_configuration")
        or policy.get("ServerSideEncryptionConfiguration")
        or policy.get("encryption")
        or policy.get("sse_algorithm")
    )
    has_sse = False
    if isinstance(sse_config, dict):
        rules = sse_config.get("Rule") or sse_config.get("rules") or []
        if not isinstance(rules, list):
            rules = [rules]
        for rule in rules:
            if not isinstance(rule, dict):
                continue  # skip malformed rule entries
            apply_sse = (
                rule.get("ApplyServerSideEncryptionByDefault")
                or rule.get("apply_server_side_encryption_by_default")
                or {}
            )
            algo = str(apply_sse.get("SSEAlgorithm") or apply_sse.get("sse_algorithm") or "")
            if algo.upper() in ("AES256", "AWS:KMS"):
                has_sse = True
    elif isinstance(sse_config, str):
        has_sse = sse_config.upper() in ("AES256", "AWS:KMS")
    if not has_sse:
        severity = _bump_severity("high", severity_modifier)
        findings.append(IAMFinding(
            finding_id=_next_id(),
            category="s3",
            severity=severity,
            title="S3 Server-Side Encryption Not Configured",
            description=(
                "No server-side encryption (SSE-S3 or SSE-KMS) found on this bucket. "
                "Data is stored unencrypted at rest."
            ),
            affected_resource=source,
            recommendation=(
                "Enable SSE via a bucket encryption configuration. "
                "Use aws:kms with a CMK for regulated workloads. "
                "Consider enforcing encryption via bucket policy (aws:SecureTransport)."
            ),
            mitre_technique="T1022",
        ))
    # --- Check 4: Versioning disabled ---
    versioning = (
        policy.get("versioning")
        or policy.get("VersioningConfiguration")
    )
    versioning_enabled = False
    if isinstance(versioning, dict):
        status = str(
            versioning.get("Status")
            or versioning.get("status")
            or versioning.get("enabled")
            or ""
        )
        versioning_enabled = status.lower() in ("enabled", "true")
    elif isinstance(versioning, bool):
        versioning_enabled = versioning
    if not versioning_enabled:
        severity = _bump_severity("medium", severity_modifier)
        findings.append(IAMFinding(
            finding_id=_next_id(),
            category="s3",
            severity=severity,
            title="S3 Bucket Versioning Disabled",
            description=(
                "Versioning is not enabled on this bucket. "
                "Accidental or malicious object deletion/overwrite cannot be recovered."
            ),
            affected_resource=source,
            recommendation=(
                "Enable bucket versioning. "
                "Combine with Object Lock and lifecycle policies for regulated workloads."
            ),
            mitre_technique="T1485",
        ))
    # --- Check 5: Access logging disabled (was documented but unimplemented) ---
    logging_cfg = (
        policy.get("logging")
        or policy.get("LoggingEnabled")
        or policy.get("BucketLoggingStatus")
    )
    if not logging_cfg:
        severity = _bump_severity("high", severity_modifier)
        findings.append(IAMFinding(
            finding_id=_next_id(),
            category="s3",
            severity=severity,
            title="S3 Access Logging Disabled",
            description=(
                "No server access logging configuration found for this bucket. "
                "Object-level access cannot be audited or investigated."
            ),
            affected_resource=source,
            recommendation=(
                "Enable S3 server access logging to a dedicated log bucket, "
                "or use CloudTrail data events for object-level auditing."
            ),
            mitre_technique="T1562.008",
        ))
    result.findings = findings
    result.summary = {
        "total_findings": len(findings),
        "critical": sum(1 for f in findings if f.severity == "critical"),
        "high": sum(1 for f in findings if f.severity == "high"),
        "medium": sum(1 for f in findings if f.severity == "medium"),
        "low": sum(1 for f in findings if f.severity == "low"),
        "check_mode": "s3",
        "provider": "aws",
        "severity_modifier": severity_modifier,
    }
    return result
# ---------------------------------------------------------------------------
# Security Group Check (new)
# ---------------------------------------------------------------------------
def check_security_group(
    sg_json: dict,
    source: str,
    severity_modifier: str,
) -> IAMAnalysisResult:
    """
    Check AWS Security Group JSON for dangerous inbound rules.

    Flags rules open to the world (0.0.0.0/0 or ::/0): all-traffic rules
    (protocol -1 or "all"), well-known admin/database ports (critical), and
    any other internet-exposed port (high).

    Args:
        sg_json: Parsed Security Group JSON (AWS DescribeSecurityGroups
            output format or Terraform aws_security_group block).
        source: Display name / file path.
        severity_modifier: internet-facing | regulated-data | none.

    Returns:
        IAMAnalysisResult populated with SG findings.
    """
    RISKY_PORTS: Dict[int, str] = {
        22: "SSH",
        3389: "RDP",
        23: "Telnet",
        21: "FTP",
        3306: "MySQL",
        5432: "PostgreSQL",
        1433: "MSSQL",
        27017: "MongoDB",
        6379: "Redis",
    }
    result = IAMAnalysisResult(
        source=source,
        check_mode="sg",
        provider="aws",
        severity_modifier=severity_modifier,
    )
    findings: List[IAMFinding] = []
    fid = 0

    def _next_id() -> str:
        # Sequential SG-NNN finding IDs.
        nonlocal fid
        fid += 1
        return f"SG-{fid:03d}"

    # Support both AWS API format (IpPermissions) and Terraform ingress blocks
    ip_permissions = sg_json.get("IpPermissions") or []
    terraform_ingress = sg_json.get("ingress") or []
    # Normalise Terraform ingress blocks to AWS API format; skip malformed
    # (non-dict) entries from either source rather than crash.
    normalised: List[dict] = [r for r in ip_permissions if isinstance(r, dict)]
    for ing in terraform_ingress:
        if not isinstance(ing, dict):
            continue
        cidr_blocks = ing.get("cidr_blocks") or []
        ipv6_cidr_blocks = ing.get("ipv6_cidr_blocks") or []
        ip_ranges = [{"CidrIp": c} for c in cidr_blocks]
        ipv6_ranges = [{"CidrIpv6": c} for c in ipv6_cidr_blocks]
        normalised.append({
            "IpProtocol": str(ing.get("protocol", "tcp")),
            "FromPort": ing.get("from_port", 0),
            "ToPort": ing.get("to_port", 65535),
            "IpRanges": ip_ranges,
            "Ipv6Ranges": ipv6_ranges,
        })
    for rule in normalised:
        # Missing or null ports default to the full range (AWS omits ports on
        # all-traffic rules; Terraform may carry explicit nulls).
        from_port = rule.get("FromPort", 0)
        if from_port is None:
            from_port = 0
        to_port = rule.get("ToPort", 65535)
        if to_port is None:
            to_port = 65535
        protocol = str(rule.get("IpProtocol", "tcp"))
        # Collect CIDRs from both IPv4 and IPv6 ranges
        all_ranges: List[Tuple[str, str]] = []
        for ip_range in rule.get("IpRanges", []):
            cidr = ip_range.get("CidrIp", "")
            if cidr:
                all_ranges.append((cidr, "ipv4"))
        for ip_range in rule.get("Ipv6Ranges", []):
            cidr = ip_range.get("CidrIpv6", "")
            if cidr:
                all_ranges.append((cidr, "ipv6"))
        for cidr, ip_ver in all_ranges:
            if cidr not in ("0.0.0.0/0", "::/0"):
                continue  # Not open to the world
            # AWS API uses "-1" for all traffic; Terraform also accepts "all".
            if protocol.lower() in ("-1", "all"):
                # All traffic open to the internet
                severity = _bump_severity("critical", severity_modifier)
                findings.append(IAMFinding(
                    finding_id=_next_id(),
                    category="sg",
                    severity=severity,
                    title="Security Group: All Traffic Open to Internet",
                    description=(
                        f"Inbound rule allows ALL traffic (protocol -1) "
                        f"from {cidr} ({ip_ver}). This exposes every port on every instance "
                        "in this security group to the public internet."
                    ),
                    affected_resource=source,
                    recommendation=(
                        "Remove the all-traffic rule. Define explicit port/protocol "
                        "allowlist rules for only the services that must be internet-accessible."
                    ),
                    mitre_technique="T1190",
                ))
                continue
            # Check port range against RISKY_PORTS
            matched_ports = [
                p for p in RISKY_PORTS
                if from_port <= p <= to_port
            ]
            if matched_ports:
                for port in matched_ports:
                    service = RISKY_PORTS[port]
                    severity = _bump_severity("critical", severity_modifier)
                    findings.append(IAMFinding(
                        finding_id=_next_id(),
                        category="sg",
                        severity=severity,
                        title=f"Security Group: {service} ({port}) Open to Internet",
                        description=(
                            f"Inbound rule allows {service} (port {port}/{protocol}) "
                            f"from {cidr} ({ip_ver}). Direct internet access to {service} "
                            "exposes this service to brute-force, exploitation, and scanning."
                        ),
                        affected_resource=source,
                        recommendation=(
                            f"Restrict port {port} to specific trusted CIDRs or a VPN/bastion. "
                            f"For {service}, consider using AWS Systems Manager Session Manager "
                            "as a zero-trust alternative that requires no open inbound ports."
                        ),
                        mitre_technique="T1133",
                    ))
            else:
                # Open to the internet on a non-standard port
                severity = _bump_severity("high", severity_modifier)
                port_label = (
                    f"port {from_port}"
                    if from_port == to_port
                    else f"ports {from_port}-{to_port}"
                )
                findings.append(IAMFinding(
                    finding_id=_next_id(),
                    category="sg",
                    severity=severity,
                    title=f"Security Group: {port_label.title()} Open to Internet",
                    description=(
                        f"Inbound rule opens {port_label} ({protocol}) to {cidr} ({ip_ver}). "
                        "Broad internet exposure increases attack surface even on non-standard ports."
                    ),
                    affected_resource=source,
                    recommendation=(
                        f"Restrict {port_label} to the specific IP ranges that require access. "
                        "Use Security Group references instead of CIDRs where possible."
                    ),
                    mitre_technique="T1046",
                ))
    result.findings = findings
    result.summary = {
        "total_findings": len(findings),
        "critical": sum(1 for f in findings if f.severity == "critical"),
        "high": sum(1 for f in findings if f.severity == "high"),
        "medium": sum(1 for f in findings if f.severity == "medium"),
        "low": sum(1 for f in findings if f.severity == "low"),
        "check_mode": "sg",
        "provider": "aws",
        "severity_modifier": severity_modifier,
    }
    return result
# ---------------------------------------------------------------------------
# Text Report
# ---------------------------------------------------------------------------
def print_text_report(result: IAMAnalysisResult) -> None:
    """Render *result* as a human-readable report on stdout."""
    sep = "=" * 70
    print(sep)
    print(" Cloud Posture Check")
    print(sep)
    # Header rows, rendered uniformly.
    for label, value in (
        ("Source", result.source),
        ("Check Mode", result.check_mode),
        ("Provider", result.provider.upper()),
        ("Severity Mod", result.severity_modifier),
        ("Timestamp", result.timestamp_utc),
    ):
        print(f" {label} : {value}")
    print(sep)
    summary = result.summary
    print("\n Summary:")
    print(f" Total Findings : {summary.get('total_findings', 0)}")
    # Only non-zero severity buckets are shown.
    for sev_key in ("critical", "high", "medium", "low"):
        if summary.get(sev_key, 0):
            print(f" {sev_key.upper()} : {summary[sev_key]}")
    if not result.findings:
        print("\n No findings detected.")
        print(sep)
        return
    print(f"\n Findings ({len(result.findings)}):")
    for item in result.findings:
        print(f"\n [{item.severity.upper()}] {item.finding_id}: {item.title}")
        print(f" {item.description}")
        if item.affected_actions:
            shown = item.affected_actions[:4]
            hidden = len(item.affected_actions) - 4
            extra = f" (+{hidden} more)" if hidden > 0 else ""
            print(f" Actions : {', '.join(shown)}{extra}")
        print(f" Resource : {item.affected_resource}")
        print(f" MITRE : {item.mitre_technique}")
        print(f" Fix : {item.recommendation}")
    print(f"\n{sep}")
# ---------------------------------------------------------------------------
# Result Serialisation
# ---------------------------------------------------------------------------
def result_to_dict(result: IAMAnalysisResult) -> dict:
    """Convert IAMAnalysisResult to a JSON-serialisable dict."""
    # Attributes copied verbatim, in output order; findings need asdict().
    plain_attrs = (
        "source",
        "check_mode",
        "provider",
        "severity_modifier",
        "timestamp_utc",
        "summary",
    )
    payload = {attr: getattr(result, attr) for attr in plain_attrs}
    payload["findings"] = [asdict(finding) for finding in result.findings]
    return payload
# ---------------------------------------------------------------------------
# Main Entry Point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: parse arguments, run the selected check(s), emit a
    text or JSON report, and exit 2/1/0 for critical/high/no findings."""
    parser = argparse.ArgumentParser(
        description="Cloud Security Posture Check — IAM, S3, and Security Group analysis",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s policy.json
%(prog)s policy.json --check privilege-escalation --json
%(prog)s policy.json --check data-exfil --severity-modifier regulated-data --json
%(prog)s policy.json --check public-exposure --json
%(prog)s bucket.json --check s3 --severity-modifier internet-facing --json
%(prog)s sg.json --check sg --provider aws --json
%(prog)s policy.json --check all --json
Exit codes:
0 No findings or informational only
1 High-severity findings present
2 Critical findings present
""",
    )
    parser.add_argument(
        "input_file",
        help="Path to JSON file (IAM policy, S3 config, or Security Group JSON)",
    )
    parser.add_argument(
        "--check",
        choices=["privilege-escalation", "data-exfil", "public-exposure", "s3", "sg", "all"],
        default="privilege-escalation",
        help="Check mode to run (default: privilege-escalation)",
    )
    parser.add_argument(
        "--provider",
        choices=["aws", "azure", "gcp"],
        default="aws",
        help="Cloud provider (default: aws; Azure/GCP: only IAM checks available)",
    )
    parser.add_argument(
        "--severity-modifier",
        choices=["internet-facing", "regulated-data", "none"],
        default="none",
        dest="severity_modifier",
        help="Bump all finding severities +1 band (default: none)",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON",
    )
    parser.add_argument(
        "--output", "-o",
        metavar="FILE",
        help="Write JSON output to file",
    )
    args = parser.parse_args()
    # --- Load input file ---
    # Errors are reported on stdout as JSON when --json is set (so pipelines
    # always receive JSON), otherwise on stderr; both paths exit 1.
    try:
        with open(args.input_file, "r", encoding="utf-8") as fh:
            policy_data = json.load(fh)
    except FileNotFoundError:
        err = {"error": f"File not found: {args.input_file}"}
        if args.json:
            print(json.dumps(err, indent=2))
        else:
            print(f"Error: {err['error']}", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as exc:
        err = {"error": f"Invalid JSON: {exc}"}
        if args.json:
            print(json.dumps(err, indent=2))
        else:
            print(f"Error: {err['error']}", file=sys.stderr)
        sys.exit(1)
    source = args.input_file
    severity_modifier = args.severity_modifier
    provider = args.provider
    # --- Gate S3 / SG checks by provider ---
    # S3/SG checks are AWS-specific; other providers exit 0 with a notice.
    check_mode = args.check
    if check_mode in ("s3", "sg") and provider != "aws":
        msg = (
            f"Azure/GCP checks coming soon — "
            f"use --provider aws for S3/SG analysis"
        )
        if args.json:
            print(json.dumps({"message": msg, "provider": provider, "check_mode": check_mode}, indent=2))
        else:
            print(msg)
        sys.exit(0)
    # --- Run checks ---
    all_results: List[IAMAnalysisResult] = []
    iam_check_modes = ["privilege-escalation", "data-exfil", "public-exposure"]
    if check_mode == "all":
        if provider == "aws":
            # Run all IAM checks
            for mode in iam_check_modes:
                r = analyze_policy(
                    policy=policy_data,
                    check_mode=mode,
                    source=source,
                    severity_modifier=severity_modifier,
                    provider=provider,
                )
                all_results.append(r)
            # Run S3
            s3_r = check_s3_policy(
                policy=policy_data,
                source=source,
                severity_modifier=severity_modifier,
            )
            all_results.append(s3_r)
            # Run SG
            sg_r = check_security_group(
                sg_json=policy_data,
                source=source,
                severity_modifier=severity_modifier,
            )
            all_results.append(sg_r)
        else:
            # Non-AWS providers: IAM-style checks only.
            for mode in iam_check_modes:
                r = analyze_policy(
                    policy=policy_data,
                    check_mode=mode,
                    source=source,
                    severity_modifier=severity_modifier,
                    provider=provider,
                )
                all_results.append(r)
    elif check_mode in iam_check_modes:
        r = analyze_policy(
            policy=policy_data,
            check_mode=check_mode,
            source=source,
            severity_modifier=severity_modifier,
            provider=provider,
        )
        all_results.append(r)
    elif check_mode == "s3":
        r = check_s3_policy(
            policy=policy_data,
            source=source,
            severity_modifier=severity_modifier,
        )
        all_results.append(r)
    elif check_mode == "sg":
        r = check_security_group(
            sg_json=policy_data,
            source=source,
            severity_modifier=severity_modifier,
        )
        all_results.append(r)
    # --- Flatten findings for output when multiple checks run ---
    if len(all_results) == 1:
        combined_result = all_results[0]
    else:
        # Merge into a single result
        all_findings: List[IAMFinding] = []
        for res in all_results:
            all_findings.extend(res.findings)
        combined_result = IAMAnalysisResult(
            source=source,
            check_mode=check_mode,
            provider=provider,
            severity_modifier=severity_modifier,
        )
        combined_result.findings = all_findings
        combined_result.summary = {
            "total_findings": len(all_findings),
            "critical": sum(1 for f in all_findings if f.severity == "critical"),
            "high": sum(1 for f in all_findings if f.severity == "high"),
            "medium": sum(1 for f in all_findings if f.severity == "medium"),
            "low": sum(1 for f in all_findings if f.severity == "low"),
            "check_mode": check_mode,
            "provider": provider,
            "severity_modifier": severity_modifier,
            "checks_run": [r.check_mode for r in all_results],
        }
    # --- Output ---
    # --output writes JSON to file; --json prints JSON to stdout; with
    # neither flag a formatted text report is printed instead.
    if args.json or args.output:
        output_dict = result_to_dict(combined_result)
        json_str = json.dumps(output_dict, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as fh:
                fh.write(json_str)
            if not args.json:
                print(f"Results written to {args.output}")
        if args.json:
            print(json_str)
    else:
        print_text_report(combined_result)
    # --- Exit code ---
    # Critical beats high; medium/low/informational exit 0.
    if combined_result.critical_count > 0:
        sys.exit(2)
    if combined_result.high_count > 0:
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()