Files
claude-skills-reference/engineering/secrets-vault-manager/scripts/audit_log_analyzer.py
Reza Rezvani 87f3a007c9 feat(engineering,ra-qm): add secrets-vault-manager, sql-database-assistant, gcp-cloud-architect, soc2-compliance
secrets-vault-manager (403-line SKILL.md, 3 scripts, 3 references):
- HashiCorp Vault, AWS SM, Azure KV, GCP SM integration
- Secret rotation, dynamic secrets, audit logging, emergency procedures

sql-database-assistant (457-line SKILL.md, 3 scripts, 3 references):
- Query optimization, migration generation, schema exploration
- Multi-DB support (PostgreSQL, MySQL, SQLite, SQL Server)
- ORM patterns (Prisma, Drizzle, TypeORM, SQLAlchemy)

gcp-cloud-architect (418-line SKILL.md, 3 scripts, 3 references):
- 6-step workflow mirroring aws-solution-architect for GCP
- Cloud Run, GKE, BigQuery, Cloud Functions, cost optimization
- Completes cloud trifecta (AWS + Azure + GCP)

soc2-compliance (417-line SKILL.md, 3 scripts, 3 references):
- SOC 2 Type I & II preparation, Trust Service Criteria mapping
- Control matrix generation, evidence tracking, gap analysis
- First SOC 2 skill in ra-qm-team (joins GDPR, ISO 27001, ISO 13485)

All 12 scripts pass --help. Docs generated, mkdocs.yml nav updated.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 14:05:11 +01:00

331 lines
12 KiB
Python

#!/usr/bin/env python3
"""Analyze Vault or cloud secret manager audit logs for anomalies.
Reads JSON-lines or JSON-array audit log files and flags unusual access
patterns including volume spikes, off-hours access, new source IPs,
and failed authentication attempts.
Usage:
python audit_log_analyzer.py --log-file vault-audit.log --threshold 5
python audit_log_analyzer.py --log-file audit.json --threshold 3 --json
Expected log entry format (JSON lines or JSON array):
{
"timestamp": "2026-03-20T14:32:00Z",
"type": "request",
"auth": {"accessor": "token-abc123", "entity_id": "eid-001", "display_name": "approle-payment-svc"},
"request": {"path": "secret/data/production/payment/api-keys", "operation": "read"},
"response": {"status_code": 200},
"remote_address": "10.0.1.15"
}
Fields are optional — the analyzer works with whatever is available.
"""
import argparse
import json
import sys
import textwrap
from collections import defaultdict
from datetime import datetime
def load_logs(path):
    """Load audit log entries from *path*.

    Supports two layouts:
      * JSON array  -- the whole file is one ``[ ... ]`` document
      * JSON lines  -- one JSON object per line; malformed lines are
        skipped with a warning instead of aborting the run

    Exits the process with status 1 when the file cannot be read.
    Returns a list of entries (possibly empty).
    """
    try:
        # Explicit UTF-8 so parsing does not depend on the platform's
        # default locale encoding.
        with open(path, "r", encoding="utf-8") as f:
            content = f.read().strip()
    except FileNotFoundError:
        print(f"ERROR: Log file not found: {path}", file=sys.stderr)
        sys.exit(1)
    except OSError as exc:
        # Permission denied, path is a directory, I/O error, ...
        print(f"ERROR: Cannot read log file {path}: {exc}", file=sys.stderr)
        sys.exit(1)

    entries = []
    if not content:
        return entries

    # Try JSON array first; on failure fall through to JSON-lines.
    if content.startswith("["):
        try:
            parsed = json.loads(content)
        except json.JSONDecodeError:
            pass
        else:
            # A top-level "[" document always decodes to a list, but be
            # defensive: anything else means no usable entries here.
            return parsed if isinstance(parsed, list) else entries

    # JSON lines: parse each non-empty line independently.
    for i, line in enumerate(content.split("\n"), 1):
        line = line.strip()
        if not line:
            continue
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            print(f"WARNING: Skipping malformed line {i}", file=sys.stderr)
    return entries
def _parse_timestamp(raw):
    """Best-effort parse of an audit timestamp string into a datetime.

    Accepts ISO-8601 forms (with or without a trailing ``Z``, a UTC
    offset, or fractional seconds) and the space-separated
    ``YYYY-MM-DD HH:MM:SS`` form. Returns ``None`` when *raw* is empty,
    not a string, or unparseable. Timestamps carrying a "Z"/offset come
    back timezone-aware; bare ones come back naive.
    """
    if not raw or not isinstance(raw, str):
        return None
    # fromisoformat handles offsets and fractional seconds natively;
    # map the RFC 3339 "Z" suffix to an explicit UTC offset first.
    # (The previous implementation mangled format strings with
    # rstrip("Z")/+"Z" juggling and then stripped "+00:00" back out in
    # its fallback, silently discarding UTC offset information.)
    try:
        return datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except ValueError:
        pass
    for fmt in (
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S.%fZ",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S",
    ):
        try:
            return datetime.strptime(raw, fmt)
        except ValueError:
            continue
    return None


