Adds threat-detection, incident-response, cloud-security, red-team, and ai-security skills to engineering-team. Each includes SKILL.md, references, and Python scripts (stdlib-only). Consolidation of 66 individual skills into 5 production-ready packages.
421 lines
20 KiB
Python
421 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
engagement_planner.py — Red Team Engagement Planner
|
|
|
|
Builds a structured red team engagement plan from target scope, MITRE ATT&CK
|
|
technique selection, access level, and crown jewel assets. Scores techniques
|
|
by detection risk and effort, assembles kill-chain phases, identifies choke
|
|
points, and generates OPSEC risk items.
|
|
|
|
IMPORTANT: Authorization is required. Use --authorized flag only after obtaining
|
|
signed Rules of Engagement (RoE) and written executive authorization.
|
|
|
|
Usage:
|
|
python3 engagement_planner.py --techniques T1059,T1078,T1003 --access-level external --authorized --json
|
|
python3 engagement_planner.py --techniques T1059,T1078 --crown-jewels "DB,AD" --access-level credentialed --authorized --json
|
|
python3 engagement_planner.py --list-techniques
|
|
|
|
Exit codes:
|
|
0 Engagement plan generated successfully
|
|
1 Missing authorization or invalid input
|
|
2 Scope violation or technique outside access-level constraints
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
|
|
MITRE_TECHNIQUES = {
|
|
"T1059": {"name": "Command and Scripting Interpreter", "tactic": "execution",
|
|
"detection_risk": 0.7, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1059.001": {"name": "PowerShell", "tactic": "execution",
|
|
"detection_risk": 0.8, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1078": {"name": "Valid Accounts", "tactic": "initial_access",
|
|
"detection_risk": 0.3, "prerequisites": [], "access_level": "external"},
|
|
"T1078.004": {"name": "Valid Accounts: Cloud Accounts", "tactic": "initial_access",
|
|
"detection_risk": 0.3, "prerequisites": [], "access_level": "external"},
|
|
"T1003": {"name": "OS Credential Dumping", "tactic": "credential_access",
|
|
"detection_risk": 0.9, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "internal"},
|
|
"T1003.001": {"name": "LSASS Memory", "tactic": "credential_access",
|
|
"detection_risk": 0.95, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "credentialed"},
|
|
"T1021": {"name": "Remote Services", "tactic": "lateral_movement",
|
|
"detection_risk": 0.6, "prerequisites": ["initial_access", "credential_access"], "access_level": "internal"},
|
|
"T1021.002": {"name": "SMB/Windows Admin Shares", "tactic": "lateral_movement",
|
|
"detection_risk": 0.7, "prerequisites": ["initial_access", "credential_access"], "access_level": "internal"},
|
|
"T1055": {"name": "Process Injection", "tactic": "defense_evasion",
|
|
"detection_risk": 0.85, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1190": {"name": "Exploit Public-Facing Application", "tactic": "initial_access",
|
|
"detection_risk": 0.5, "prerequisites": [], "access_level": "external"},
|
|
"T1566": {"name": "Phishing", "tactic": "initial_access",
|
|
"detection_risk": 0.4, "prerequisites": [], "access_level": "external"},
|
|
"T1566.001": {"name": "Spearphishing Attachment", "tactic": "initial_access",
|
|
"detection_risk": 0.5, "prerequisites": [], "access_level": "external"},
|
|
"T1098": {"name": "Account Manipulation", "tactic": "persistence",
|
|
"detection_risk": 0.6, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "credentialed"},
|
|
"T1136": {"name": "Create Account", "tactic": "persistence",
|
|
"detection_risk": 0.7, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1053": {"name": "Scheduled Task/Job", "tactic": "persistence",
|
|
"detection_risk": 0.6, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1486": {"name": "Data Encrypted for Impact", "tactic": "impact",
|
|
"detection_risk": 0.99, "prerequisites": ["initial_access", "lateral_movement"], "access_level": "credentialed"},
|
|
"T1530": {"name": "Data from Cloud Storage", "tactic": "collection",
|
|
"detection_risk": 0.4, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1041": {"name": "Exfiltration Over C2 Channel", "tactic": "exfiltration",
|
|
"detection_risk": 0.65, "prerequisites": ["initial_access", "collection"], "access_level": "internal"},
|
|
"T1048": {"name": "Exfiltration Over Alternative Protocol", "tactic": "exfiltration",
|
|
"detection_risk": 0.5, "prerequisites": ["initial_access", "collection"], "access_level": "internal"},
|
|
"T1083": {"name": "File and Directory Discovery", "tactic": "discovery",
|
|
"detection_risk": 0.3, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1082": {"name": "System Information Discovery", "tactic": "discovery",
|
|
"detection_risk": 0.2, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1057": {"name": "Process Discovery", "tactic": "discovery",
|
|
"detection_risk": 0.25, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1068": {"name": "Exploitation for Privilege Escalation", "tactic": "privilege_escalation",
|
|
"detection_risk": 0.8, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1484": {"name": "Domain Policy Modification", "tactic": "privilege_escalation",
|
|
"detection_risk": 0.85, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "credentialed"},
|
|
"T1562": {"name": "Impair Defenses", "tactic": "defense_evasion",
|
|
"detection_risk": 0.9, "prerequisites": ["initial_access", "privilege_escalation"], "access_level": "credentialed"},
|
|
"T1070": {"name": "Indicator Removal", "tactic": "defense_evasion",
|
|
"detection_risk": 0.75, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1195": {"name": "Supply Chain Compromise", "tactic": "initial_access",
|
|
"detection_risk": 0.2, "prerequisites": [], "access_level": "external"},
|
|
"T1218": {"name": "System Binary Proxy Execution", "tactic": "defense_evasion",
|
|
"detection_risk": 0.6, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
"T1105": {"name": "Ingress Tool Transfer", "tactic": "command_and_control",
|
|
"detection_risk": 0.55, "prerequisites": ["initial_access"], "access_level": "internal"},
|
|
}
|
|
|
|
ACCESS_LEVEL_HIERARCHY = {"external": 0, "internal": 1, "credentialed": 2}
|
|
|
|
OPSEC_RISKS = [
|
|
{"risk": "C2 beacon interval too frequent", "severity": "high",
|
|
"mitigation": "Use jitter (25-50%) on beacon intervals; minimum 30s base interval for stealth",
|
|
"relevant_tactics": ["command_and_control"]},
|
|
{"risk": "Infrastructure reuse across engagements", "severity": "critical",
|
|
"mitigation": "Provision fresh C2 infrastructure per engagement; never reuse domains or IPs",
|
|
"relevant_tactics": ["command_and_control", "initial_access"]},
|
|
{"risk": "Scanning during business hours from non-business IP", "severity": "medium",
|
|
"mitigation": "Schedule active scanning to match target business hours and geographic timezone",
|
|
"relevant_tactics": ["discovery"]},
|
|
{"risk": "Known tool signatures in memory or on disk", "severity": "high",
|
|
"mitigation": "Use custom-compiled tools or obfuscated variants; avoid default Cobalt Strike profiles",
|
|
"relevant_tactics": ["execution", "lateral_movement"]},
|
|
{"risk": "Credential dumping without EDR bypass", "severity": "critical",
|
|
"mitigation": "Assess EDR coverage before credential dumping; use protected-mode aware approaches",
|
|
"relevant_tactics": ["credential_access"]},
|
|
{"risk": "Large data transfer without staging", "severity": "high",
|
|
"mitigation": "Stage data locally, compress and encrypt before exfil; avoid single large transfers",
|
|
"relevant_tactics": ["exfiltration", "collection"]},
|
|
{"risk": "Operating outside authorized time window", "severity": "critical",
|
|
"mitigation": "Confirm maintenance and testing windows with client before operational phases",
|
|
"relevant_tactics": []},
|
|
{"risk": "Leaving artifacts in temp directories", "severity": "medium",
|
|
"mitigation": "Clean up all dropped files and created accounts before disengaging",
|
|
"relevant_tactics": ["execution", "persistence"]},
|
|
]
|
|
|
|
KILL_CHAIN_PHASE_ORDER = [
|
|
"initial_access", "execution", "persistence", "privilege_escalation",
|
|
"defense_evasion", "credential_access", "discovery", "lateral_movement",
|
|
"collection", "command_and_control", "exfiltration", "impact"
|
|
]
|
|
|
|
|
|
def list_techniques():
|
|
"""Print a formatted table of all MITRE techniques and exit."""
|
|
print(f"{'ID':<12} {'Name':<45} {'Tactic':<25} {'Det.Risk':<10} {'Access'}")
|
|
print("-" * 110)
|
|
for tid, data in sorted(MITRE_TECHNIQUES.items()):
|
|
print(
|
|
f"{tid:<12} {data['name']:<45} {data['tactic']:<25} "
|
|
f"{data['detection_risk']:<10.2f} {data['access_level']}"
|
|
)
|
|
sys.exit(0)
|
|
|
|
|
|
def build_engagement_plan(techniques_input, access_level, crown_jewels, target_count):
|
|
"""
|
|
Core planning algorithm. Returns (plan_dict, scope_violations_count).
|
|
"""
|
|
provided_level = ACCESS_LEVEL_HIERARCHY[access_level]
|
|
valid_techniques = []
|
|
scope_violations = []
|
|
not_found = []
|
|
|
|
for tid in techniques_input:
|
|
tid = tid.strip().upper()
|
|
if tid not in MITRE_TECHNIQUES:
|
|
not_found.append(tid)
|
|
continue
|
|
tech = MITRE_TECHNIQUES[tid]
|
|
required_level = ACCESS_LEVEL_HIERARCHY[tech["access_level"]]
|
|
if required_level > provided_level:
|
|
scope_violations.append({
|
|
"technique_id": tid,
|
|
"technique_name": tech["name"],
|
|
"reason": (
|
|
f"Requires '{tech['access_level']}' access; "
|
|
f"provided access level is '{access_level}'"
|
|
),
|
|
})
|
|
continue
|
|
effort_score = round(tech["detection_risk"] * (len(tech["prerequisites"]) + 1), 4)
|
|
valid_techniques.append({
|
|
"id": tid,
|
|
"name": tech["name"],
|
|
"tactic": tech["tactic"],
|
|
"detection_risk": tech["detection_risk"],
|
|
"prerequisites": tech["prerequisites"],
|
|
"effort_score": effort_score,
|
|
})
|
|
|
|
# Group by tactic and order phases by kill chain
|
|
tactic_map = {}
|
|
for t in valid_techniques:
|
|
tactic_map.setdefault(t["tactic"], []).append(t)
|
|
|
|
phases = []
|
|
tactics_present = set(tactic_map.keys())
|
|
for phase_name in KILL_CHAIN_PHASE_ORDER:
|
|
if phase_name in tactic_map:
|
|
techniques_in_phase = sorted(
|
|
tactic_map[phase_name], key=lambda x: x["effort_score"], reverse=True
|
|
)
|
|
phases.append({
|
|
"phase": phase_name,
|
|
"techniques": techniques_in_phase,
|
|
})
|
|
|
|
# Identify choke points
|
|
# A choke point is a credential_access or privilege_escalation technique
|
|
# that other selected techniques list as a prerequisite dependency,
|
|
# especially relevant when crown jewels are specified.
|
|
choke_tactic_set = {"credential_access", "privilege_escalation"}
|
|
choke_points = []
|
|
for t in valid_techniques:
|
|
if t["tactic"] not in choke_tactic_set:
|
|
continue
|
|
# Count how many other techniques depend on this tactic
|
|
dependents = [
|
|
other["id"]
|
|
for other in valid_techniques
|
|
if t["tactic"] in other["prerequisites"] and other["id"] != t["id"]
|
|
]
|
|
# If crown jewels are specified, flag anything in those choke tactics
|
|
crown_jewel_relevant = bool(crown_jewels)
|
|
if dependents or crown_jewel_relevant:
|
|
choke_points.append({
|
|
"technique_id": t["id"],
|
|
"technique_name": t["name"],
|
|
"tactic": t["tactic"],
|
|
"dependent_technique_count": len(dependents),
|
|
"dependent_techniques": dependents,
|
|
"crown_jewel_relevant": crown_jewel_relevant,
|
|
"note": (
|
|
"Blocking this technique disrupts the downstream kill-chain. "
|
|
"Priority hardening target."
|
|
),
|
|
})
|
|
|
|
# Collect OPSEC risks for tactics present in the selected techniques
|
|
seen_risks = set()
|
|
applicable_opsec = []
|
|
for risk_item in OPSEC_RISKS:
|
|
relevant = risk_item["relevant_tactics"]
|
|
# Include universal risks (empty relevant_tactics list) always
|
|
if not relevant or tactics_present.intersection(relevant):
|
|
key = risk_item["risk"]
|
|
if key not in seen_risks:
|
|
seen_risks.add(key)
|
|
applicable_opsec.append(risk_item)
|
|
|
|
# Estimate duration: sum detection_risk * 2 days per phase, minimum 3 days
|
|
raw_duration = sum(
|
|
tech["detection_risk"] * 2
|
|
for t in valid_techniques
|
|
for tech in [t] # flatten
|
|
)
|
|
# Per-phase minimum: ensure at least 0.5 day per phase
|
|
phase_count = len(phases)
|
|
estimated_days = max(3.0, round(raw_duration + phase_count * 0.5, 1))
|
|
|
|
# Scale by target_count (each additional target adds 20% duration)
|
|
if target_count and target_count > 1:
|
|
estimated_days = round(estimated_days * (1 + (target_count - 1) * 0.2), 1)
|
|
|
|
# Required authorizations list
|
|
required_authorizations = [
|
|
"Signed Rules of Engagement (RoE) document",
|
|
"Written executive/CISO authorization",
|
|
"Defined scope and out-of-scope assets list",
|
|
"Emergency stop contact and escalation path",
|
|
"Deconfliction process with SOC/Blue Team",
|
|
]
|
|
if "impact" in tactics_present:
|
|
required_authorizations.append(
|
|
"Specific written authorization for destructive/impact techniques (T14xx)"
|
|
)
|
|
if "credential_access" in tactics_present:
|
|
required_authorizations.append(
|
|
"Written authorization for credential capture and handling procedures"
|
|
)
|
|
|
|
plan = {
|
|
"engagement_summary": {
|
|
"access_level": access_level,
|
|
"crown_jewels": crown_jewels,
|
|
"target_count": target_count or 1,
|
|
"techniques_requested": len(techniques_input),
|
|
"techniques_valid": len(valid_techniques),
|
|
"techniques_not_found": not_found,
|
|
"estimated_duration_days": estimated_days,
|
|
},
|
|
"phases": phases,
|
|
"choke_points": choke_points,
|
|
"opsec_risks": applicable_opsec,
|
|
"scope_violations": scope_violations,
|
|
"required_authorizations": required_authorizations,
|
|
}
|
|
return plan, len(scope_violations)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Red Team Engagement Planner — Builds structured engagement plans from MITRE ATT&CK techniques.",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog=(
|
|
"Examples:\n"
|
|
" python3 engagement_planner.py --techniques T1059,T1078,T1003 --access-level external --authorized --json\n"
|
|
" python3 engagement_planner.py --techniques T1059,T1078 --crown-jewels 'DB,AD' --access-level credentialed --authorized --json\n"
|
|
" python3 engagement_planner.py --list-techniques\n"
|
|
"\nExit codes:\n"
|
|
" 0 Engagement plan generated successfully\n"
|
|
" 1 Missing authorization or invalid input\n"
|
|
" 2 Scope violation or technique outside access-level constraints"
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"--techniques",
|
|
type=str,
|
|
default="",
|
|
help="Comma-separated MITRE ATT&CK technique IDs (e.g. T1059,T1078,T1003)",
|
|
)
|
|
parser.add_argument(
|
|
"--access-level",
|
|
choices=["external", "internal", "credentialed"],
|
|
default="external",
|
|
help="Attacker access level for this engagement (default: external)",
|
|
)
|
|
parser.add_argument(
|
|
"--crown-jewels",
|
|
type=str,
|
|
default="",
|
|
help="Comma-separated crown jewel asset labels (e.g. 'DB,AD,PaymentSystem')",
|
|
)
|
|
parser.add_argument(
|
|
"--target-count",
|
|
type=int,
|
|
default=1,
|
|
help="Number of target systems/segments (affects duration estimate, default: 1)",
|
|
)
|
|
parser.add_argument(
|
|
"--authorized",
|
|
action="store_true",
|
|
help="Confirms signed RoE and executive authorization have been obtained",
|
|
)
|
|
parser.add_argument(
|
|
"--json",
|
|
action="store_true",
|
|
dest="output_json",
|
|
help="Output results as JSON",
|
|
)
|
|
parser.add_argument(
|
|
"--list-techniques",
|
|
action="store_true",
|
|
help="Print all available MITRE techniques and exit",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.list_techniques:
|
|
list_techniques() # exits internally
|
|
|
|
# Authorization gate
|
|
if not args.authorized:
|
|
msg = (
|
|
"Authorization required: obtain signed RoE before planning. "
|
|
"Use --authorized flag only after legal sign-off."
|
|
)
|
|
if args.output_json:
|
|
print(json.dumps({"error": msg, "exit_code": 1}, indent=2))
|
|
else:
|
|
print(f"ERROR: {msg}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
if not args.techniques.strip():
|
|
msg = "No techniques specified. Use --techniques T1059,T1078,... or --list-techniques."
|
|
if args.output_json:
|
|
print(json.dumps({"error": msg, "exit_code": 1}, indent=2))
|
|
else:
|
|
print(f"ERROR: {msg}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
techniques_input = [t.strip() for t in args.techniques.split(",") if t.strip()]
|
|
crown_jewels = [c.strip() for c in args.crown_jewels.split(",") if c.strip()]
|
|
|
|
plan, violation_count = build_engagement_plan(
|
|
techniques_input=techniques_input,
|
|
access_level=args.access_level,
|
|
crown_jewels=crown_jewels,
|
|
target_count=args.target_count,
|
|
)
|
|
|
|
if args.output_json:
|
|
print(json.dumps(plan, indent=2))
|
|
else:
|
|
summary = plan["engagement_summary"]
|
|
print("\n=== RED TEAM ENGAGEMENT PLAN ===")
|
|
print(f"Access Level : {summary['access_level']}")
|
|
print(f"Crown Jewels : {', '.join(crown_jewels) if crown_jewels else 'Not specified'}")
|
|
print(f"Techniques : {summary['techniques_valid']}/{summary['techniques_requested']} valid")
|
|
print(f"Est. Duration : {summary['estimated_duration_days']} days")
|
|
if summary["techniques_not_found"]:
|
|
print(f"Not Found : {', '.join(summary['techniques_not_found'])}")
|
|
|
|
print("\n--- Kill-Chain Phases ---")
|
|
for phase in plan["phases"]:
|
|
print(f"\n [{phase['phase'].upper()}]")
|
|
for t in phase["techniques"]:
|
|
print(f" {t['id']:<12} {t['name']:<45} risk={t['detection_risk']:.2f} effort={t['effort_score']:.3f}")
|
|
|
|
print("\n--- Choke Points ---")
|
|
if plan["choke_points"]:
|
|
for cp in plan["choke_points"]:
|
|
print(f" {cp['technique_id']} {cp['technique_name']} — {cp['note']}")
|
|
else:
|
|
print(" None identified.")
|
|
|
|
print("\n--- OPSEC Risks ---")
|
|
for risk in plan["opsec_risks"]:
|
|
print(f" [{risk['severity'].upper()}] {risk['risk']}")
|
|
print(f" Mitigation: {risk['mitigation']}")
|
|
|
|
if plan["scope_violations"]:
|
|
print("\n--- SCOPE VIOLATIONS ---")
|
|
for sv in plan["scope_violations"]:
|
|
print(f" {sv['technique_id']}: {sv['reason']}")
|
|
|
|
print("\n--- Required Authorizations ---")
|
|
for auth in plan["required_authorizations"]:
|
|
print(f" - {auth}")
|
|
print()
|
|
|
|
if violation_count > 0:
|
|
sys.exit(2)
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|