Files
claude-skills-reference/engineering-team/security-pen-testing/scripts/vulnerability_scanner.py
Reza Rezvani 2056ba251f feat(engineering-team): add azure-cloud-architect, security-pen-testing; extend terraform-patterns
azure-cloud-architect (451-line SKILL.md, 3 scripts, 3 references):
- 6-step workflow mirroring aws-solution-architect for Azure
- Bicep/ARM templates, AKS, Functions, Cosmos DB, cost optimization
- architecture_designer.py, cost_optimizer.py, bicep_generator.py

security-pen-testing (850-line SKILL.md, 3 scripts, 3 references):
- OWASP Top 10 systematic audit, offensive security testing
- XSS/SQLi/SSRF/IDOR detection, secret scanning, API security
- vulnerability_scanner.py, dependency_auditor.py, pentest_report_generator.py
- Responsible disclosure workflow included

terraform-patterns extended (487 → 740 lines):
- Multi-cloud provider configuration
- OpenTofu compatibility notes
- Infracost integration for PR cost estimation
- Import existing infrastructure patterns
- Terragrunt DRY multi-environment patterns

Updated engineering-team plugin.json (26 → 28 skills).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 13:32:22 +01:00

546 lines
27 KiB
Python

#!/usr/bin/env python3
"""
Vulnerability Scanner - Generate OWASP Top 10 security checklists and scan for common patterns.
Table of Contents:
VulnerabilityScanner - Main class for vulnerability scanning
__init__ - Initialize with target type and scope
generate_checklist - Generate OWASP Top 10 checklist for target
scan_source - Scan source directory for vulnerability patterns
_scan_file - Scan individual file for regex patterns
_get_owasp_checks - Return OWASP checks for target type
main() - CLI entry point
Usage:
python vulnerability_scanner.py --target web --scope full
python vulnerability_scanner.py --target api --scope quick --json
python vulnerability_scanner.py --target web --source /path/to/code --scope full
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
@dataclass
class CheckItem:
"""A single check item in the OWASP checklist."""
owasp_id: str
owasp_category: str
check_id: str
title: str
description: str
test_procedure: str
severity: str # critical, high, medium, low, info
applicable_targets: List[str] = field(default_factory=list)
status: str = "pending" # pending, pass, fail, na
@dataclass
class SourceFinding:
"""A vulnerability pattern found in source code."""
rule_id: str
title: str
severity: str
owasp_category: str
file_path: str
line_number: int
code_snippet: str
recommendation: str
class VulnerabilityScanner:
"""Generate OWASP Top 10 checklists and scan source code for vulnerability patterns."""
SCAN_EXTENSIONS = {
".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go",
".rb", ".php", ".cs", ".rs", ".html", ".vue", ".svelte",
}
SKIP_DIRS = {
"node_modules", ".git", "__pycache__", ".venv", "venv",
"vendor", "dist", "build", ".next", "target",
}
def __init__(self, target: str = "web", scope: str = "full", source: Optional[str] = None):
self.target = target
self.scope = scope
self.source = source
def generate_checklist(self) -> List[CheckItem]:
"""Generate OWASP Top 10 checklist for the given target and scope."""
all_checks = self._get_owasp_checks()
filtered = []
for check in all_checks:
if self.target not in check.applicable_targets and "all" not in check.applicable_targets:
continue
if self.scope == "quick" and check.severity in ("low", "info"):
continue
filtered.append(check)
return filtered
def scan_source(self, path: str) -> List[SourceFinding]:
"""Scan source directory for common vulnerability patterns."""
findings = []
source_path = Path(path)
if not source_path.exists():
return findings
for root, dirs, files in os.walk(source_path):
dirs[:] = [d for d in dirs if d not in self.SKIP_DIRS]
for fname in files:
fpath = Path(root) / fname
if fpath.suffix in self.SCAN_EXTENSIONS:
findings.extend(self._scan_file(fpath))
return findings
def _scan_file(self, file_path: Path) -> List[SourceFinding]:
"""Scan a single file for vulnerability patterns."""
findings = []
try:
content = file_path.read_text(encoding="utf-8", errors="ignore")
except (OSError, PermissionError):
return findings
patterns = [
{
"rule_id": "SQLI-001",
"title": "Potential SQL Injection (string concatenation)",
"severity": "critical",
"owasp_category": "A03:2021 - Injection",
"pattern": r'''(?:execute|query|cursor\.execute)\s*\(\s*(?:f["\']|["\'].*%s|["\'].*\+\s*\w+|["\'].*\.format)''',
"recommendation": "Use parameterized queries or prepared statements instead of string concatenation.",
"extensions": {".py", ".js", ".ts", ".java", ".rb", ".php"},
},
{
"rule_id": "SQLI-002",
"title": "Potential SQL Injection (template literal)",
"severity": "critical",
"owasp_category": "A03:2021 - Injection",
"pattern": r'''(?:query|execute|raw)\s*\(\s*`[^`]*\$\{''',
"recommendation": "Use parameterized queries. Never interpolate user input into SQL strings.",
"extensions": {".js", ".ts", ".jsx", ".tsx"},
},
{
"rule_id": "XSS-001",
"title": "Potential DOM-based XSS (innerHTML)",
"severity": "high",
"owasp_category": "A03:2021 - Injection",
"pattern": r'''\.innerHTML\s*=\s*(?!['"][^'"]*['"])''',
"recommendation": "Use textContent or a sanitization library (DOMPurify) instead of innerHTML.",
"extensions": {".js", ".ts", ".jsx", ".tsx", ".html", ".vue", ".svelte"},
},
{
"rule_id": "XSS-002",
"title": "React dangerouslySetInnerHTML usage",
"severity": "high",
"owasp_category": "A03:2021 - Injection",
"pattern": r'''dangerouslySetInnerHTML''',
"recommendation": "Sanitize HTML with DOMPurify before using dangerouslySetInnerHTML.",
"extensions": {".jsx", ".tsx", ".js", ".ts"},
},
{
"rule_id": "CMDI-001",
"title": "Potential Command Injection (shell=True)",
"severity": "critical",
"owasp_category": "A03:2021 - Injection",
"pattern": r'''subprocess\.\w+\(.*shell\s*=\s*True''',
"recommendation": "Avoid shell=True. Use subprocess with a list of arguments instead.",
"extensions": {".py"},
},
{
"rule_id": "CMDI-002",
"title": "Potential Command Injection (eval/exec)",
"severity": "critical",
"owasp_category": "A03:2021 - Injection",
"pattern": r'''(?:^|\s)(?:eval|exec)\s*\((?!.*(?:#\s*nosec|NOSONAR))''',
"recommendation": "Never use eval() or exec() with untrusted input. Use ast.literal_eval() for data parsing.",
"extensions": {".py", ".js", ".ts"},
},
{
"rule_id": "SEC-001",
"title": "Hardcoded Secret or API Key",
"severity": "critical",
"owasp_category": "A02:2021 - Cryptographic Failures",
"pattern": r'''(?i)(?:api[_-]?key|secret[_-]?key|password|passwd|token)\s*[:=]\s*['\"][a-zA-Z0-9+/=]{16,}['\"]''',
"recommendation": "Move secrets to environment variables or a secrets manager (Vault, AWS Secrets Manager).",
"extensions": {".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go", ".rb", ".php"},
},
{
"rule_id": "SEC-002",
"title": "AWS Access Key ID detected",
"severity": "critical",
"owasp_category": "A02:2021 - Cryptographic Failures",
"pattern": r'''AKIA[0-9A-Z]{16}''',
"recommendation": "Remove the AWS key immediately. Rotate the credential and use IAM roles or environment variables.",
"extensions": None, # scan all files
},
{
"rule_id": "CRYPTO-001",
"title": "Weak hashing algorithm (MD5/SHA1)",
"severity": "high",
"owasp_category": "A02:2021 - Cryptographic Failures",
"pattern": r'''(?:md5|sha1)\s*\(''',
"recommendation": "Use bcrypt, scrypt, or argon2 for passwords. Use SHA-256+ for integrity checks.",
"extensions": {".py", ".js", ".ts", ".java", ".go", ".rb", ".php"},
},
{
"rule_id": "SSRF-001",
"title": "Potential SSRF (user-controlled URL in HTTP request)",
"severity": "high",
"owasp_category": "A10:2021 - SSRF",
"pattern": r'''(?:requests\.get|fetch|axios|http\.get|urllib\.request\.urlopen)\s*\(\s*(?:request\.|req\.|params|args|input|user)''',
"recommendation": "Validate and allowlist URLs before making outbound requests. Block internal IPs.",
"extensions": {".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go"},
},
{
"rule_id": "PATH-001",
"title": "Potential Path Traversal",
"severity": "high",
"owasp_category": "A01:2021 - Broken Access Control",
"pattern": r'''(?:open|readFile|readFileSync|Path\.join)\s*\(.*(?:request\.|req\.|params|args|input|user)''',
"recommendation": "Sanitize file paths. Use os.path.basename() and validate against an allowlist.",
"extensions": {".py", ".js", ".ts", ".java", ".go"},
},
{
"rule_id": "DESER-001",
"title": "Unsafe Deserialization (pickle/yaml.load)",
"severity": "critical",
"owasp_category": "A08:2021 - Software and Data Integrity Failures",
"pattern": r'''(?:pickle\.load|yaml\.load\s*\([^)]*\)\s*(?!.*Loader\s*=\s*yaml\.SafeLoader))''',
"recommendation": "Use yaml.safe_load() instead of yaml.load(). Avoid pickle for untrusted data.",
"extensions": {".py"},
},
{
"rule_id": "AUTH-001",
"title": "JWT with hardcoded secret",
"severity": "critical",
"owasp_category": "A07:2021 - Identification and Authentication Failures",
"pattern": r'''jwt\.(?:encode|sign)\s*\([^)]*['\"][a-zA-Z0-9]{8,}['\"]''',
"recommendation": "Load JWT secrets from environment variables. Use RS256 with key pairs for production.",
"extensions": {".py", ".js", ".ts"},
},
]
lines = content.split("\n")
for i, line in enumerate(lines, 1):
for pat in patterns:
exts = pat.get("extensions")
if exts and file_path.suffix not in exts:
continue
if re.search(pat["pattern"], line):
findings.append(SourceFinding(
rule_id=pat["rule_id"],
title=pat["title"],
severity=pat["severity"],
owasp_category=pat["owasp_category"],
file_path=str(file_path),
line_number=i,
code_snippet=line.strip()[:200],
recommendation=pat["recommendation"],
))
return findings
def _get_owasp_checks(self) -> List[CheckItem]:
"""Return comprehensive OWASP Top 10 checklist items."""
checks = [
# A01: Broken Access Control
CheckItem("A01", "Broken Access Control", "A01-01",
"Horizontal Privilege Escalation",
"Verify users cannot access other users' resources by changing IDs.",
"Change resource IDs in API requests (e.g., /users/123 → /users/124). Expect 403.",
"critical", ["web", "api", "all"]),
CheckItem("A01", "Broken Access Control", "A01-02",
"Vertical Privilege Escalation",
"Verify regular users cannot access admin endpoints.",
"Authenticate as regular user, request admin endpoints. Expect 403.",
"critical", ["web", "api", "all"]),
CheckItem("A01", "Broken Access Control", "A01-03",
"CORS Misconfiguration",
"Verify CORS policy does not allow arbitrary origins.",
"Send request with Origin: https://evil.com. Check Access-Control-Allow-Origin.",
"high", ["web", "api"]),
CheckItem("A01", "Broken Access Control", "A01-04",
"Forced Browsing",
"Check for unprotected admin or debug pages.",
"Request /admin, /debug, /api/admin, /.env, /swagger. Expect 403 or 404.",
"high", ["web", "all"]),
CheckItem("A01", "Broken Access Control", "A01-05",
"Directory Listing",
"Verify directory listing is disabled on the web server.",
"Request directory paths without index file. Should not list contents.",
"medium", ["web"]),
# A02: Cryptographic Failures
CheckItem("A02", "Cryptographic Failures", "A02-01",
"TLS Version Check",
"Ensure TLS 1.2+ is enforced. Reject TLS 1.0/1.1.",
"Run: nmap --script ssl-enum-ciphers -p 443 target.com",
"high", ["web", "api", "all"]),
CheckItem("A02", "Cryptographic Failures", "A02-02",
"Password Hashing Algorithm",
"Verify passwords use bcrypt/scrypt/argon2 with adequate cost.",
"Review authentication code for hashing implementation.",
"critical", ["web", "api", "all"]),
CheckItem("A02", "Cryptographic Failures", "A02-03",
"Sensitive Data in URLs",
"Check for tokens, passwords, or PII in query parameters.",
"Review access logs and URL patterns for sensitive query params.",
"high", ["web", "api"]),
CheckItem("A02", "Cryptographic Failures", "A02-04",
"HSTS Header",
"Verify Strict-Transport-Security header is present.",
"Check response headers for HSTS with max-age >= 31536000.",
"medium", ["web"]),
# A03: Injection
CheckItem("A03", "Injection", "A03-01",
"SQL Injection",
"Test input fields for SQL injection vulnerabilities.",
"Submit ' OR 1=1-- in input fields. Check for errors or unexpected behavior.",
"critical", ["web", "api", "all"]),
CheckItem("A03", "Injection", "A03-02",
"XSS (Cross-Site Scripting)",
"Test for reflected, stored, and DOM-based XSS.",
"Submit <script>alert(1)</script> in input fields. Check if rendered.",
"high", ["web", "all"]),
CheckItem("A03", "Injection", "A03-03",
"Command Injection",
"Test for OS command injection in input fields.",
"Submit ; whoami in fields that may trigger system commands.",
"critical", ["web", "api"]),
CheckItem("A03", "Injection", "A03-04",
"Template Injection",
"Test for server-side template injection.",
"Submit {{7*7}} and ${7*7} in input fields. Check for 49 in response.",
"high", ["web", "api"]),
CheckItem("A03", "Injection", "A03-05",
"NoSQL Injection",
"Test for NoSQL injection in JSON inputs.",
"Submit {\"$gt\": \"\"} in JSON fields. Check for data leakage.",
"high", ["api"]),
# A04: Insecure Design
CheckItem("A04", "Insecure Design", "A04-01",
"Rate Limiting on Authentication",
"Verify rate limiting exists on login and password reset endpoints.",
"Send 50+ rapid login requests. Expect 429 after threshold.",
"high", ["web", "api", "all"]),
CheckItem("A04", "Insecure Design", "A04-02",
"Business Logic Abuse",
"Test for business logic flaws (negative quantities, state manipulation).",
"Try negative values, skip steps in workflows, manipulate client-side calculations.",
"high", ["web", "api"]),
CheckItem("A04", "Insecure Design", "A04-03",
"Account Lockout",
"Verify account lockout after repeated failed login attempts.",
"Submit 10+ failed login attempts. Check for lockout or CAPTCHA.",
"medium", ["web", "api"]),
# A05: Security Misconfiguration
CheckItem("A05", "Security Misconfiguration", "A05-01",
"Default Credentials",
"Check for default credentials on admin panels and services.",
"Try admin:admin, root:root, admin:password on all login forms.",
"critical", ["web", "api", "all"]),
CheckItem("A05", "Security Misconfiguration", "A05-02",
"Debug Mode in Production",
"Verify debug mode is disabled in production.",
"Trigger errors and check for stack traces, debug info, or verbose errors.",
"high", ["web", "api", "all"]),
CheckItem("A05", "Security Misconfiguration", "A05-03",
"Security Headers",
"Verify all security headers are present and properly configured.",
"Check for CSP, X-Frame-Options, X-Content-Type-Options, Referrer-Policy.",
"medium", ["web"]),
CheckItem("A05", "Security Misconfiguration", "A05-04",
"Unnecessary HTTP Methods",
"Verify only required HTTP methods are enabled.",
"Send OPTIONS request. Check for TRACE, DELETE on public endpoints.",
"low", ["web", "api"]),
# A06: Vulnerable Components
CheckItem("A06", "Vulnerable and Outdated Components", "A06-01",
"Dependency CVE Audit",
"Scan all dependencies for known CVEs.",
"Run npm audit, pip audit, govulncheck, or bundle audit.",
"high", ["web", "api", "mobile", "all"]),
CheckItem("A06", "Vulnerable and Outdated Components", "A06-02",
"End-of-Life Framework Check",
"Verify no EOL frameworks or languages are in use.",
"Check framework versions against vendor EOL dates.",
"medium", ["web", "api", "all"]),
# A07: Authentication Failures
CheckItem("A07", "Identification and Authentication Failures", "A07-01",
"Brute Force Protection",
"Verify brute force protection on authentication endpoints.",
"Send 100 rapid login attempts. Expect blocking after threshold.",
"high", ["web", "api", "all"]),
CheckItem("A07", "Identification and Authentication Failures", "A07-02",
"Session Management",
"Verify sessions are properly managed (HttpOnly, Secure, SameSite).",
"Check cookie flags: HttpOnly, Secure, SameSite=Strict|Lax.",
"high", ["web"]),
CheckItem("A07", "Identification and Authentication Failures", "A07-03",
"Session Invalidation on Logout",
"Verify sessions are invalidated on logout.",
"Logout, then replay the session cookie. Should receive 401.",
"high", ["web", "api"]),
CheckItem("A07", "Identification and Authentication Failures", "A07-04",
"Username Enumeration",
"Check for username enumeration via error messages.",
"Submit valid and invalid usernames. Error messages should be identical.",
"medium", ["web", "api"]),
# A08: Data Integrity
CheckItem("A08", "Software and Data Integrity Failures", "A08-01",
"Unsafe Deserialization",
"Check for unsafe deserialization of user input.",
"Review code for pickle.load(), yaml.load(), Java ObjectInputStream.",
"critical", ["web", "api"]),
CheckItem("A08", "Software and Data Integrity Failures", "A08-02",
"Subresource Integrity",
"Verify SRI hashes on CDN-loaded scripts and stylesheets.",
"Check <script> and <link> tags for integrity attributes.",
"medium", ["web"]),
# A09: Logging Failures
CheckItem("A09", "Security Logging and Monitoring Failures", "A09-01",
"Authentication Event Logging",
"Verify login success and failure events are logged.",
"Attempt valid and invalid logins. Check server logs for entries.",
"medium", ["web", "api", "all"]),
CheckItem("A09", "Security Logging and Monitoring Failures", "A09-02",
"Sensitive Data in Logs",
"Verify passwords, tokens, and PII are not logged.",
"Review log configuration and sample log output for sensitive data.",
"high", ["web", "api", "all"]),
# A10: SSRF
CheckItem("A10", "Server-Side Request Forgery", "A10-01",
"Internal Network Access via SSRF",
"Test URL input fields for SSRF vulnerabilities.",
"Submit http://169.254.169.254/ and http://127.0.0.1 in URL fields.",
"critical", ["web", "api"]),
CheckItem("A10", "Server-Side Request Forgery", "A10-02",
"DNS Rebinding",
"Test for DNS rebinding attacks on URL validators.",
"Use a DNS rebinding service to bypass allowlist validation.",
"high", ["web", "api"]),
]
return checks
def format_checklist_text(checks: List[CheckItem]) -> str:
"""Format checklist as human-readable text."""
lines = []
lines.append("=" * 70)
lines.append("OWASP TOP 10 SECURITY CHECKLIST")
lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"Total checks: {len(checks)}")
lines.append("=" * 70)
current_category = ""
for check in checks:
if check.owasp_category != current_category:
current_category = check.owasp_category
lines.append(f"\n--- {check.owasp_id}: {check.owasp_category} ---\n")
sev_marker = {"critical": "[!!!]", "high": "[!! ]", "medium": "[! ]", "low": "[. ]", "info": "[ ]"}
marker = sev_marker.get(check.severity, "[ ]")
lines.append(f" {marker} [{check.check_id}] {check.title}")
lines.append(f" {check.description}")
lines.append(f" Test: {check.test_procedure}")
lines.append(f" Severity: {check.severity.upper()}")
lines.append("")
return "\n".join(lines)
def format_findings_text(findings: List[SourceFinding]) -> str:
"""Format source findings as human-readable text."""
if not findings:
return "No vulnerability patterns detected in source code."
lines = []
lines.append(f"\nSOURCE CODE FINDINGS: {len(findings)} issue(s) found\n")
by_severity = {"critical": [], "high": [], "medium": [], "low": [], "info": []}
for f in findings:
by_severity.get(f.severity, by_severity["info"]).append(f)
for sev in ["critical", "high", "medium", "low", "info"]:
group = by_severity[sev]
if not group:
continue
lines.append(f" [{sev.upper()}] ({len(group)} finding(s))")
for f in group:
lines.append(f" - {f.title} [{f.rule_id}]")
lines.append(f" File: {f.file_path}:{f.line_number}")
lines.append(f" Code: {f.code_snippet}")
lines.append(f" Fix: {f.recommendation}")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Vulnerability Scanner — Generate OWASP Top 10 checklists and scan source code for vulnerability patterns.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --target web --scope full
%(prog)s --target api --scope quick --json
%(prog)s --target web --source /path/to/code --scope full
%(prog)s --target mobile --scope quick --json
""",
)
parser.add_argument("--target", choices=["web", "api", "mobile"], default="web",
help="Target application type (default: web)")
parser.add_argument("--scope", choices=["quick", "full"], default="full",
help="Scan scope: quick (high/critical only) or full (default: full)")
parser.add_argument("--source", metavar="PATH",
help="Optional: path to source code directory to scan for patterns")
parser.add_argument("--json", action="store_true", dest="json_output",
help="Output results as JSON")
args = parser.parse_args()
scanner = VulnerabilityScanner(target=args.target, scope=args.scope)
checklist = scanner.generate_checklist()
source_findings = []
if args.source:
source_findings = scanner.scan_source(args.source)
if args.json_output:
output = {
"scan_metadata": {
"target": args.target,
"scope": args.scope,
"source_path": args.source,
"generated_at": datetime.now().isoformat(),
"checklist_count": len(checklist),
"source_findings_count": len(source_findings),
},
"checklist": [asdict(c) for c in checklist],
"source_findings": [asdict(f) for f in source_findings],
}
print(json.dumps(output, indent=2))
else:
print(format_checklist_text(checklist))
if source_findings:
print(format_findings_text(source_findings))
elif args.source:
print("\nNo vulnerability patterns detected in source code.")
# Exit with non-zero if critical/high findings found in source scan
critical_high = [f for f in source_findings if f.severity in ("critical", "high")]
if critical_high:
sys.exit(1)
if __name__ == "__main__":
main()