def extract_fields(entry):
    """Extract normalized fields from a log entry.

    Every field is optional in the input; missing values fall back to
    ``None`` (timestamp/hour/status_code) or ``"unknown"``. Both the
    Vault-style nested layout (``auth``/``request``/``response`` dicts)
    and flat layouts (top-level ``path``/``operation``/``status_code``)
    are supported.
    """
    ts = _parse_timestamp(entry.get("timestamp", entry.get("time", "")))
    auth = entry.get("auth", {})
    request = entry.get("request", {})
    response = entry.get("response", {})
    return {
        "timestamp": ts,
        "hour": ts.hour if ts else None,
        "identity": auth.get("display_name", auth.get("entity_id", "unknown")),
        "path": request.get("path", entry.get("path", "unknown")),
        "operation": request.get("operation", entry.get("operation", "unknown")),
        "status_code": response.get("status_code", entry.get("status_code")),
        "remote_address": entry.get("remote_address", entry.get("source_address", "unknown")),
        "entry_type": entry.get("type", "unknown"),
    }
def analyze(entries, threshold):
    """Run anomaly detection across all log entries.

    Parameters:
        entries: list of raw audit-log dicts (as loaded by load_logs).
        threshold: sensitivity knob; lower values flag more events.

    Returns a JSON-serializable dict with keys ``summary``,
    ``anomalies`` (sorted by severity), ``top_accessed_paths`` and
    ``hourly_distribution``.
    """
    parsed = [extract_fields(e) for e in entries]

    # Counters
    access_by_identity = defaultdict(int)
    access_by_path = defaultdict(int)
    access_by_ip = defaultdict(set)      # identity -> set of IPs
    ip_to_identities = defaultdict(set)  # IP -> set of identities
    failed_by_source = defaultdict(int)
    off_hours_access = []
    path_by_identity = defaultdict(set)  # identity -> set of paths
    hourly_distribution = defaultdict(int)

    for p in parsed:
        identity = p["identity"]
        path = p["path"]
        ip = p["remote_address"]
        status = p["status_code"]
        hour = p["hour"]
        access_by_identity[identity] += 1
        access_by_path[path] += 1
        access_by_ip[identity].add(ip)
        ip_to_identities[ip].add(identity)
        path_by_identity[identity].add(path)
        if hour is not None:
            hourly_distribution[hour] += 1
        # Failed access: HTTP-style errors (4xx/5xx) or an explicit 0.
        # BUGFIX: the previous guard `if status and ...` made the
        # `status == 0` arm unreachable (0 is falsy); compare against
        # None so a literal 0 status is counted as a failure.
        if status is not None and (status >= 400 or status == 0):
            failed_by_source[f"{identity}@{ip}"] += 1
        # Off-hours: before 6 AM or after 10 PM
        if hour is not None and (hour < 6 or hour >= 22):
            off_hours_access.append(p)

    # Build anomalies
    anomalies = []

    # 1. Volume spikes — identities accessing secrets more than threshold * average
    if access_by_identity:
        avg_access = sum(access_by_identity.values()) / len(access_by_identity)
        # Floor at `threshold` so tiny logs don't flag every identity.
        spike_threshold = max(threshold * avg_access, threshold)
        for identity, count in access_by_identity.items():
            if count >= spike_threshold:
                anomalies.append({
                    "type": "volume_spike",
                    "severity": "HIGH",
                    "identity": identity,
                    "access_count": count,
                    "threshold": round(spike_threshold, 1),
                    "description": f"Identity '{identity}' made {count} accesses (threshold: {round(spike_threshold, 1)})",
                })

    # 2. Multi-IP access — single identity from many IPs
    for identity, ips in access_by_ip.items():
        if len(ips) >= threshold:
            anomalies.append({
                "type": "multi_ip_access",
                "severity": "MEDIUM",
                "identity": identity,
                "ip_count": len(ips),
                "ips": sorted(ips),
                "description": f"Identity '{identity}' accessed from {len(ips)} different IPs",
            })

    # 3. Failed access attempts
    for source, count in failed_by_source.items():
        if count >= threshold:
            anomalies.append({
                "type": "failed_access",
                "severity": "HIGH",
                "source": source,
                "failure_count": count,
                "description": f"Source '{source}' had {count} failed access attempts",
            })

    # 4. Off-hours access (aggregated per identity; at least 2 events
    # required so a single late login isn't flagged)
    if off_hours_access:
        off_hours_identities = defaultdict(int)
        for p in off_hours_access:
            off_hours_identities[p["identity"]] += 1
        for identity, count in off_hours_identities.items():
            if count >= max(threshold, 2):
                anomalies.append({
                    "type": "off_hours_access",
                    "severity": "MEDIUM",
                    "identity": identity,
                    "access_count": count,
                    "description": f"Identity '{identity}' made {count} accesses outside business hours (before 6 AM / after 10 PM)",
                })

    # 5. Broad path access — single identity touching many paths
    for identity, paths in path_by_identity.items():
        if len(paths) >= threshold * 2:
            anomalies.append({
                "type": "broad_access",
                "severity": "MEDIUM",
                "identity": identity,
                "path_count": len(paths),
                "paths": sorted(paths)[:10],  # cap listing for readability
                "description": f"Identity '{identity}' accessed {len(paths)} distinct secret paths",
            })

    # Sort anomalies by severity (unknown severities sink to the end)
    severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
    anomalies.sort(key=lambda x: severity_order.get(x["severity"], 4))

    # Summary stats
    summary = {
        "total_entries": len(entries),
        "parsed_entries": len(parsed),
        "unique_identities": len(access_by_identity),
        "unique_paths": len(access_by_path),
        "unique_source_ips": len(ip_to_identities),
        "total_failures": sum(failed_by_source.values()),
        "off_hours_events": len(off_hours_access),
        "anomalies_found": len(anomalies),
    }

    # Top accessed paths (highest count first)
    top_paths = sorted(access_by_path.items(), key=lambda x: -x[1])[:10]

    return {
        "summary": summary,
        "anomalies": anomalies,
        "top_accessed_paths": [{"path": p, "count": c} for p, c in top_paths],
        "hourly_distribution": dict(sorted(hourly_distribution.items())),
    }
