secrets-vault-manager (403-line SKILL.md, 3 scripts, 3 references): - HashiCorp Vault, AWS SM, Azure KV, GCP SM integration - Secret rotation, dynamic secrets, audit logging, emergency procedures sql-database-assistant (457-line SKILL.md, 3 scripts, 3 references): - Query optimization, migration generation, schema exploration - Multi-DB support (PostgreSQL, MySQL, SQLite, SQL Server) - ORM patterns (Prisma, Drizzle, TypeORM, SQLAlchemy) gcp-cloud-architect (418-line SKILL.md, 3 scripts, 3 references): - 6-step workflow mirroring aws-solution-architect for GCP - Cloud Run, GKE, BigQuery, Cloud Functions, cost optimization - Completes cloud trifecta (AWS + Azure + GCP) soc2-compliance (417-line SKILL.md, 3 scripts, 3 references): - SOC 2 Type I & II preparation, Trust Service Criteria mapping - Control matrix generation, evidence tracking, gap analysis - First SOC 2 skill in ra-qm-team (joins GDPR, ISO 27001, ISO 13485) All 12 scripts pass --help. Docs generated, mkdocs.yml nav updated. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
331 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Analyze Vault or cloud secret manager audit logs for anomalies.
|
|
|
|
Reads JSON-lines or JSON-array audit log files and flags unusual access
|
|
patterns including volume spikes, off-hours access, new source IPs,
|
|
and failed authentication attempts.
|
|
|
|
Usage:
|
|
python audit_log_analyzer.py --log-file vault-audit.log --threshold 5
|
|
python audit_log_analyzer.py --log-file audit.json --threshold 3 --json
|
|
|
|
Expected log entry format (JSON lines or JSON array):
|
|
{
|
|
"timestamp": "2026-03-20T14:32:00Z",
|
|
"type": "request",
|
|
"auth": {"accessor": "token-abc123", "entity_id": "eid-001", "display_name": "approle-payment-svc"},
|
|
"request": {"path": "secret/data/production/payment/api-keys", "operation": "read"},
|
|
"response": {"status_code": 200},
|
|
"remote_address": "10.0.1.15"
|
|
}
|
|
|
|
Fields are optional — the analyzer works with whatever is available.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import textwrap
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
|
|
def load_logs(path):
    """Load audit log entries from *path*.

    Supports two formats:
      - a single JSON array of entry objects, or
      - JSON lines (one JSON object per line).

    Malformed JSON-lines entries are skipped with a warning on stderr.
    Exits the process with status 1 if the file cannot be read.

    Returns:
        list: parsed entry dicts (empty list for an empty file).
    """
    entries = []
    try:
        # Explicit encoding so parsing does not depend on the platform locale.
        with open(path, "r", encoding="utf-8") as f:
            content = f.read().strip()
    except FileNotFoundError:
        print(f"ERROR: Log file not found: {path}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # Permission/IO errors previously escaped as a raw traceback;
        # report them through the same clean error path.
        print(f"ERROR: Cannot read log file {path}: {e}", file=sys.stderr)
        sys.exit(1)

    if not content:
        return entries

    # Try JSON array first; on parse failure fall through to JSON lines.
    if content.startswith("["):
        try:
            entries = json.loads(content)
            return entries
        except json.JSONDecodeError:
            pass

    # JSON lines: one entry per non-empty line; skip bad lines with a warning.
    for i, line in enumerate(content.split("\n"), 1):
        line = line.strip()
        if not line:
            continue
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            print(f"WARNING: Skipping malformed line {i}", file=sys.stderr)

    return entries
|
|
|
|
|
|
def _parse_timestamp(raw):
    """Parse *raw* into a datetime, or return None if it cannot be parsed.

    Accepts ISO-8601 strings (with a trailing 'Z' or a numeric UTC offset)
    and the space-separated "YYYY-MM-DD HH:MM:SS" form.
    """
    if not isinstance(raw, str) or not raw:
        return None
    # fromisoformat covers "YYYY-MM-DDTHH:MM:SS[.ffffff][+HH:MM]"; the
    # Z -> +00:00 substitution extends it to UTC "Z" suffixes as well.
    # (The previous implementation built invalid strptime formats like
    # "%Y-%m-%dT%H:%M:%S%zZ" and its fallback stripped real +00:00 offsets.)
    try:
        return datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except ValueError:
        pass
    for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(raw, fmt)
        except ValueError:
            continue
    return None


def extract_fields(entry):
    """Extract normalized fields from a log entry.

    All fields are optional in the source entry; missing values fall back
    to "unknown" (or None for the timestamp/status).

    Returns:
        dict with keys: timestamp (datetime | None), hour (int | None),
        identity, path, operation, status_code, remote_address, entry_type.
    """
    ts = _parse_timestamp(entry.get("timestamp", entry.get("time", "")))

    auth = entry.get("auth", {})
    request = entry.get("request", {})
    response = entry.get("response", {})

    return {
        "timestamp": ts,
        "hour": ts.hour if ts else None,
        "identity": auth.get("display_name", auth.get("entity_id", "unknown")),
        "path": request.get("path", entry.get("path", "unknown")),
        "operation": request.get("operation", entry.get("operation", "unknown")),
        "status_code": response.get("status_code", entry.get("status_code")),
        "remote_address": entry.get("remote_address", entry.get("source_address", "unknown")),
        "entry_type": entry.get("type", "unknown"),
    }
|
|
|
|
|
|
def analyze(entries, threshold):
    """Run anomaly detection across all log entries.

    Args:
        entries: list of raw audit-log entry dicts.
        threshold: sensitivity knob; lower values flag more anomalies.

    Returns:
        dict with keys "summary", "anomalies", "top_accessed_paths",
        and "hourly_distribution".
    """
    parsed = [extract_fields(e) for e in entries]

    # Counters
    access_by_identity = defaultdict(int)
    access_by_path = defaultdict(int)
    access_by_ip = defaultdict(set)  # identity -> set of IPs
    ip_to_identities = defaultdict(set)  # IP -> set of identities
    failed_by_source = defaultdict(int)
    off_hours_access = []
    path_by_identity = defaultdict(set)  # identity -> set of paths
    hourly_distribution = defaultdict(int)

    for p in parsed:
        identity = p["identity"]
        path = p["path"]
        ip = p["remote_address"]
        status = p["status_code"]
        hour = p["hour"]

        access_by_identity[identity] += 1
        access_by_path[path] += 1
        access_by_ip[identity].add(ip)
        ip_to_identities[ip].add(identity)
        path_by_identity[identity].add(path)

        if hour is not None:
            hourly_distribution[hour] += 1

        # Failed access (4xx/5xx, or an explicit status of 0).
        # BUGFIX: the old `if status and (...)` guard made `status == 0`
        # unreachable because 0 is falsy; compare against None instead.
        if status is not None and (status >= 400 or status == 0):
            failed_by_source[f"{identity}@{ip}"] += 1

        # Off-hours: before 6 AM or after 10 PM
        if hour is not None and (hour < 6 or hour >= 22):
            off_hours_access.append(p)

    # Build anomalies
    anomalies = []

    # 1. Volume spikes — identities accessing secrets more than threshold * average
    if access_by_identity:
        avg_access = sum(access_by_identity.values()) / len(access_by_identity)
        # Floor the spike cutoff at `threshold` so tiny logs don't flag everything.
        spike_threshold = max(threshold * avg_access, threshold)
        for identity, count in access_by_identity.items():
            if count >= spike_threshold:
                anomalies.append({
                    "type": "volume_spike",
                    "severity": "HIGH",
                    "identity": identity,
                    "access_count": count,
                    "threshold": round(spike_threshold, 1),
                    "description": f"Identity '{identity}' made {count} accesses (threshold: {round(spike_threshold, 1)})",
                })

    # 2. Multi-IP access — single identity from many IPs
    for identity, ips in access_by_ip.items():
        if len(ips) >= threshold:
            anomalies.append({
                "type": "multi_ip_access",
                "severity": "MEDIUM",
                "identity": identity,
                "ip_count": len(ips),
                "ips": sorted(ips),
                "description": f"Identity '{identity}' accessed from {len(ips)} different IPs",
            })

    # 3. Failed access attempts
    for source, count in failed_by_source.items():
        if count >= threshold:
            anomalies.append({
                "type": "failed_access",
                "severity": "HIGH",
                "source": source,
                "failure_count": count,
                "description": f"Source '{source}' had {count} failed access attempts",
            })

    # 4. Off-hours access (minimum of 2 events so a single late login isn't flagged)
    if off_hours_access:
        off_hours_identities = defaultdict(int)
        for p in off_hours_access:
            off_hours_identities[p["identity"]] += 1

        for identity, count in off_hours_identities.items():
            if count >= max(threshold, 2):
                anomalies.append({
                    "type": "off_hours_access",
                    "severity": "MEDIUM",
                    "identity": identity,
                    "access_count": count,
                    "description": f"Identity '{identity}' made {count} accesses outside business hours (before 6 AM / after 10 PM)",
                })

    # 5. Broad path access — single identity touching many paths
    for identity, paths in path_by_identity.items():
        if len(paths) >= threshold * 2:
            anomalies.append({
                "type": "broad_access",
                "severity": "MEDIUM",
                "identity": identity,
                "path_count": len(paths),
                "paths": sorted(paths)[:10],  # cap sample size in the report
                "description": f"Identity '{identity}' accessed {len(paths)} distinct secret paths",
            })

    # Sort anomalies by severity (stable, so insertion order breaks ties)
    severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
    anomalies.sort(key=lambda x: severity_order.get(x["severity"], 4))

    # Summary stats
    summary = {
        "total_entries": len(entries),
        "parsed_entries": len(parsed),
        "unique_identities": len(access_by_identity),
        "unique_paths": len(access_by_path),
        "unique_source_ips": len(ip_to_identities),
        "total_failures": sum(failed_by_source.values()),
        "off_hours_events": len(off_hours_access),
        "anomalies_found": len(anomalies),
    }

    # Top accessed paths
    top_paths = sorted(access_by_path.items(), key=lambda x: -x[1])[:10]

    return {
        "summary": summary,
        "anomalies": anomalies,
        "top_accessed_paths": [{"path": p, "count": c} for p, c in top_paths],
        "hourly_distribution": dict(sorted(hourly_distribution.items())),
    }
|
|
|
|
|
|
def print_human(result, threshold):
    """Print a human-readable analysis report to stdout.

    Args:
        result: dict produced by analyze() (summary, anomalies, top paths,
            hourly distribution).
        threshold: the sensitivity threshold used, echoed in the header.
    """
    summary = result["summary"]
    anomalies = result["anomalies"]

    print("=== Audit Log Analysis Report ===")
    print(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f"Anomaly threshold: {threshold}")
    print()

    print("--- Summary ---")
    print(f" Total log entries: {summary['total_entries']}")
    print(f" Unique identities: {summary['unique_identities']}")
    print(f" Unique secret paths: {summary['unique_paths']}")
    print(f" Unique source IPs: {summary['unique_source_ips']}")
    print(f" Total failures: {summary['total_failures']}")
    print(f" Off-hours events: {summary['off_hours_events']}")
    print(f" Anomalies detected: {summary['anomalies_found']}")
    print()

    if anomalies:
        print("--- Anomalies ---")
        # (The index from the old enumerate() here was never used.)
        for a in anomalies:
            print(f" [{a['severity']}] {a['type']}: {a['description']}")
        print()
    else:
        print("--- No anomalies detected ---")
        print()

    if result["top_accessed_paths"]:
        print("--- Top Accessed Paths ---")
        for item in result["top_accessed_paths"]:
            print(f" {item['count']:5d} {item['path']}")
        print()

    if result["hourly_distribution"]:
        print("--- Hourly Distribution ---")
        # Non-empty here (guarded above), so max() is safe and > 0 entries exist.
        max_count = max(result["hourly_distribution"].values())
        for hour in range(24):
            count = result["hourly_distribution"].get(hour, 0)
            # Scale each bar to a 40-character width relative to the busiest hour.
            bar_len = int((count / max_count) * 40) if max_count > 0 else 0
            marker = " *" if (hour < 6 or hour >= 22) else ""
            print(f" {hour:02d}:00 {'#' * bar_len:40s} {count}{marker}")
        print(" (* = off-hours)")
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, load the log, analyze, and report."""
    ap = argparse.ArgumentParser(
        description="Analyze Vault/cloud secret manager audit logs for anomalies.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""\
            The analyzer detects:
            - Volume spikes (identity accessing secrets above threshold * average)
            - Multi-IP access (single identity from many source IPs)
            - Failed access attempts (repeated auth/access failures)
            - Off-hours access (before 6 AM or after 10 PM)
            - Broad path access (single identity accessing many distinct paths)

            Log format: JSON lines or JSON array. Each entry should include
            timestamp, auth info, request path/operation, response status,
            and remote address. Missing fields are handled gracefully.

            Examples:
              %(prog)s --log-file vault-audit.log --threshold 5
              %(prog)s --log-file audit.json --threshold 3 --json
            """),
    )
    ap.add_argument("--log-file", required=True,
                    help="Path to audit log file (JSON lines or JSON array)")
    ap.add_argument("--threshold", type=int, default=5,
                    help="Anomaly sensitivity threshold — lower = more sensitive (default: 5)")
    ap.add_argument("--json", action="store_true", dest="json_output",
                    help="Output as JSON")

    opts = ap.parse_args()

    # An empty/unreadable log is a hard error for a CLI tool.
    records = load_logs(opts.log_file)
    if not records:
        print("No log entries found in file.", file=sys.stderr)
        sys.exit(1)

    report = analyze(records, opts.threshold)
    # Annotate the report with run metadata before emitting it.
    report["log_file"] = opts.log_file
    report["threshold"] = opts.threshold
    report["analyzed_at"] = datetime.now().isoformat()

    if opts.json_output:
        print(json.dumps(report, indent=2))
    else:
        print_human(report, opts.threshold)


if __name__ == "__main__":
    main()
|