Adds threat-detection, incident-response, cloud-security, red-team, and ai-security skills to engineering-team. Each includes SKILL.md, references, and Python scripts (stdlib-only). Consolidation of 66 individual skills into 5 production-ready packages.
1181 lines
42 KiB
Python
1181 lines
42 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
cloud_posture_check.py — Cloud Security Posture Check
|
|
|
|
Analyses IAM policies and cloud resource configurations for privilege
|
|
escalation paths, data exfiltration risks, public exposure, S3 bucket
|
|
misconfigurations, and Security Group dangerous inbound rules.
|
|
|
|
Supports AWS (full), with Azure/GCP stubs for future expansion.
|
|
|
|
Usage:
|
|
python3 cloud_posture_check.py policy.json
|
|
python3 cloud_posture_check.py policy.json --check privilege-escalation --json
|
|
python3 cloud_posture_check.py sg.json --check sg --provider aws --json
|
|
python3 cloud_posture_check.py bucket.json --check s3 --severity-modifier internet-facing
|
|
|
|
Exit codes:
|
|
0 No findings or informational only
|
|
1 High-severity findings present
|
|
2 Critical findings present
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# IAM Analysis Constants (from analyze_iam_policy.py base)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
PRIVILEGE_ESCALATION_ACTIONS: List[str] = [
|
|
"iam:CreatePolicyVersion",
|
|
"iam:SetDefaultPolicyVersion",
|
|
"iam:PassRole",
|
|
"iam:CreateAccessKey",
|
|
"iam:CreateLoginProfile",
|
|
"iam:UpdateLoginProfile",
|
|
"iam:AttachUserPolicy",
|
|
"iam:AttachGroupPolicy",
|
|
"iam:AttachRolePolicy",
|
|
"iam:PutUserPolicy",
|
|
"iam:PutGroupPolicy",
|
|
"iam:PutRolePolicy",
|
|
"iam:AddUserToGroup",
|
|
"iam:UpdateAssumeRolePolicy",
|
|
"sts:AssumeRole",
|
|
"iam:CreateRole",
|
|
"iam:DeletePolicyVersion",
|
|
"iam:CreateUser",
|
|
"iam:UpdateAccessKey",
|
|
"iam:DeactivateMFADevice",
|
|
"iam:DeleteVirtualMFADevice",
|
|
"iam:ResyncMFADevice",
|
|
"iam:EnableMFADevice",
|
|
"iam:DeleteUserPermissionsBoundary",
|
|
"iam:DeleteRolePermissionsBoundary",
|
|
"lambda:CreateFunction",
|
|
"lambda:InvokeFunction",
|
|
"lambda:UpdateFunctionCode",
|
|
"lambda:AddPermission",
|
|
"ec2:RunInstances",
|
|
"ec2:AssociateIamInstanceProfile",
|
|
"ec2:ReplaceIamInstanceProfileAssociation",
|
|
"cloudformation:CreateStack",
|
|
"cloudformation:UpdateStack",
|
|
"datapipeline:CreatePipeline",
|
|
"datapipeline:PutPipelineDefinition",
|
|
"glue:CreateDevEndpoint",
|
|
"glue:UpdateDevEndpoint",
|
|
"codestar:CreateProject",
|
|
"codecommit:CreateRepository",
|
|
"ssm:SendCommand",
|
|
"ssm:StartSession",
|
|
]
|
|
|
|
ESCALATION_COMBOS: List[Dict[str, Any]] = [
|
|
{
|
|
"name": "PassRole + Lambda Invoke",
|
|
"actions": ["iam:PassRole", "lambda:InvokeFunction"],
|
|
"description": "Attacker can pass a privileged role to a Lambda function and invoke it",
|
|
"severity": "critical",
|
|
},
|
|
{
|
|
"name": "PassRole + EC2 RunInstances",
|
|
"actions": ["iam:PassRole", "ec2:RunInstances"],
|
|
"description": "Attacker can launch an EC2 instance with a privileged IAM role",
|
|
"severity": "critical",
|
|
},
|
|
{
|
|
"name": "CreatePolicyVersion + SetDefaultPolicyVersion",
|
|
"actions": ["iam:CreatePolicyVersion", "iam:SetDefaultPolicyVersion"],
|
|
"description": "Attacker can create and activate a new policy version granting full access",
|
|
"severity": "critical",
|
|
},
|
|
{
|
|
"name": "AttachUserPolicy + AdministratorAccess",
|
|
"actions": ["iam:AttachUserPolicy"],
|
|
"description": "Can attach any managed policy including AdministratorAccess to users",
|
|
"severity": "high",
|
|
},
|
|
{
|
|
"name": "PutUserPolicy + Wildcard",
|
|
"actions": ["iam:PutUserPolicy"],
|
|
"description": "Can inject inline policies with wildcard permissions",
|
|
"severity": "high",
|
|
},
|
|
{
|
|
"name": "CloudFormation Stack Manipulation",
|
|
"actions": ["cloudformation:CreateStack", "iam:PassRole"],
|
|
"description": "Attacker can deploy a CloudFormation stack with a privileged role",
|
|
"severity": "critical",
|
|
},
|
|
{
|
|
"name": "SSM Session Start",
|
|
"actions": ["ssm:StartSession"],
|
|
"description": "Can start interactive sessions on EC2 instances without SSH",
|
|
"severity": "high",
|
|
},
|
|
{
|
|
"name": "Glue Dev Endpoint",
|
|
"actions": ["glue:CreateDevEndpoint", "iam:PassRole"],
|
|
"description": "Can create a Glue dev endpoint with a privileged role for code execution",
|
|
"severity": "critical",
|
|
},
|
|
]
|
|
|
|
DATA_EXFILTRATION_ACTIONS: List[str] = [
|
|
"s3:GetObject",
|
|
"s3:ListBucket",
|
|
"s3:GetBucketAcl",
|
|
"s3:GetObjectAcl",
|
|
"s3:GetBucketPolicy",
|
|
"s3:PutBucketPolicy",
|
|
"s3:PutBucketAcl",
|
|
"s3:PutObjectAcl",
|
|
"s3:CopyObject",
|
|
"s3:HeadObject",
|
|
"rds:DescribeDBInstances",
|
|
"rds:DownloadDBLogFilePortion",
|
|
"rds:DescribeDBSnapshots",
|
|
"rds:RestoreDBInstanceFromDBSnapshot",
|
|
"dynamodb:Scan",
|
|
"dynamodb:Query",
|
|
"dynamodb:GetItem",
|
|
"dynamodb:BatchGetItem",
|
|
"ec2:DescribeInstances",
|
|
"ec2:DescribeSnapshots",
|
|
"ec2:CreateSnapshot",
|
|
"ec2:ModifySnapshotAttribute",
|
|
"ecr:GetDownloadUrlForLayer",
|
|
"ecr:BatchGetImage",
|
|
"secretsmanager:GetSecretValue",
|
|
"secretsmanager:ListSecrets",
|
|
"ssm:GetParameter",
|
|
"ssm:GetParameters",
|
|
"ssm:GetParametersByPath",
|
|
"kms:Decrypt",
|
|
"kms:GenerateDataKey",
|
|
"lambda:GetFunction",
|
|
"codecommit:GitPull",
|
|
"cloudtrail:StopLogging",
|
|
"cloudtrail:DeleteTrail",
|
|
"guardduty:DeleteDetector",
|
|
"logs:DeleteLogGroup",
|
|
"logs:DeleteLogStream",
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data Classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class IAMFinding:
|
|
"""Represents a single IAM or cloud posture finding."""
|
|
finding_id: str
|
|
category: str # privilege-escalation | data-exfil | public-exposure | s3 | sg
|
|
severity: str # critical | high | medium | low | informational
|
|
title: str
|
|
description: str
|
|
affected_actions: List[str] = field(default_factory=list)
|
|
affected_resource: str = "*"
|
|
recommendation: str = ""
|
|
mitre_technique: str = ""
|
|
|
|
|
|
@dataclass
|
|
class IAMAnalysisResult:
|
|
"""Aggregated result of an IAM / posture analysis run."""
|
|
source: str
|
|
check_mode: str
|
|
provider: str
|
|
severity_modifier: str
|
|
findings: List[IAMFinding] = field(default_factory=list)
|
|
summary: Dict[str, Any] = field(default_factory=dict)
|
|
timestamp_utc: str = ""
|
|
|
|
def __post_init__(self) -> None:
|
|
if not self.timestamp_utc:
|
|
self.timestamp_utc = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
@property
|
|
def critical_count(self) -> int:
|
|
return sum(1 for f in self.findings if f.severity == "critical")
|
|
|
|
@property
|
|
def high_count(self) -> int:
|
|
return sum(1 for f in self.findings if f.severity == "high")
|
|
|
|
@property
|
|
def medium_count(self) -> int:
|
|
return sum(1 for f in self.findings if f.severity == "medium")
|
|
|
|
@property
|
|
def low_count(self) -> int:
|
|
return sum(1 for f in self.findings if f.severity == "low")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Severity Bump Utility
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SEV_LADDER = ["informational", "low", "medium", "high", "critical"]
|
|
|
|
|
|
def _bump_severity(severity: str, modifier: str) -> str:
|
|
"""
|
|
Bump severity up one band when modifier is internet-facing or regulated-data.
|
|
|
|
low -> medium -> high -> critical (caps at critical).
|
|
"""
|
|
if modifier not in ("internet-facing", "regulated-data"):
|
|
return severity
|
|
try:
|
|
idx = _SEV_LADDER.index(severity.lower())
|
|
return _SEV_LADDER[min(idx + 1, len(_SEV_LADDER) - 1)]
|
|
except ValueError:
|
|
return severity
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core IAM Analysis Functions (from analyze_iam_policy.py base)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _extract_actions(statement: dict) -> List[str]:
|
|
"""Normalise Action field to a list of lowercase strings."""
|
|
action_field = statement.get("Action") or statement.get("action") or []
|
|
if isinstance(action_field, str):
|
|
return [action_field.lower()]
|
|
return [str(a).lower() for a in action_field]
|
|
|
|
|
|
def _extract_resources(statement: dict) -> List[str]:
|
|
"""Normalise Resource field to a list of strings."""
|
|
resource_field = (
|
|
statement.get("Resource")
|
|
or statement.get("resource")
|
|
or ["*"]
|
|
)
|
|
if isinstance(resource_field, str):
|
|
return [resource_field]
|
|
return [str(r) for r in resource_field]
|
|
|
|
|
|
def _extract_principal(statement: dict) -> str:
|
|
"""Return a string representation of the Principal."""
|
|
principal = statement.get("Principal") or statement.get("principal") or "N/A"
|
|
if isinstance(principal, dict):
|
|
parts = []
|
|
for k, v in principal.items():
|
|
if isinstance(v, list):
|
|
parts.append(f"{k}:{','.join(v)}")
|
|
else:
|
|
parts.append(f"{k}:{v}")
|
|
return " | ".join(parts)
|
|
return str(principal)
|
|
|
|
|
|
def _is_allow(statement: dict) -> bool:
|
|
effect = str(statement.get("Effect") or statement.get("effect") or "Allow")
|
|
return effect.strip().lower() == "allow"
|
|
|
|
|
|
def analyze_statement(
|
|
statement: dict,
|
|
check_mode: str,
|
|
finding_prefix: str,
|
|
severity_modifier: str,
|
|
) -> List[IAMFinding]:
|
|
"""
|
|
Analyse a single IAM policy statement for risks.
|
|
|
|
Args:
|
|
statement: Parsed IAM statement dict.
|
|
check_mode: One of privilege-escalation | data-exfil | public-exposure.
|
|
finding_prefix: Short string used to prefix finding IDs.
|
|
severity_modifier: internet-facing | regulated-data | none.
|
|
|
|
Returns:
|
|
List of IAMFinding objects (may be empty).
|
|
"""
|
|
findings: List[IAMFinding] = []
|
|
|
|
if not _is_allow(statement):
|
|
return findings
|
|
|
|
actions = _extract_actions(statement)
|
|
resources = _extract_resources(statement)
|
|
principal = _extract_principal(statement)
|
|
resource_str = ", ".join(resources[:3]) + ("..." if len(resources) > 3 else "")
|
|
|
|
wildcard_resource = any(r in ("*", "arn:aws:*") for r in resources)
|
|
wildcard_action = any(a in ("*", "iam:*", "s3:*", "ec2:*") for a in actions)
|
|
|
|
if check_mode == "privilege-escalation":
|
|
# Check individual high-risk actions
|
|
matched_privesc = [
|
|
a for a in actions
|
|
if a in [p.lower() for p in PRIVILEGE_ESCALATION_ACTIONS]
|
|
]
|
|
|
|
if matched_privesc:
|
|
severity = "high" if not wildcard_resource else "critical"
|
|
severity = _bump_severity(severity, severity_modifier)
|
|
findings.append(IAMFinding(
|
|
finding_id=f"{finding_prefix}-PRIVESC-{len(findings) + 1:03d}",
|
|
category="privilege-escalation",
|
|
severity=severity,
|
|
title="Privilege Escalation Actions Detected",
|
|
description=(
|
|
f"Statement grants {len(matched_privesc)} privilege escalation "
|
|
f"action(s) to principal '{principal}' on resources: {resource_str}."
|
|
),
|
|
affected_actions=matched_privesc,
|
|
affected_resource=resource_str,
|
|
recommendation=(
|
|
"Apply least-privilege: restrict IAM mutation actions to specific "
|
|
"resource ARNs and add Condition constraints. Consider permission boundaries."
|
|
),
|
|
mitre_technique="T1098",
|
|
))
|
|
|
|
# Check dangerous combos
|
|
for combo in ESCALATION_COMBOS:
|
|
combo_actions_lower = [c.lower() for c in combo["actions"]]
|
|
if all(ca in actions for ca in combo_actions_lower):
|
|
combo_sev = _bump_severity(combo["severity"], severity_modifier)
|
|
findings.append(IAMFinding(
|
|
finding_id=f"{finding_prefix}-COMBO-{len(findings) + 1:03d}",
|
|
category="privilege-escalation",
|
|
severity=combo_sev,
|
|
title=f"Escalation Combo: {combo['name']}",
|
|
description=combo["description"],
|
|
affected_actions=combo["actions"],
|
|
affected_resource=resource_str,
|
|
recommendation=(
|
|
f"Remove or scope one of the combo actions. "
|
|
f"Separate {combo['name']} permissions across different roles."
|
|
),
|
|
mitre_technique="T1548",
|
|
))
|
|
|
|
# Wildcard action with Allow
|
|
if wildcard_action:
|
|
sev = _bump_severity("critical", severity_modifier)
|
|
findings.append(IAMFinding(
|
|
finding_id=f"{finding_prefix}-WILD-{len(findings) + 1:03d}",
|
|
category="privilege-escalation",
|
|
severity=sev,
|
|
title="Wildcard Action Grant",
|
|
description=(
|
|
f"Statement uses wildcard action(s) {[a for a in actions if '*' in a]} "
|
|
f"for principal '{principal}'. This grants unrestricted access."
|
|
),
|
|
affected_actions=[a for a in actions if "*" in a],
|
|
affected_resource=resource_str,
|
|
recommendation="Replace wildcard actions with an explicit allowlist of required actions.",
|
|
mitre_technique="T1078.004",
|
|
))
|
|
|
|
elif check_mode == "data-exfil":
|
|
matched_exfil = [
|
|
a for a in actions
|
|
if a in [d.lower() for d in DATA_EXFILTRATION_ACTIONS]
|
|
]
|
|
|
|
if matched_exfil:
|
|
severity = "medium"
|
|
if wildcard_resource:
|
|
severity = "high"
|
|
# Particularly dangerous: log deletion or trail stopping
|
|
disruptive = [
|
|
a for a in matched_exfil
|
|
if any(x in a for x in ["stoplog", "deletelog", "deletetrail", "deletedetector"])
|
|
]
|
|
if disruptive:
|
|
severity = "critical"
|
|
severity = _bump_severity(severity, severity_modifier)
|
|
|
|
findings.append(IAMFinding(
|
|
finding_id=f"{finding_prefix}-EXFIL-{len(findings) + 1:03d}",
|
|
category="data-exfil",
|
|
severity=severity,
|
|
title="Data Exfiltration Risk Actions",
|
|
description=(
|
|
f"Statement grants {len(matched_exfil)} potential exfiltration "
|
|
f"action(s) to principal '{principal}': {', '.join(matched_exfil[:5])}."
|
|
),
|
|
affected_actions=matched_exfil,
|
|
affected_resource=resource_str,
|
|
recommendation=(
|
|
"Scope data-read actions to specific resource ARNs. "
|
|
"Add VPC endpoint conditions and restrict cross-account access. "
|
|
"Enable GuardDuty and CloudTrail for all regions."
|
|
),
|
|
mitre_technique="T1530",
|
|
))
|
|
|
|
elif check_mode == "public-exposure":
|
|
principal_str = _extract_principal(statement)
|
|
is_public = any(p in principal_str for p in ["*", "AWS:*", '"*"'])
|
|
|
|
if is_public:
|
|
sev = _bump_severity("high", severity_modifier)
|
|
if wildcard_action:
|
|
sev = _bump_severity("critical", severity_modifier)
|
|
|
|
findings.append(IAMFinding(
|
|
finding_id=f"{finding_prefix}-PUB-{len(findings) + 1:03d}",
|
|
category="public-exposure",
|
|
severity=sev,
|
|
title="Public Principal Detected",
|
|
description=(
|
|
f"Statement uses Principal '*' allowing any AWS account or "
|
|
f"unauthenticated entity to perform: {', '.join(actions[:5])}."
|
|
),
|
|
affected_actions=actions[:10],
|
|
affected_resource=resource_str,
|
|
recommendation=(
|
|
"Replace Principal '*' with specific account ARNs, "
|
|
"organisation units, or role ARNs. Use Condition keys "
|
|
"like aws:PrincipalOrgID to limit to your AWS Org."
|
|
),
|
|
mitre_technique="T1190",
|
|
))
|
|
|
|
return findings
|
|
|
|
|
|
def analyze_policy(
|
|
policy: dict,
|
|
check_mode: str,
|
|
source: str,
|
|
severity_modifier: str,
|
|
provider: str = "aws",
|
|
) -> IAMAnalysisResult:
|
|
"""
|
|
Analyse a full IAM policy document for findings.
|
|
|
|
Iterates over every Statement in the policy and delegates to
|
|
analyze_statement() for per-check logic.
|
|
|
|
Args:
|
|
policy: Parsed IAM policy JSON dict.
|
|
check_mode: privilege-escalation | data-exfil | public-exposure.
|
|
source: Display name / file path for the policy.
|
|
severity_modifier: internet-facing | regulated-data | none.
|
|
provider: aws | azure | gcp (currently only aws fully supported).
|
|
|
|
Returns:
|
|
IAMAnalysisResult with all findings populated.
|
|
"""
|
|
result = IAMAnalysisResult(
|
|
source=source,
|
|
check_mode=check_mode,
|
|
provider=provider,
|
|
severity_modifier=severity_modifier,
|
|
)
|
|
|
|
statements = policy.get("Statement") or policy.get("statement") or []
|
|
if not isinstance(statements, list):
|
|
statements = [statements]
|
|
|
|
prefix = source.replace(" ", "_").replace("/", "_")[:12].upper()
|
|
|
|
for idx, stmt in enumerate(statements):
|
|
stmt_findings = analyze_statement(
|
|
statement=stmt,
|
|
check_mode=check_mode,
|
|
finding_prefix=f"{prefix}-S{idx + 1:02d}",
|
|
severity_modifier=severity_modifier,
|
|
)
|
|
result.findings.extend(stmt_findings)
|
|
|
|
result.summary = {
|
|
"total_statements": len(statements),
|
|
"total_findings": len(result.findings),
|
|
"critical": result.critical_count,
|
|
"high": result.high_count,
|
|
"medium": result.medium_count,
|
|
"low": result.low_count,
|
|
"check_mode": check_mode,
|
|
"provider": provider,
|
|
"severity_modifier": severity_modifier,
|
|
}
|
|
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# S3 Posture Check (new)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def check_s3_policy(
|
|
policy: dict,
|
|
source: str,
|
|
severity_modifier: str,
|
|
) -> IAMAnalysisResult:
|
|
"""
|
|
Check S3 bucket policy or Terraform aws_s3_bucket block for misconfigurations.
|
|
|
|
Checks performed:
|
|
1. Principal "*" in bucket policy -> Critical
|
|
2. block_public_acls missing or false -> Critical
|
|
3. server_side_encryption absent or not AES256/aws:kms -> High
|
|
4. versioning disabled -> Medium
|
|
5. access logging disabled -> High
|
|
|
|
Args:
|
|
policy: Parsed S3 policy / Terraform block dict.
|
|
source: Display name / file path.
|
|
severity_modifier: internet-facing | regulated-data | none.
|
|
|
|
Returns:
|
|
IAMAnalysisResult populated with S3 findings.
|
|
"""
|
|
result = IAMAnalysisResult(
|
|
source=source,
|
|
check_mode="s3",
|
|
provider="aws",
|
|
severity_modifier=severity_modifier,
|
|
)
|
|
findings: List[IAMFinding] = []
|
|
fid = 0
|
|
|
|
def _next_id() -> str:
|
|
nonlocal fid
|
|
fid += 1
|
|
return f"S3-{fid:03d}"
|
|
|
|
# --- Check 1: Public principal in bucket policy ---
|
|
statements = policy.get("Statement") or policy.get("statement") or []
|
|
if isinstance(statements, list):
|
|
for stmt in statements:
|
|
if not _is_allow(stmt):
|
|
continue
|
|
principal = _extract_principal(stmt)
|
|
if "*" in principal or '"*"' in principal:
|
|
severity = _bump_severity("critical", severity_modifier)
|
|
findings.append(IAMFinding(
|
|
finding_id=_next_id(),
|
|
category="s3",
|
|
severity=severity,
|
|
title="S3 Bucket Policy: Public Principal",
|
|
description=(
|
|
"Bucket policy contains Principal '*' which grants public "
|
|
"access to any AWS account or unauthenticated user."
|
|
),
|
|
affected_actions=_extract_actions(stmt),
|
|
affected_resource=source,
|
|
recommendation=(
|
|
"Remove Principal '*'. Restrict to specific account ARNs or "
|
|
"use aws:PrincipalOrgID condition to limit to your AWS Org."
|
|
),
|
|
mitre_technique="T1530",
|
|
))
|
|
|
|
# --- Check 2: block_public_acls missing or false ---
|
|
# Terraform resource format: aws_s3_bucket_public_access_block
|
|
public_access_block = (
|
|
policy.get("block_public_acls")
|
|
or policy.get("BlockPublicAcls")
|
|
or policy.get("public_access_block", {}).get("block_public_acls")
|
|
)
|
|
restrict_public_buckets = (
|
|
policy.get("restrict_public_buckets")
|
|
or policy.get("RestrictPublicBuckets")
|
|
)
|
|
block_public_policy = (
|
|
policy.get("block_public_policy")
|
|
or policy.get("BlockPublicPolicy")
|
|
)
|
|
|
|
# If any of these are explicitly False or absent, flag it
|
|
block_fields = {
|
|
"block_public_acls": public_access_block,
|
|
"restrict_public_buckets": restrict_public_buckets,
|
|
"block_public_policy": block_public_policy,
|
|
}
|
|
missing_blocks = [k for k, v in block_fields.items() if v is None or v is False]
|
|
|
|
if missing_blocks:
|
|
severity = _bump_severity("critical", severity_modifier)
|
|
findings.append(IAMFinding(
|
|
finding_id=_next_id(),
|
|
category="s3",
|
|
severity=severity,
|
|
title="S3 Public Access Block Not Fully Enabled",
|
|
description=(
|
|
f"Public access block settings are missing or disabled: "
|
|
f"{', '.join(missing_blocks)}. This may allow public ACL or policy access."
|
|
),
|
|
affected_resource=source,
|
|
recommendation=(
|
|
"Enable all four S3 Block Public Access settings: "
|
|
"BlockPublicAcls, BlockPublicPolicy, IgnorePublicAcls, RestrictPublicBuckets."
|
|
),
|
|
mitre_technique="T1530",
|
|
))
|
|
|
|
# --- Check 3: Server-side encryption ---
|
|
sse_config = (
|
|
policy.get("server_side_encryption_configuration")
|
|
or policy.get("ServerSideEncryptionConfiguration")
|
|
or policy.get("encryption")
|
|
or policy.get("sse_algorithm")
|
|
)
|
|
|
|
has_sse = False
|
|
if isinstance(sse_config, dict):
|
|
rules = sse_config.get("Rule") or sse_config.get("rules") or []
|
|
if not isinstance(rules, list):
|
|
rules = [rules]
|
|
for rule in rules:
|
|
apply_sse = (
|
|
rule.get("ApplyServerSideEncryptionByDefault")
|
|
or rule.get("apply_server_side_encryption_by_default")
|
|
or {}
|
|
)
|
|
algo = str(apply_sse.get("SSEAlgorithm") or apply_sse.get("sse_algorithm") or "")
|
|
if algo.upper() in ("AES256", "AWS:KMS"):
|
|
has_sse = True
|
|
elif isinstance(sse_config, str):
|
|
has_sse = sse_config.upper() in ("AES256", "AWS:KMS")
|
|
|
|
if not has_sse:
|
|
severity = _bump_severity("high", severity_modifier)
|
|
findings.append(IAMFinding(
|
|
finding_id=_next_id(),
|
|
category="s3",
|
|
severity=severity,
|
|
title="S3 Server-Side Encryption Not Configured",
|
|
description=(
|
|
"No server-side encryption (SSE-S3 or SSE-KMS) found on this bucket. "
|
|
"Data is stored unencrypted at rest."
|
|
),
|
|
affected_resource=source,
|
|
recommendation=(
|
|
"Enable SSE via a bucket encryption configuration. "
|
|
"Use aws:kms with a CMK for regulated workloads. "
|
|
"Consider enforcing encryption via bucket policy (aws:SecureTransport)."
|
|
),
|
|
mitre_technique="T1022",
|
|
))
|
|
|
|
# --- Check 4: Versioning disabled ---
|
|
versioning = (
|
|
policy.get("versioning")
|
|
or policy.get("VersioningConfiguration")
|
|
)
|
|
versioning_enabled = False
|
|
if isinstance(versioning, dict):
|
|
status = str(
|
|
versioning.get("Status")
|
|
or versioning.get("status")
|
|
or versioning.get("enabled")
|
|
or ""
|
|
)
|
|
versioning_enabled = status.lower() in ("enabled", "true")
|
|
elif isinstance(versioning, bool):
|
|
versioning_enabled = versioning
|
|
|
|
if not versioning_enabled:
|
|
severity = _bump_severity("medium", severity_modifier)
|
|
findings.append(IAMFinding(
|
|
finding_id=_next_id(),
|
|
category="s3",
|
|
severity=severity,
|
|
title="S3 Bucket Versioning Disabled",
|
|
description=(
|
|
"Versioning is not enabled on this bucket. "
|
|
"Accidental or malicious object deletion/overwrite cannot be recovered."
|
|
),
|
|
affected_resource=source,
|
|
recommendation=(
|
|
"Enable bucket versioning. "
|
|
"Combine with Object Lock and lifecycle policies for regulated workloads."
|
|
),
|
|
mitre_technique="T1485",
|
|
))
|
|
|
|
result.findings = findings
|
|
result.summary = {
|
|
"total_findings": len(findings),
|
|
"critical": sum(1 for f in findings if f.severity == "critical"),
|
|
"high": sum(1 for f in findings if f.severity == "high"),
|
|
"medium": sum(1 for f in findings if f.severity == "medium"),
|
|
"low": sum(1 for f in findings if f.severity == "low"),
|
|
"check_mode": "s3",
|
|
"provider": "aws",
|
|
"severity_modifier": severity_modifier,
|
|
}
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Security Group Check (new)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def check_security_group(
|
|
sg_json: dict,
|
|
source: str,
|
|
severity_modifier: str,
|
|
) -> IAMAnalysisResult:
|
|
"""
|
|
Check AWS Security Group JSON for dangerous inbound rules.
|
|
|
|
Args:
|
|
sg_json: Parsed Security Group JSON (AWS DescribeSecurityGroups
|
|
output format or Terraform aws_security_group block).
|
|
source: Display name / file path.
|
|
severity_modifier: internet-facing | regulated-data | none.
|
|
|
|
Returns:
|
|
IAMAnalysisResult populated with SG findings.
|
|
"""
|
|
RISKY_PORTS: Dict[int, str] = {
|
|
22: "SSH",
|
|
3389: "RDP",
|
|
23: "Telnet",
|
|
21: "FTP",
|
|
3306: "MySQL",
|
|
5432: "PostgreSQL",
|
|
1433: "MSSQL",
|
|
27017: "MongoDB",
|
|
6379: "Redis",
|
|
}
|
|
|
|
result = IAMAnalysisResult(
|
|
source=source,
|
|
check_mode="sg",
|
|
provider="aws",
|
|
severity_modifier=severity_modifier,
|
|
)
|
|
findings: List[IAMFinding] = []
|
|
fid = 0
|
|
|
|
def _next_id() -> str:
|
|
nonlocal fid
|
|
fid += 1
|
|
return f"SG-{fid:03d}"
|
|
|
|
# Support both AWS API format (IpPermissions) and Terraform ingress blocks
|
|
ip_permissions = sg_json.get("IpPermissions") or []
|
|
terraform_ingress = sg_json.get("ingress") or []
|
|
|
|
# Normalise Terraform ingress blocks to AWS API format
|
|
normalised: List[dict] = list(ip_permissions)
|
|
for ing in terraform_ingress:
|
|
if not isinstance(ing, dict):
|
|
continue
|
|
cidr_blocks = ing.get("cidr_blocks") or []
|
|
ipv6_cidr_blocks = ing.get("ipv6_cidr_blocks") or []
|
|
ip_ranges = [{"CidrIp": c} for c in cidr_blocks]
|
|
ipv6_ranges = [{"CidrIpv6": c} for c in ipv6_cidr_blocks]
|
|
normalised.append({
|
|
"IpProtocol": str(ing.get("protocol", "tcp")),
|
|
"FromPort": ing.get("from_port", 0),
|
|
"ToPort": ing.get("to_port", 65535),
|
|
"IpRanges": ip_ranges,
|
|
"Ipv6Ranges": ipv6_ranges,
|
|
})
|
|
|
|
for rule in normalised:
|
|
from_port = rule.get("FromPort", 0)
|
|
to_port = rule.get("ToPort", 65535)
|
|
protocol = str(rule.get("IpProtocol", "tcp"))
|
|
|
|
# Collect CIDRs from both IPv4 and IPv6 ranges
|
|
all_ranges: List[Tuple[str, str]] = []
|
|
for ip_range in rule.get("IpRanges", []):
|
|
cidr = ip_range.get("CidrIp", "")
|
|
if cidr:
|
|
all_ranges.append((cidr, "ipv4"))
|
|
for ip_range in rule.get("Ipv6Ranges", []):
|
|
cidr = ip_range.get("CidrIpv6", "")
|
|
if cidr:
|
|
all_ranges.append((cidr, "ipv6"))
|
|
|
|
for cidr, ip_ver in all_ranges:
|
|
if cidr not in ("0.0.0.0/0", "::/0"):
|
|
continue # Not open to the world
|
|
|
|
if protocol == "-1":
|
|
# All traffic open to the internet
|
|
severity = _bump_severity("critical", severity_modifier)
|
|
findings.append(IAMFinding(
|
|
finding_id=_next_id(),
|
|
category="sg",
|
|
severity=severity,
|
|
title="Security Group: All Traffic Open to Internet",
|
|
description=(
|
|
f"Inbound rule allows ALL traffic (protocol -1) "
|
|
f"from {cidr} ({ip_ver}). This exposes every port on every instance "
|
|
"in this security group to the public internet."
|
|
),
|
|
affected_resource=source,
|
|
recommendation=(
|
|
"Remove the all-traffic rule. Define explicit port/protocol "
|
|
"allowlist rules for only the services that must be internet-accessible."
|
|
),
|
|
mitre_technique="T1190",
|
|
))
|
|
continue
|
|
|
|
# Check port range against RISKY_PORTS
|
|
matched_ports = [
|
|
p for p in RISKY_PORTS
|
|
if from_port <= p <= to_port
|
|
]
|
|
|
|
if matched_ports:
|
|
for port in matched_ports:
|
|
service = RISKY_PORTS[port]
|
|
severity = _bump_severity("critical", severity_modifier)
|
|
findings.append(IAMFinding(
|
|
finding_id=_next_id(),
|
|
category="sg",
|
|
severity=severity,
|
|
title=f"Security Group: {service} ({port}) Open to Internet",
|
|
description=(
|
|
f"Inbound rule allows {service} (port {port}/{protocol}) "
|
|
f"from {cidr} ({ip_ver}). Direct internet access to {service} "
|
|
"exposes this service to brute-force, exploitation, and scanning."
|
|
),
|
|
affected_resource=source,
|
|
recommendation=(
|
|
f"Restrict port {port} to specific trusted CIDRs or a VPN/bastion. "
|
|
f"For {service}, consider using AWS Systems Manager Session Manager "
|
|
"as a zero-trust alternative that requires no open inbound ports."
|
|
),
|
|
mitre_technique="T1133",
|
|
))
|
|
else:
|
|
# Open to the internet on a non-standard port
|
|
severity = _bump_severity("high", severity_modifier)
|
|
port_label = (
|
|
f"port {from_port}"
|
|
if from_port == to_port
|
|
else f"ports {from_port}-{to_port}"
|
|
)
|
|
findings.append(IAMFinding(
|
|
finding_id=_next_id(),
|
|
category="sg",
|
|
severity=severity,
|
|
title=f"Security Group: {port_label.title()} Open to Internet",
|
|
description=(
|
|
f"Inbound rule opens {port_label} ({protocol}) to {cidr} ({ip_ver}). "
|
|
"Broad internet exposure increases attack surface even on non-standard ports."
|
|
),
|
|
affected_resource=source,
|
|
recommendation=(
|
|
f"Restrict {port_label} to the specific IP ranges that require access. "
|
|
"Use Security Group references instead of CIDRs where possible."
|
|
),
|
|
mitre_technique="T1046",
|
|
))
|
|
|
|
result.findings = findings
|
|
result.summary = {
|
|
"total_findings": len(findings),
|
|
"critical": sum(1 for f in findings if f.severity == "critical"),
|
|
"high": sum(1 for f in findings if f.severity == "high"),
|
|
"medium": sum(1 for f in findings if f.severity == "medium"),
|
|
"low": sum(1 for f in findings if f.severity == "low"),
|
|
"check_mode": "sg",
|
|
"provider": "aws",
|
|
"severity_modifier": severity_modifier,
|
|
}
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Text Report
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def print_text_report(result: IAMAnalysisResult) -> None:
|
|
"""Print a formatted text report for the analysis result."""
|
|
sep = "=" * 70
|
|
print(sep)
|
|
print(" Cloud Posture Check")
|
|
print(sep)
|
|
print(f" Source : {result.source}")
|
|
print(f" Check Mode : {result.check_mode}")
|
|
print(f" Provider : {result.provider.upper()}")
|
|
print(f" Severity Mod : {result.severity_modifier}")
|
|
print(f" Timestamp : {result.timestamp_utc}")
|
|
print(sep)
|
|
|
|
summary = result.summary
|
|
print(f"\n Summary:")
|
|
print(f" Total Findings : {summary.get('total_findings', 0)}")
|
|
if summary.get("critical", 0):
|
|
print(f" CRITICAL : {summary['critical']}")
|
|
if summary.get("high", 0):
|
|
print(f" HIGH : {summary['high']}")
|
|
if summary.get("medium", 0):
|
|
print(f" MEDIUM : {summary['medium']}")
|
|
if summary.get("low", 0):
|
|
print(f" LOW : {summary['low']}")
|
|
|
|
if not result.findings:
|
|
print("\n No findings detected.")
|
|
print(sep)
|
|
return
|
|
|
|
print(f"\n Findings ({len(result.findings)}):")
|
|
for finding in result.findings:
|
|
print(f"\n [{finding.severity.upper()}] {finding.finding_id}: {finding.title}")
|
|
print(f" {finding.description}")
|
|
if finding.affected_actions:
|
|
preview = finding.affected_actions[:4]
|
|
suffix = f" (+{len(finding.affected_actions) - 4} more)" if len(finding.affected_actions) > 4 else ""
|
|
print(f" Actions : {', '.join(preview)}{suffix}")
|
|
print(f" Resource : {finding.affected_resource}")
|
|
print(f" MITRE : {finding.mitre_technique}")
|
|
print(f" Fix : {finding.recommendation}")
|
|
|
|
print(f"\n{sep}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Result Serialisation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def result_to_dict(result: IAMAnalysisResult) -> dict:
|
|
"""Convert IAMAnalysisResult to a JSON-serialisable dict."""
|
|
return {
|
|
"source": result.source,
|
|
"check_mode": result.check_mode,
|
|
"provider": result.provider,
|
|
"severity_modifier": result.severity_modifier,
|
|
"timestamp_utc": result.timestamp_utc,
|
|
"summary": result.summary,
|
|
"findings": [asdict(f) for f in result.findings],
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main Entry Point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(
|
|
description="Cloud Security Posture Check — IAM, S3, and Security Group analysis",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
%(prog)s policy.json
|
|
%(prog)s policy.json --check privilege-escalation --json
|
|
%(prog)s policy.json --check data-exfil --severity-modifier regulated-data --json
|
|
%(prog)s policy.json --check public-exposure --json
|
|
%(prog)s bucket.json --check s3 --severity-modifier internet-facing --json
|
|
%(prog)s sg.json --check sg --provider aws --json
|
|
%(prog)s policy.json --check all --json
|
|
|
|
Exit codes:
|
|
0 No findings or informational only
|
|
1 High-severity findings present
|
|
2 Critical findings present
|
|
""",
|
|
)
|
|
|
|
parser.add_argument(
|
|
"input_file",
|
|
help="Path to JSON file (IAM policy, S3 config, or Security Group JSON)",
|
|
)
|
|
parser.add_argument(
|
|
"--check",
|
|
choices=["privilege-escalation", "data-exfil", "public-exposure", "s3", "sg", "all"],
|
|
default="privilege-escalation",
|
|
help="Check mode to run (default: privilege-escalation)",
|
|
)
|
|
parser.add_argument(
|
|
"--provider",
|
|
choices=["aws", "azure", "gcp"],
|
|
default="aws",
|
|
help="Cloud provider (default: aws; Azure/GCP: only IAM checks available)",
|
|
)
|
|
parser.add_argument(
|
|
"--severity-modifier",
|
|
choices=["internet-facing", "regulated-data", "none"],
|
|
default="none",
|
|
dest="severity_modifier",
|
|
help="Bump all finding severities +1 band (default: none)",
|
|
)
|
|
parser.add_argument(
|
|
"--json",
|
|
action="store_true",
|
|
help="Output results as JSON",
|
|
)
|
|
parser.add_argument(
|
|
"--output", "-o",
|
|
metavar="FILE",
|
|
help="Write JSON output to file",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# --- Load input file ---
|
|
try:
|
|
with open(args.input_file, "r", encoding="utf-8") as fh:
|
|
policy_data = json.load(fh)
|
|
except FileNotFoundError:
|
|
err = {"error": f"File not found: {args.input_file}"}
|
|
if args.json:
|
|
print(json.dumps(err, indent=2))
|
|
else:
|
|
print(f"Error: {err['error']}", file=sys.stderr)
|
|
sys.exit(1)
|
|
except json.JSONDecodeError as exc:
|
|
err = {"error": f"Invalid JSON: {exc}"}
|
|
if args.json:
|
|
print(json.dumps(err, indent=2))
|
|
else:
|
|
print(f"Error: {err['error']}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
source = args.input_file
|
|
severity_modifier = args.severity_modifier
|
|
provider = args.provider
|
|
|
|
# --- Gate S3 / SG checks by provider ---
|
|
check_mode = args.check
|
|
|
|
if check_mode in ("s3", "sg") and provider != "aws":
|
|
msg = (
|
|
f"Azure/GCP checks coming soon — "
|
|
f"use --provider aws for S3/SG analysis"
|
|
)
|
|
if args.json:
|
|
print(json.dumps({"message": msg, "provider": provider, "check_mode": check_mode}, indent=2))
|
|
else:
|
|
print(msg)
|
|
sys.exit(0)
|
|
|
|
# --- Run checks ---
|
|
all_results: List[IAMAnalysisResult] = []
|
|
|
|
iam_check_modes = ["privilege-escalation", "data-exfil", "public-exposure"]
|
|
|
|
if check_mode == "all":
|
|
if provider == "aws":
|
|
# Run all IAM checks
|
|
for mode in iam_check_modes:
|
|
r = analyze_policy(
|
|
policy=policy_data,
|
|
check_mode=mode,
|
|
source=source,
|
|
severity_modifier=severity_modifier,
|
|
provider=provider,
|
|
)
|
|
all_results.append(r)
|
|
# Run S3
|
|
s3_r = check_s3_policy(
|
|
policy=policy_data,
|
|
source=source,
|
|
severity_modifier=severity_modifier,
|
|
)
|
|
all_results.append(s3_r)
|
|
# Run SG
|
|
sg_r = check_security_group(
|
|
sg_json=policy_data,
|
|
source=source,
|
|
severity_modifier=severity_modifier,
|
|
)
|
|
all_results.append(sg_r)
|
|
else:
|
|
for mode in iam_check_modes:
|
|
r = analyze_policy(
|
|
policy=policy_data,
|
|
check_mode=mode,
|
|
source=source,
|
|
severity_modifier=severity_modifier,
|
|
provider=provider,
|
|
)
|
|
all_results.append(r)
|
|
|
|
elif check_mode in iam_check_modes:
|
|
r = analyze_policy(
|
|
policy=policy_data,
|
|
check_mode=check_mode,
|
|
source=source,
|
|
severity_modifier=severity_modifier,
|
|
provider=provider,
|
|
)
|
|
all_results.append(r)
|
|
|
|
elif check_mode == "s3":
|
|
r = check_s3_policy(
|
|
policy=policy_data,
|
|
source=source,
|
|
severity_modifier=severity_modifier,
|
|
)
|
|
all_results.append(r)
|
|
|
|
elif check_mode == "sg":
|
|
r = check_security_group(
|
|
sg_json=policy_data,
|
|
source=source,
|
|
severity_modifier=severity_modifier,
|
|
)
|
|
all_results.append(r)
|
|
|
|
# --- Flatten findings for output when multiple checks run ---
|
|
if len(all_results) == 1:
|
|
combined_result = all_results[0]
|
|
else:
|
|
# Merge into a single result
|
|
all_findings: List[IAMFinding] = []
|
|
for res in all_results:
|
|
all_findings.extend(res.findings)
|
|
|
|
combined_result = IAMAnalysisResult(
|
|
source=source,
|
|
check_mode=check_mode,
|
|
provider=provider,
|
|
severity_modifier=severity_modifier,
|
|
)
|
|
combined_result.findings = all_findings
|
|
combined_result.summary = {
|
|
"total_findings": len(all_findings),
|
|
"critical": sum(1 for f in all_findings if f.severity == "critical"),
|
|
"high": sum(1 for f in all_findings if f.severity == "high"),
|
|
"medium": sum(1 for f in all_findings if f.severity == "medium"),
|
|
"low": sum(1 for f in all_findings if f.severity == "low"),
|
|
"check_mode": check_mode,
|
|
"provider": provider,
|
|
"severity_modifier": severity_modifier,
|
|
"checks_run": [r.check_mode for r in all_results],
|
|
}
|
|
|
|
# --- Output ---
|
|
if args.json or args.output:
|
|
output_dict = result_to_dict(combined_result)
|
|
json_str = json.dumps(output_dict, indent=2)
|
|
if args.output:
|
|
with open(args.output, "w", encoding="utf-8") as fh:
|
|
fh.write(json_str)
|
|
if not args.json:
|
|
print(f"Results written to {args.output}")
|
|
if args.json:
|
|
print(json_str)
|
|
else:
|
|
print_text_report(combined_result)
|
|
|
|
# --- Exit code ---
|
|
if combined_result.critical_count > 0:
|
|
sys.exit(2)
|
|
if combined_result.high_count > 0:
|
|
sys.exit(1)
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|