def print_human(result, threshold):
    """Render the analysis *result* as a plain-text report on stdout.

    Sections: header, summary counters, detected anomalies (or a
    no-anomalies marker), the most-accessed secret paths, and a 24-hour
    access histogram with off-hours rows starred.
    """
    stats = result["summary"]
    findings = result["anomalies"]

    # Header
    print("=== Audit Log Analysis Report ===")
    print(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f"Anomaly threshold: {threshold}")
    print()

    # Summary counters
    print("--- Summary ---")
    print(f" Total log entries: {stats['total_entries']}")
    print(f" Unique identities: {stats['unique_identities']}")
    print(f" Unique secret paths: {stats['unique_paths']}")
    print(f" Unique source IPs: {stats['unique_source_ips']}")
    print(f" Total failures: {stats['total_failures']}")
    print(f" Off-hours events: {stats['off_hours_events']}")
    print(f" Anomalies detected: {stats['anomalies_found']}")
    print()

    # Anomaly listing (already severity-sorted by analyze())
    if not findings:
        print("--- No anomalies detected ---")
        print()
    else:
        print("--- Anomalies ---")
        for a in findings:
            print(f" [{a['severity']}] {a['type']}: {a['description']}")
        print()

    # Most-accessed paths
    top = result["top_accessed_paths"]
    if top:
        print("--- Top Accessed Paths ---")
        for item in top:
            print(f" {item['count']:5d} {item['path']}")
        print()

    # Hourly histogram; '*' marks off-hours rows (before 6 AM / after 10 PM)
    histogram = result["hourly_distribution"]
    if histogram:
        print("--- Hourly Distribution ---")
        peak = max(histogram.values(), default=1)
        for hour in range(24):
            count = histogram.get(hour, 0)
            bar_len = int((count / peak) * 40) if peak > 0 else 0
            marker = " *" if (hour < 6 or hour >= 22) else ""
            print(f" {hour:02d}:00 {'#' * bar_len:40s} {count}{marker}")
        print(" (* = off-hours)")
def main():
    """CLI entry point: parse arguments, load the log, analyze, report."""
    epilog_text = textwrap.dedent("""\
        The analyzer detects:
        - Volume spikes (identity accessing secrets above threshold * average)
        - Multi-IP access (single identity from many source IPs)
        - Failed access attempts (repeated auth/access failures)
        - Off-hours access (before 6 AM or after 10 PM)
        - Broad path access (single identity accessing many distinct paths)
        Log format: JSON lines or JSON array. Each entry should include
        timestamp, auth info, request path/operation, response status,
        and remote address. Missing fields are handled gracefully.
        Examples:
        %(prog)s --log-file vault-audit.log --threshold 5
        %(prog)s --log-file audit.json --threshold 3 --json
        """)
    parser = argparse.ArgumentParser(
        description="Analyze Vault/cloud secret manager audit logs for anomalies.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog_text,
    )
    parser.add_argument(
        "--log-file",
        required=True,
        help="Path to audit log file (JSON lines or JSON array)",
    )
    parser.add_argument(
        "--threshold",
        type=int,
        default=5,
        help="Anomaly sensitivity threshold — lower = more sensitive (default: 5)",
    )
    parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
    args = parser.parse_args()

    entries = load_logs(args.log_file)
    if not entries:
        print("No log entries found in file.", file=sys.stderr)
        sys.exit(1)

    report = analyze(entries, args.threshold)
    # Stamp provenance so JSON output is self-describing.
    report["log_file"] = args.log_file
    report["threshold"] = args.threshold
    report["analyzed_at"] = datetime.now().isoformat()

    if args.json_output:
        print(json.dumps(report, indent=2))
    else:
        print_human(report, args.threshold)


if __name__ == "__main__":
    main()