Files
Jaskarn Singh d2da9d3dad feat(engineering-team): add 5 consolidated security skills
Adds threat-detection, incident-response, cloud-security, red-team, and ai-security skills to engineering-team. Each includes SKILL.md, references, and Python scripts (stdlib-only). Consolidation of 66 individual skills into 5 production-ready packages.
2026-03-30 21:07:43 +02:00

421 lines
20 KiB
Python

#!/usr/bin/env python3
"""
engagement_planner.py — Red Team Engagement Planner
Builds a structured red team engagement plan from target scope, MITRE ATT&CK
technique selection, access level, and crown jewel assets. Scores techniques
by detection risk and effort, assembles kill-chain phases, identifies choke
points, and generates OPSEC risk items.
IMPORTANT: Authorization is required. Use --authorized flag only after obtaining
signed Rules of Engagement (RoE) and written executive authorization.
Usage:
python3 engagement_planner.py --techniques T1059,T1078,T1003 --access-level external --authorized --json
python3 engagement_planner.py --techniques T1059,T1078 --crown-jewels "DB,AD" --access-level credentialed --authorized --json
python3 engagement_planner.py --list-techniques
Exit codes:
0 Engagement plan generated successfully
1 Missing authorization or invalid input
2 Scope violation or technique outside access-level constraints
"""
import argparse
import json
import sys
MITRE_TECHNIQUES = {
"T1059": {"name": "Command and Scripting Interpreter", "tactic": "execution",
"detection_risk": 0.7, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1059.001": {"name": "PowerShell", "tactic": "execution",
"detection_risk": 0.8, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1078": {"name": "Valid Accounts", "tactic": "initial_access",
"detection_risk": 0.3, "prerequisites": [], "access_level": "external"},
"T1078.004": {"name": "Valid Accounts: Cloud Accounts", "tactic": "initial_access",
"detection_risk": 0.3, "prerequisites": [], "access_level": "external"},
"T1003": {"name": "OS Credential Dumping", "tactic": "credential_access",
"detection_risk": 0.9, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "internal"},
"T1003.001": {"name": "LSASS Memory", "tactic": "credential_access",
"detection_risk": 0.95, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "credentialed"},
"T1021": {"name": "Remote Services", "tactic": "lateral_movement",
"detection_risk": 0.6, "prerequisites": ["initial_access", "credential_access"], "access_level": "internal"},
"T1021.002": {"name": "SMB/Windows Admin Shares", "tactic": "lateral_movement",
"detection_risk": 0.7, "prerequisites": ["initial_access", "credential_access"], "access_level": "internal"},
"T1055": {"name": "Process Injection", "tactic": "defense_evasion",
"detection_risk": 0.85, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1190": {"name": "Exploit Public-Facing Application", "tactic": "initial_access",
"detection_risk": 0.5, "prerequisites": [], "access_level": "external"},
"T1566": {"name": "Phishing", "tactic": "initial_access",
"detection_risk": 0.4, "prerequisites": [], "access_level": "external"},
"T1566.001": {"name": "Spearphishing Attachment", "tactic": "initial_access",
"detection_risk": 0.5, "prerequisites": [], "access_level": "external"},
"T1098": {"name": "Account Manipulation", "tactic": "persistence",
"detection_risk": 0.6, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "credentialed"},
"T1136": {"name": "Create Account", "tactic": "persistence",
"detection_risk": 0.7, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1053": {"name": "Scheduled Task/Job", "tactic": "persistence",
"detection_risk": 0.6, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1486": {"name": "Data Encrypted for Impact", "tactic": "impact",
"detection_risk": 0.99, "prerequisites": ["initial_access", "lateral_movement"], "access_level": "credentialed"},
"T1530": {"name": "Data from Cloud Storage", "tactic": "collection",
"detection_risk": 0.4, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1041": {"name": "Exfiltration Over C2 Channel", "tactic": "exfiltration",
"detection_risk": 0.65, "prerequisites": ["initial_access", "collection"], "access_level": "internal"},
"T1048": {"name": "Exfiltration Over Alternative Protocol", "tactic": "exfiltration",
"detection_risk": 0.5, "prerequisites": ["initial_access", "collection"], "access_level": "internal"},
"T1083": {"name": "File and Directory Discovery", "tactic": "discovery",
"detection_risk": 0.3, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1082": {"name": "System Information Discovery", "tactic": "discovery",
"detection_risk": 0.2, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1057": {"name": "Process Discovery", "tactic": "discovery",
"detection_risk": 0.25, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1068": {"name": "Exploitation for Privilege Escalation", "tactic": "privilege_escalation",
"detection_risk": 0.8, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1484": {"name": "Domain Policy Modification", "tactic": "privilege_escalation",
"detection_risk": 0.85, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "credentialed"},
"T1562": {"name": "Impair Defenses", "tactic": "defense_evasion",
"detection_risk": 0.9, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "credentialed"},
"T1070": {"name": "Indicator Removal", "tactic": "defense_evasion",
"detection_risk": 0.75, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1195": {"name": "Supply Chain Compromise", "tactic": "initial_access",
"detection_risk": 0.2, "prerequisites": [], "access_level": "external"},
"T1218": {"name": "System Binary Proxy Execution", "tactic": "defense_evasion",
"detection_risk": 0.6, "prerequisites": ["initial_access"], "access_level": "internal"},
"T1105": {"name": "Ingress Tool Transfer", "tactic": "command_and_control",
"detection_risk": 0.55, "prerequisites": ["initial_access"], "access_level": "internal"},
}
ACCESS_LEVEL_HIERARCHY = {"external": 0, "internal": 1, "credentialed": 2}
OPSEC_RISKS = [
{"risk": "C2 beacon interval too frequent", "severity": "high",
"mitigation": "Use jitter (25-50%) on beacon intervals; minimum 30s base interval for stealth",
"relevant_tactics": ["command_and_control"]},
{"risk": "Infrastructure reuse across engagements", "severity": "critical",
"mitigation": "Provision fresh C2 infrastructure per engagement; never reuse domains or IPs",
"relevant_tactics": ["command_and_control", "initial_access"]},
{"risk": "Scanning during business hours from non-business IP", "severity": "medium",
"mitigation": "Schedule active scanning to match target business hours and geographic timezone",
"relevant_tactics": ["discovery"]},
{"risk": "Known tool signatures in memory or on disk", "severity": "high",
"mitigation": "Use custom-compiled tools or obfuscated variants; avoid default Cobalt Strike profiles",
"relevant_tactics": ["execution", "lateral_movement"]},
{"risk": "Credential dumping without EDR bypass", "severity": "critical",
"mitigation": "Assess EDR coverage before credential dumping; use protected-mode aware approaches",
"relevant_tactics": ["credential_access"]},
{"risk": "Large data transfer without staging", "severity": "high",
"mitigation": "Stage data locally, compress and encrypt before exfil; avoid single large transfers",
"relevant_tactics": ["exfiltration", "collection"]},
{"risk": "Operating outside authorized time window", "severity": "critical",
"mitigation": "Confirm maintenance and testing windows with client before operational phases",
"relevant_tactics": []},
{"risk": "Leaving artifacts in temp directories", "severity": "medium",
"mitigation": "Clean up all dropped files and created accounts before disengaging",
"relevant_tactics": ["execution", "persistence"]},
]
KILL_CHAIN_PHASE_ORDER = [
"initial_access", "execution", "persistence", "privilege_escalation",
"defense_evasion", "credential_access", "discovery", "lateral_movement",
"collection", "command_and_control", "exfiltration", "impact"
]
def list_techniques():
"""Print a formatted table of all MITRE techniques and exit."""
print(f"{'ID':<12} {'Name':<45} {'Tactic':<25} {'Det.Risk':<10} {'Access'}")
print("-" * 110)
for tid, data in sorted(MITRE_TECHNIQUES.items()):
print(
f"{tid:<12} {data['name']:<45} {data['tactic']:<25} "
f"{data['detection_risk']:<10.2f} {data['access_level']}"
)
sys.exit(0)
def build_engagement_plan(techniques_input, access_level, crown_jewels, target_count):
"""
Core planning algorithm. Returns (plan_dict, scope_violations_count).
"""
provided_level = ACCESS_LEVEL_HIERARCHY[access_level]
valid_techniques = []
scope_violations = []
not_found = []
for tid in techniques_input:
tid = tid.strip().upper()
if tid not in MITRE_TECHNIQUES:
not_found.append(tid)
continue
tech = MITRE_TECHNIQUES[tid]
required_level = ACCESS_LEVEL_HIERARCHY[tech["access_level"]]
if required_level > provided_level:
scope_violations.append({
"technique_id": tid,
"technique_name": tech["name"],
"reason": (
f"Requires '{tech['access_level']}' access; "
f"provided access level is '{access_level}'"
),
})
continue
effort_score = round(tech["detection_risk"] * (len(tech["prerequisites"]) + 1), 4)
valid_techniques.append({
"id": tid,
"name": tech["name"],
"tactic": tech["tactic"],
"detection_risk": tech["detection_risk"],
"prerequisites": tech["prerequisites"],
"effort_score": effort_score,
})
# Group by tactic and order phases by kill chain
tactic_map = {}
for t in valid_techniques:
tactic_map.setdefault(t["tactic"], []).append(t)
phases = []
tactics_present = set(tactic_map.keys())
for phase_name in KILL_CHAIN_PHASE_ORDER:
if phase_name in tactic_map:
techniques_in_phase = sorted(
tactic_map[phase_name], key=lambda x: x["effort_score"], reverse=True
)
phases.append({
"phase": phase_name,
"techniques": techniques_in_phase,
})
# Identify choke points
# A choke point is a credential_access or privilege_escalation technique
# that other selected techniques list as a prerequisite dependency,
# especially relevant when crown jewels are specified.
choke_tactic_set = {"credential_access", "privilege_escalation"}
choke_points = []
for t in valid_techniques:
if t["tactic"] not in choke_tactic_set:
continue
# Count how many other techniques depend on this tactic
dependents = [
other["id"]
for other in valid_techniques
if t["tactic"] in other["prerequisites"] and other["id"] != t["id"]
]
# If crown jewels are specified, flag anything in those choke tactics
crown_jewel_relevant = bool(crown_jewels)
if dependents or crown_jewel_relevant:
choke_points.append({
"technique_id": t["id"],
"technique_name": t["name"],
"tactic": t["tactic"],
"dependent_technique_count": len(dependents),
"dependent_techniques": dependents,
"crown_jewel_relevant": crown_jewel_relevant,
"note": (
"Blocking this technique disrupts the downstream kill-chain. "
"Priority hardening target."
),
})
# Collect OPSEC risks for tactics present in the selected techniques
seen_risks = set()
applicable_opsec = []
for risk_item in OPSEC_RISKS:
relevant = risk_item["relevant_tactics"]
# Include universal risks (empty relevant_tactics list) always
if not relevant or tactics_present.intersection(relevant):
key = risk_item["risk"]
if key not in seen_risks:
seen_risks.add(key)
applicable_opsec.append(risk_item)
# Estimate duration: sum detection_risk * 2 days per phase, minimum 3 days
raw_duration = sum(
tech["detection_risk"] * 2
for t in valid_techniques
for tech in [t] # flatten
)
# Per-phase minimum: ensure at least 0.5 day per phase
phase_count = len(phases)
estimated_days = max(3.0, round(raw_duration + phase_count * 0.5, 1))
# Scale by target_count (each additional target adds 20% duration)
if target_count and target_count > 1:
estimated_days = round(estimated_days * (1 + (target_count - 1) * 0.2), 1)
# Required authorizations list
required_authorizations = [
"Signed Rules of Engagement (RoE) document",
"Written executive/CISO authorization",
"Defined scope and out-of-scope assets list",
"Emergency stop contact and escalation path",
"Deconfliction process with SOC/Blue Team",
]
if "impact" in tactics_present:
required_authorizations.append(
"Specific written authorization for destructive/impact techniques (T14xx)"
)
if "credential_access" in tactics_present:
required_authorizations.append(
"Written authorization for credential capture and handling procedures"
)
plan = {
"engagement_summary": {
"access_level": access_level,
"crown_jewels": crown_jewels,
"target_count": target_count or 1,
"techniques_requested": len(techniques_input),
"techniques_valid": len(valid_techniques),
"techniques_not_found": not_found,
"estimated_duration_days": estimated_days,
},
"phases": phases,
"choke_points": choke_points,
"opsec_risks": applicable_opsec,
"scope_violations": scope_violations,
"required_authorizations": required_authorizations,
}
return plan, len(scope_violations)
def main():
parser = argparse.ArgumentParser(
description="Red Team Engagement Planner — Builds structured engagement plans from MITRE ATT&CK techniques.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"Examples:\n"
" python3 engagement_planner.py --techniques T1059,T1078,T1003 --access-level external --authorized --json\n"
" python3 engagement_planner.py --techniques T1059,T1078 --crown-jewels 'DB,AD' --access-level credentialed --authorized --json\n"
" python3 engagement_planner.py --list-techniques\n"
"\nExit codes:\n"
" 0 Engagement plan generated successfully\n"
" 1 Missing authorization or invalid input\n"
" 2 Scope violation or technique outside access-level constraints"
),
)
parser.add_argument(
"--techniques",
type=str,
default="",
help="Comma-separated MITRE ATT&CK technique IDs (e.g. T1059,T1078,T1003)",
)
parser.add_argument(
"--access-level",
choices=["external", "internal", "credentialed"],
default="external",
help="Attacker access level for this engagement (default: external)",
)
parser.add_argument(
"--crown-jewels",
type=str,
default="",
help="Comma-separated crown jewel asset labels (e.g. 'DB,AD,PaymentSystem')",
)
parser.add_argument(
"--target-count",
type=int,
default=1,
help="Number of target systems/segments (affects duration estimate, default: 1)",
)
parser.add_argument(
"--authorized",
action="store_true",
help="Confirms signed RoE and executive authorization have been obtained",
)
parser.add_argument(
"--json",
action="store_true",
dest="output_json",
help="Output results as JSON",
)
parser.add_argument(
"--list-techniques",
action="store_true",
help="Print all available MITRE techniques and exit",
)
args = parser.parse_args()
if args.list_techniques:
list_techniques() # exits internally
# Authorization gate
if not args.authorized:
msg = (
"Authorization required: obtain signed RoE before planning. "
"Use --authorized flag only after legal sign-off."
)
if args.output_json:
print(json.dumps({"error": msg, "exit_code": 1}, indent=2))
else:
print(f"ERROR: {msg}", file=sys.stderr)
sys.exit(1)
if not args.techniques.strip():
msg = "No techniques specified. Use --techniques T1059,T1078,... or --list-techniques."
if args.output_json:
print(json.dumps({"error": msg, "exit_code": 1}, indent=2))
else:
print(f"ERROR: {msg}", file=sys.stderr)
sys.exit(1)
techniques_input = [t.strip() for t in args.techniques.split(",") if t.strip()]
crown_jewels = [c.strip() for c in args.crown_jewels.split(",") if c.strip()]
plan, violation_count = build_engagement_plan(
techniques_input=techniques_input,
access_level=args.access_level,
crown_jewels=crown_jewels,
target_count=args.target_count,
)
if args.output_json:
print(json.dumps(plan, indent=2))
else:
summary = plan["engagement_summary"]
print("\n=== RED TEAM ENGAGEMENT PLAN ===")
print(f"Access Level : {summary['access_level']}")
print(f"Crown Jewels : {', '.join(crown_jewels) if crown_jewels else 'Not specified'}")
print(f"Techniques : {summary['techniques_valid']}/{summary['techniques_requested']} valid")
print(f"Est. Duration : {summary['estimated_duration_days']} days")
if summary["techniques_not_found"]:
print(f"Not Found : {', '.join(summary['techniques_not_found'])}")
print("\n--- Kill-Chain Phases ---")
for phase in plan["phases"]:
print(f"\n [{phase['phase'].upper()}]")
for t in phase["techniques"]:
print(f" {t['id']:<12} {t['name']:<45} risk={t['detection_risk']:.2f} effort={t['effort_score']:.3f}")
print("\n--- Choke Points ---")
if plan["choke_points"]:
for cp in plan["choke_points"]:
print(f" {cp['technique_id']} {cp['technique_name']}{cp['note']}")
else:
print(" None identified.")
print("\n--- OPSEC Risks ---")
for risk in plan["opsec_risks"]:
print(f" [{risk['severity'].upper()}] {risk['risk']}")
print(f" Mitigation: {risk['mitigation']}")
if plan["scope_violations"]:
print("\n--- SCOPE VIOLATIONS ---")
for sv in plan["scope_violations"]:
print(f" {sv['technique_id']}: {sv['reason']}")
print("\n--- Required Authorizations ---")
for auth in plan["required_authorizations"]:
print(f" - {auth}")
print()
if violation_count > 0:
sys.exit(2)
sys.exit(0)
if __name__ == "__main__":
main()