diff --git a/engineering-team/senior-secops/SKILL.md b/engineering-team/senior-secops/SKILL.md index 25527d3..ff0c192 100644 --- a/engineering-team/senior-secops/SKILL.md +++ b/engineering-team/senior-secops/SKILL.md @@ -3,207 +3,503 @@ name: senior-secops description: Comprehensive SecOps skill for application security, vulnerability management, compliance, and secure development practices. Includes security scanning, vulnerability assessment, compliance checking, and security automation. Use when implementing security controls, conducting security audits, responding to vulnerabilities, or ensuring compliance requirements. --- -# Senior Secops +# Senior SecOps Engineer -Complete toolkit for senior secops with modern tools and best practices. +Complete toolkit for Security Operations including vulnerability management, compliance verification, secure coding practices, and security automation. -## Quick Start +--- -### Main Capabilities +## Table of Contents -This skill provides three core capabilities through automated scripts: +- [Trigger Terms](#trigger-terms) +- [Core Capabilities](#core-capabilities) +- [Workflows](#workflows) +- [Tool Reference](#tool-reference) +- [Security Standards](#security-standards) +- [Compliance Frameworks](#compliance-frameworks) +- [Best Practices](#best-practices) -```bash -# Script 1: Security Scanner -python scripts/security_scanner.py [options] +--- -# Script 2: Vulnerability Assessor -python scripts/vulnerability_assessor.py [options] +## Trigger Terms -# Script 3: Compliance Checker -python scripts/compliance_checker.py [options] -``` +Use this skill when you encounter: + +| Category | Terms | +|----------|-------| +| **Vulnerability Management** | CVE, CVSS, vulnerability scan, security patch, dependency audit, npm audit, pip-audit | +| **OWASP Top 10** | injection, XSS, CSRF, broken authentication, security misconfiguration, sensitive data exposure | +| **Compliance** | SOC 2, PCI-DSS, HIPAA, GDPR, compliance audit, security 
controls, access control | +| **Secure Coding** | input validation, output encoding, parameterized queries, prepared statements, sanitization | +| **Secrets Management** | API key, secrets vault, environment variables, HashiCorp Vault, AWS Secrets Manager | +| **Authentication** | JWT, OAuth, MFA, 2FA, TOTP, password hashing, bcrypt, argon2, session management | +| **Security Testing** | SAST, DAST, penetration test, security scan, Snyk, Semgrep, CodeQL, Trivy | +| **Incident Response** | security incident, breach notification, incident response, forensics, containment | +| **Network Security** | TLS, HTTPS, HSTS, CSP, CORS, security headers, firewall rules, WAF | +| **Infrastructure Security** | container security, Kubernetes security, IAM, least privilege, zero trust | +| **Cryptography** | encryption at rest, encryption in transit, AES-256, RSA, key management, KMS | +| **Monitoring** | security monitoring, SIEM, audit logging, intrusion detection, anomaly detection | + +--- ## Core Capabilities ### 1. Security Scanner -Automated tool for security scanner tasks. +Scan source code for security vulnerabilities including hardcoded secrets, SQL injection, XSS, command injection, and path traversal. 
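To illustrate the approach, pattern-based scanning of this kind reduces to a set of named regexes run line by line over source text. A minimal sketch follows; the rule names and patterns are illustrative assumptions, not the actual rules implemented in `security_scanner.py`:

```python
import re

# Hypothetical detection rules; security_scanner.py's real patterns may differ.
RULES = [
    ("hardcoded-secret", re.compile(r"""(?i)(api_key|password|secret)\s*=\s*['"][^'"]{8,}['"]""")),
    ("aws-access-key", re.compile(r"AKIA[0-9A-Z]{16}")),
    ("sql-injection", re.compile(r"""f["'].*\b(SELECT|INSERT|UPDATE|DELETE)\b.*\{""")),
]

def scan_source(text: str) -> list[tuple[int, str]]:
    """Return (line_number, rule_name) findings for a source snippet."""
    findings = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        for name, pattern in RULES:
            if pattern.search(line):
                findings.append((lineno, name))
    return findings
```

Production scanners layer entropy checks and AST analysis on top of regexes to reduce the false positives that pure pattern matching cannot avoid.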
-**Features:** -- Automated scaffolding -- Best practices built-in -- Configurable templates -- Quality checks - -**Usage:** ```bash -python scripts/security_scanner.py [options] +# Scan project for security issues +python scripts/security_scanner.py /path/to/project + +# Filter by severity +python scripts/security_scanner.py /path/to/project --severity high + +# JSON output for CI/CD +python scripts/security_scanner.py /path/to/project --json --output report.json ``` +**Detects:** +- Hardcoded secrets (API keys, passwords, AWS credentials, GitHub tokens, private keys) +- SQL injection patterns (string concatenation, f-strings, template literals) +- XSS vulnerabilities (innerHTML assignment, unsafe DOM manipulation, React unsafe patterns) +- Command injection (shell=True, exec, eval with user input) +- Path traversal (file operations with user input) + ### 2. Vulnerability Assessor -Comprehensive analysis and optimization tool. +Scan dependencies for known CVEs across npm, Python, and Go ecosystems. -**Features:** -- Deep analysis -- Performance metrics -- Recommendations -- Automated fixes - -**Usage:** ```bash -python scripts/vulnerability_assessor.py [--verbose] +# Assess project dependencies +python scripts/vulnerability_assessor.py /path/to/project + +# Critical/high only +python scripts/vulnerability_assessor.py /path/to/project --severity high + +# Export vulnerability report +python scripts/vulnerability_assessor.py /path/to/project --json --output vulns.json ``` +**Scans:** +- `package.json` and `package-lock.json` (npm) +- `requirements.txt` and `pyproject.toml` (Python) +- `go.mod` (Go) + +**Output:** +- CVE IDs with CVSS scores +- Affected package versions +- Fixed versions for remediation +- Overall risk score (0-100) + ### 3. Compliance Checker -Advanced tooling for specialized tasks. +Verify security compliance against SOC 2, PCI-DSS, HIPAA, and GDPR frameworks. 
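Conceptually, each framework maps to a list of evidence checks against the project tree, and the compliance score is the fraction of checks that pass. A minimal sketch under assumed control names (the real controls and scoring in `compliance_checker.py` differ):

```python
from pathlib import Path

def run_checks(project: Path) -> float:
    """Score a project 0-100 against a toy control set."""
    checks = {
        # Illustrative controls only, not the script's actual checklist.
        "security-policy": (project / "SECURITY.md").exists(),
        "ci-pipeline": (project / ".github" / "workflows").is_dir(),
        "no-committed-dotenv": not (project / ".env").exists(),
        "dependency-lockfile": any(
            (project / name).exists()
            for name in ("package-lock.json", "poetry.lock", "go.sum")
        ),
    }
    passed = sum(checks.values())
    return 100.0 * passed / len(checks)
```

A real checker would also weight controls by severity rather than counting each pass equally.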
-**Features:** -- Expert-level automation -- Custom configurations -- Integration ready -- Production-grade output - -**Usage:** ```bash -python scripts/compliance_checker.py [arguments] [options] +# Check all frameworks +python scripts/compliance_checker.py /path/to/project + +# Specific framework +python scripts/compliance_checker.py /path/to/project --framework soc2 +python scripts/compliance_checker.py /path/to/project --framework pci-dss +python scripts/compliance_checker.py /path/to/project --framework hipaa +python scripts/compliance_checker.py /path/to/project --framework gdpr + +# Export compliance report +python scripts/compliance_checker.py /path/to/project --json --output compliance.json ``` +**Verifies:** +- Access control implementation +- Encryption at rest and in transit +- Audit logging +- Authentication strength (MFA, password hashing) +- Security documentation +- CI/CD security controls + +--- + +## Workflows + +### Workflow 1: Security Audit + +Complete security assessment of a codebase. + +```bash +# Step 1: Scan for code vulnerabilities +python scripts/security_scanner.py . --severity medium + +# Step 2: Check dependency vulnerabilities +python scripts/vulnerability_assessor.py . --severity high + +# Step 3: Verify compliance controls +python scripts/compliance_checker.py . --framework all + +# Step 4: Generate combined report +python scripts/security_scanner.py . --json --output security.json +python scripts/vulnerability_assessor.py . --json --output vulns.json +python scripts/compliance_checker.py . --json --output compliance.json +``` + +### Workflow 2: CI/CD Security Gate + +Integrate security checks into deployment pipeline. 
+ +```yaml +# .github/workflows/security.yml +name: Security Scan + +on: + pull_request: + branches: [main, develop] + +jobs: + security-scan: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Security Scanner + run: python scripts/security_scanner.py . --severity high + + - name: Vulnerability Assessment + run: python scripts/vulnerability_assessor.py . --severity critical + + - name: Compliance Check + run: python scripts/compliance_checker.py . --framework soc2 +``` + +### Workflow 3: CVE Triage + +Respond to a new CVE affecting your application. + +``` +1. ASSESS (0-2 hours) + - Identify affected systems using vulnerability_assessor.py + - Check if CVE is being actively exploited + - Determine CVSS environmental score for your context + +2. PRIORITIZE + - Critical (CVSS 9.0+, internet-facing): 24 hours + - High (CVSS 7.0-8.9): 7 days + - Medium (CVSS 4.0-6.9): 30 days + - Low (CVSS < 4.0): 90 days + +3. REMEDIATE + - Update affected dependency to fixed version + - Run security_scanner.py to verify fix + - Test for regressions + - Deploy with enhanced monitoring + +4. VERIFY + - Re-run vulnerability_assessor.py + - Confirm CVE no longer reported + - Document remediation actions +``` + +### Workflow 4: Incident Response + +Security incident handling procedure. 
+ +``` +PHASE 1: DETECT & IDENTIFY (0-15 min) +- Alert received and acknowledged +- Initial severity assessment (SEV-1 to SEV-4) +- Incident commander assigned +- Communication channel established + +PHASE 2: CONTAIN (15-60 min) +- Affected systems identified +- Network isolation if needed +- Credentials rotated if compromised +- Preserve evidence (logs, memory dumps) + +PHASE 3: ERADICATE (1-4 hours) +- Root cause identified +- Malware/backdoors removed +- Vulnerabilities patched (run security_scanner.py) +- Systems hardened + +PHASE 4: RECOVER (4-24 hours) +- Systems restored from clean backup +- Services brought back online +- Enhanced monitoring enabled +- User access restored + +PHASE 5: POST-INCIDENT (24-72 hours) +- Incident timeline documented +- Root cause analysis complete +- Lessons learned documented +- Preventive measures implemented +- Stakeholder report delivered +``` + +--- + +## Tool Reference + +### security_scanner.py + +| Option | Description | +|--------|-------------| +| `target` | Directory or file to scan | +| `--severity, -s` | Minimum severity: critical, high, medium, low | +| `--verbose, -v` | Show files as they're scanned | +| `--json` | Output results as JSON | +| `--output, -o` | Write results to file | + +**Exit Codes:** +- `0`: No critical/high findings +- `1`: High severity findings +- `2`: Critical severity findings + +### vulnerability_assessor.py + +| Option | Description | +|--------|-------------| +| `target` | Directory containing dependency files | +| `--severity, -s` | Minimum severity: critical, high, medium, low | +| `--verbose, -v` | Show files as they're scanned | +| `--json` | Output results as JSON | +| `--output, -o` | Write results to file | + +**Exit Codes:** +- `0`: No critical/high vulnerabilities +- `1`: High severity vulnerabilities +- `2`: Critical severity vulnerabilities + +### compliance_checker.py + +| Option | Description | +|--------|-------------| +| `target` | Directory to check | +| `--framework, -f` | 
Framework: soc2, pci-dss, hipaa, gdpr, all |
+| `--verbose, -v` | Show checks as they run |
+| `--json` | Output results as JSON |
+| `--output, -o` | Write results to file |
+
+**Exit Codes:**
+- `0`: Compliant (90%+ score)
+- `1`: Non-compliant (50-89% score)
+- `2`: Critical gaps (<50% score)
+
+---
+
+## Security Standards
+
+### OWASP Top 10 Prevention
+
+| Vulnerability | Prevention |
+|--------------|------------|
+| **A01: Broken Access Control** | Implement RBAC, deny by default, validate permissions server-side |
+| **A02: Cryptographic Failures** | Use TLS 1.2+, AES-256 encryption, secure key management |
+| **A03: Injection** | Parameterized queries, input validation, escape output |
+| **A04: Insecure Design** | Threat modeling, secure design patterns, defense in depth |
+| **A05: Security Misconfiguration** | Hardening guides, remove defaults, disable unused features |
+| **A06: Vulnerable Components** | Dependency scanning, automated updates, SBOM |
+| **A07: Authentication Failures** | MFA, rate limiting, secure password storage |
+| **A08: Data Integrity Failures** | Code signing, integrity checks, secure CI/CD |
+| **A09: Security Logging Failures** | Comprehensive audit logs, SIEM integration, alerting |
+| **A10: SSRF** | URL validation, allowlist destinations, network segmentation |
+
+### Secure Coding Checklist
+
+```markdown
+## Input Validation
+- [ ] Validate all input on server side
+- [ ] Use allowlists over denylists
+- [ ] Sanitize for specific context (HTML, SQL, shell)
+
+## Output Encoding
+- [ ] HTML encode for browser output
+- [ ] URL encode for URLs
+- [ ] JavaScript encode for script contexts
+
+## Authentication
+- [ ] Use bcrypt/argon2 for passwords
+- [ ] Implement MFA for sensitive operations
+- [ ] Enforce strong password policy
+
+## Session Management
+- [ ] Generate secure random session IDs
+- [ ] Set HttpOnly, Secure, SameSite flags
+- [ ] Implement session timeout (15 min idle)
+
+## Error Handling
+- [ ] Log errors
with context (no secrets) +- [ ] Return generic messages to users +- [ ] Never expose stack traces in production + +## Secrets Management +- [ ] Use environment variables or secrets manager +- [ ] Never commit secrets to version control +- [ ] Rotate credentials regularly +``` + +--- + +## Compliance Frameworks + +### SOC 2 Type II Controls + +| Control | Category | Description | +|---------|----------|-------------| +| CC1 | Control Environment | Security policies, org structure | +| CC2 | Communication | Security awareness, documentation | +| CC3 | Risk Assessment | Vulnerability scanning, threat modeling | +| CC6 | Logical Access | Authentication, authorization, MFA | +| CC7 | System Operations | Monitoring, logging, incident response | +| CC8 | Change Management | CI/CD, code review, deployment controls | + +### PCI-DSS v4.0 Requirements + +| Requirement | Description | +|-------------|-------------| +| Req 3 | Protect stored cardholder data (encryption at rest) | +| Req 4 | Encrypt transmission (TLS 1.2+) | +| Req 6 | Secure development (input validation, secure coding) | +| Req 8 | Strong authentication (MFA, password policy) | +| Req 10 | Audit logging (all access to cardholder data) | +| Req 11 | Security testing (SAST, DAST, penetration testing) | + +### HIPAA Security Rule + +| Safeguard | Requirement | +|-----------|-------------| +| 164.312(a)(1) | Unique user identification for PHI access | +| 164.312(b) | Audit trails for PHI access | +| 164.312(c)(1) | Data integrity controls | +| 164.312(d) | Person/entity authentication (MFA) | +| 164.312(e)(1) | Transmission encryption (TLS) | + +### GDPR Requirements + +| Article | Requirement | +|---------|-------------| +| Art 25 | Privacy by design, data minimization | +| Art 32 | Security measures, encryption, pseudonymization | +| Art 33 | Breach notification (72 hours) | +| Art 17 | Right to erasure (data deletion) | +| Art 20 | Data portability (export capability) | + +--- + +## Best Practices + +### 
Secrets Management + +```python +# BAD: Hardcoded secret +API_KEY = "sk-1234567890abcdef" + +# GOOD: Environment variable +import os +API_KEY = os.environ.get("API_KEY") + +# BETTER: Secrets manager +from your_vault_client import get_secret +API_KEY = get_secret("api/key") +``` + +### SQL Injection Prevention + +```python +# BAD: String concatenation +query = f"SELECT * FROM users WHERE id = {user_id}" + +# GOOD: Parameterized query +cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) +``` + +### XSS Prevention + +```javascript +// BAD: Direct innerHTML assignment is vulnerable +// GOOD: Use textContent (auto-escaped) +element.textContent = userInput; + +// GOOD: Use sanitization library for HTML +import DOMPurify from 'dompurify'; +const safeHTML = DOMPurify.sanitize(userInput); +``` + +### Authentication + +```javascript +// Password hashing +const bcrypt = require('bcrypt'); +const SALT_ROUNDS = 12; + +// Hash password +const hash = await bcrypt.hash(password, SALT_ROUNDS); + +// Verify password +const match = await bcrypt.compare(password, hash); +``` + +### Security Headers + +```javascript +// Express.js security headers +const helmet = require('helmet'); +app.use(helmet()); + +// Or manually set headers: +app.use((req, res, next) => { + res.setHeader('X-Content-Type-Options', 'nosniff'); + res.setHeader('X-Frame-Options', 'DENY'); + res.setHeader('X-XSS-Protection', '1; mode=block'); + res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'); + res.setHeader('Content-Security-Policy', "default-src 'self'"); + next(); +}); +``` + +--- + ## Reference Documentation -### Security Standards +| Document | Description | +|----------|-------------| +| `references/security_standards.md` | OWASP Top 10, secure coding, authentication, API security | +| `references/vulnerability_management_guide.md` | CVE triage, CVSS scoring, remediation workflows | +| `references/compliance_requirements.md` | SOC 2, PCI-DSS, HIPAA, GDPR 
requirements | -Comprehensive guide available in `references/security_standards.md`: - -- Detailed patterns and practices -- Code examples -- Best practices -- Anti-patterns to avoid -- Real-world scenarios - -### Vulnerability Management Guide - -Complete workflow documentation in `references/vulnerability_management_guide.md`: - -- Step-by-step processes -- Optimization strategies -- Tool integrations -- Performance tuning -- Troubleshooting guide - -### Compliance Requirements - -Technical reference guide in `references/compliance_requirements.md`: - -- Technology stack details -- Configuration examples -- Integration patterns -- Security considerations -- Scalability guidelines +--- ## Tech Stack -**Languages:** TypeScript, JavaScript, Python, Go, Swift, Kotlin -**Frontend:** React, Next.js, React Native, Flutter -**Backend:** Node.js, Express, GraphQL, REST APIs -**Database:** PostgreSQL, Prisma, NeonDB, Supabase -**DevOps:** Docker, Kubernetes, Terraform, GitHub Actions, CircleCI -**Cloud:** AWS, GCP, Azure +**Security Scanning:** +- Snyk (dependency scanning) +- Semgrep (SAST) +- CodeQL (code analysis) +- Trivy (container scanning) +- OWASP ZAP (DAST) -## Development Workflow +**Secrets Management:** +- HashiCorp Vault +- AWS Secrets Manager +- Azure Key Vault +- 1Password Secrets Automation -### 1. Setup and Configuration +**Authentication:** +- bcrypt, argon2 (password hashing) +- jsonwebtoken (JWT) +- passport.js (authentication middleware) +- speakeasy (TOTP/MFA) -```bash -# Install dependencies -npm install -# or -pip install -r requirements.txt +**Logging & Monitoring:** +- Winston, Pino (Node.js logging) +- Datadog, Splunk (SIEM) +- PagerDuty (alerting) -# Configure environment -cp .env.example .env -``` - -### 2. Run Quality Checks - -```bash -# Use the analyzer script -python scripts/vulnerability_assessor.py . - -# Review recommendations -# Apply fixes -``` - -### 3. 
Implement Best Practices - -Follow the patterns and practices documented in: -- `references/security_standards.md` -- `references/vulnerability_management_guide.md` -- `references/compliance_requirements.md` - -## Best Practices Summary - -### Code Quality -- Follow established patterns -- Write comprehensive tests -- Document decisions -- Review regularly - -### Performance -- Measure before optimizing -- Use appropriate caching -- Optimize critical paths -- Monitor in production - -### Security -- Validate all inputs -- Use parameterized queries -- Implement proper authentication -- Keep dependencies updated - -### Maintainability -- Write clear code -- Use consistent naming -- Add helpful comments -- Keep it simple - -## Common Commands - -```bash -# Development -npm run dev -npm run build -npm run test -npm run lint - -# Analysis -python scripts/vulnerability_assessor.py . -python scripts/compliance_checker.py --analyze - -# Deployment -docker build -t app:latest . -docker-compose up -d -kubectl apply -f k8s/ -``` - -## Troubleshooting - -### Common Issues - -Check the comprehensive troubleshooting section in `references/compliance_requirements.md`. 
- -### Getting Help - -- Review reference documentation -- Check script output messages -- Consult tech stack documentation -- Review error logs - -## Resources - -- Pattern Reference: `references/security_standards.md` -- Workflow Guide: `references/vulnerability_management_guide.md` -- Technical Guide: `references/compliance_requirements.md` -- Tool Scripts: `scripts/` directory +**Compliance:** +- Vanta (SOC 2 automation) +- Drata (compliance management) +- AWS Config (configuration compliance) diff --git a/engineering-team/senior-secops/references/compliance_requirements.md b/engineering-team/senior-secops/references/compliance_requirements.md index 0a6e443..a72067e 100644 --- a/engineering-team/senior-secops/references/compliance_requirements.md +++ b/engineering-team/senior-secops/references/compliance_requirements.md @@ -1,103 +1,792 @@ -# Compliance Requirements +# Compliance Requirements Reference -## Overview +Comprehensive guide for SOC 2, PCI-DSS, HIPAA, and GDPR compliance requirements. -This reference guide provides comprehensive information for senior secops. +--- -## Patterns and Practices +## Table of Contents -### Pattern 1: Best Practice Implementation +- [SOC 2 Type II](#soc-2-type-ii) +- [PCI-DSS](#pci-dss) +- [HIPAA](#hipaa) +- [GDPR](#gdpr) +- [Compliance Automation](#compliance-automation) +- [Audit Preparation](#audit-preparation) -**Description:** -Detailed explanation of the pattern. 
+--- -**When to Use:** -- Scenario 1 -- Scenario 2 -- Scenario 3 +## SOC 2 Type II -**Implementation:** -```typescript -// Example code implementation -export class Example { - // Implementation details -} +### Trust Service Criteria + +| Criteria | Description | Key Controls | +|----------|-------------|--------------| +| Security | Protection against unauthorized access | Access controls, encryption, monitoring | +| Availability | System uptime and performance | SLAs, redundancy, disaster recovery | +| Processing Integrity | Accurate and complete processing | Data validation, error handling | +| Confidentiality | Protection of confidential information | Encryption, access controls | +| Privacy | Personal information handling | Consent, data minimization | + +### Security Controls Checklist + +```markdown +## SOC 2 Security Controls + +### CC1: Control Environment +- [ ] Security policies documented and approved +- [ ] Organizational structure defined +- [ ] Security roles and responsibilities assigned +- [ ] Background checks performed on employees +- [ ] Security awareness training completed annually + +### CC2: Communication and Information +- [ ] Security policies communicated to employees +- [ ] Security incidents reported and tracked +- [ ] External communications about security controls +- [ ] Service level agreements documented + +### CC3: Risk Assessment +- [ ] Annual risk assessment performed +- [ ] Risk register maintained +- [ ] Risk treatment plans documented +- [ ] Vendor risk assessments completed +- [ ] Business impact analysis current + +### CC4: Monitoring Activities +- [ ] Security monitoring implemented +- [ ] Log aggregation and analysis +- [ ] Vulnerability scanning (weekly) +- [ ] Penetration testing (annual) +- [ ] Security metrics reviewed monthly + +### CC5: Control Activities +- [ ] Access control policies enforced +- [ ] MFA enabled for all users +- [ ] Password policy enforced (12+ chars) +- [ ] Access reviews (quarterly) +- [ ] Least 
privilege principle applied + +### CC6: Logical and Physical Access +- [ ] Identity management system +- [ ] Role-based access control +- [ ] Physical access controls +- [ ] Network segmentation +- [ ] Data center security + +### CC7: System Operations +- [ ] Change management process +- [ ] Incident management process +- [ ] Problem management process +- [ ] Capacity management +- [ ] Backup and recovery tested + +### CC8: Change Management +- [ ] Change control board +- [ ] Change approval workflow +- [ ] Testing requirements documented +- [ ] Rollback procedures +- [ ] Emergency change process + +### CC9: Risk Mitigation +- [ ] Insurance coverage +- [ ] Business continuity plan +- [ ] Disaster recovery plan tested +- [ ] Vendor management program ``` -**Benefits:** -- Benefit 1 -- Benefit 2 -- Benefit 3 +### Evidence Collection -**Trade-offs:** -- Consider 1 -- Consider 2 -- Consider 3 +```python +def collect_soc2_evidence(period_start: str, period_end: str) -> dict: + """ + Collect evidence for SOC 2 audit period. -### Pattern 2: Advanced Technique + Returns dictionary organized by Trust Service Criteria. 
+ """ + evidence = { + 'period': {'start': period_start, 'end': period_end}, + 'security': { + 'access_reviews': get_access_reviews(period_start, period_end), + 'vulnerability_scans': get_vulnerability_reports(period_start, period_end), + 'penetration_tests': get_pentest_reports(period_start, period_end), + 'security_incidents': get_incident_reports(period_start, period_end), + 'training_records': get_training_completion(period_start, period_end), + }, + 'availability': { + 'uptime_reports': get_uptime_metrics(period_start, period_end), + 'incident_reports': get_availability_incidents(period_start, period_end), + 'dr_tests': get_dr_test_results(period_start, period_end), + 'backup_tests': get_backup_test_results(period_start, period_end), + }, + 'processing_integrity': { + 'data_validation_logs': get_validation_logs(period_start, period_end), + 'error_reports': get_error_reports(period_start, period_end), + 'reconciliation_reports': get_reconciliation_reports(period_start, period_end), + }, + 'confidentiality': { + 'encryption_status': get_encryption_audit(period_start, period_end), + 'data_classification': get_data_inventory(), + 'access_logs': get_sensitive_data_access_logs(period_start, period_end), + } + } -**Description:** -Another important pattern for senior secops. 
- -**Implementation:** -```typescript -// Advanced example -async function advancedExample() { - // Code here -} + return evidence ``` -## Guidelines +--- -### Code Organization -- Clear structure -- Logical separation -- Consistent naming -- Proper documentation +## PCI-DSS -### Performance Considerations -- Optimization strategies -- Bottleneck identification -- Monitoring approaches -- Scaling techniques +### PCI-DSS v4.0 Requirements -### Security Best Practices -- Input validation -- Authentication -- Authorization -- Data protection +| Requirement | Description | +|-------------|-------------| +| 1 | Install and maintain network security controls | +| 2 | Apply secure configurations | +| 3 | Protect stored account data | +| 4 | Protect cardholder data with cryptography during transmission | +| 5 | Protect all systems from malware | +| 6 | Develop and maintain secure systems and software | +| 7 | Restrict access to cardholder data by business need-to-know | +| 8 | Identify users and authenticate access | +| 9 | Restrict physical access to cardholder data | +| 10 | Log and monitor all access to network resources | +| 11 | Test security of systems and networks regularly | +| 12 | Support information security with organizational policies | -## Common Patterns +### Cardholder Data Protection -### Pattern A -Implementation details and examples. +```python +# PCI-DSS compliant card data handling -### Pattern B -Implementation details and examples. +import re +from cryptography.fernet import Fernet +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +import base64 +import os -### Pattern C -Implementation details and examples. +class PCIDataHandler: + """Handle cardholder data per PCI-DSS requirements.""" -## Anti-Patterns to Avoid + # PAN patterns (masked for display) + PAN_PATTERN = re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b') -### Anti-Pattern 1 -What not to do and why. 
+ def __init__(self, encryption_key: bytes): + self.cipher = Fernet(encryption_key) -### Anti-Pattern 2 -What not to do and why. + @staticmethod + def mask_pan(pan: str) -> str: + """ + Mask PAN per PCI-DSS (show first 6, last 4 only). + Requirement 3.4: Render PAN unreadable. + """ + digits = re.sub(r'\D', '', pan) + if len(digits) < 13: + return '*' * len(digits) + return f"{digits[:6]}{'*' * (len(digits) - 10)}{digits[-4:]}" -## Tools and Resources + def encrypt_pan(self, pan: str) -> str: + """ + Encrypt PAN for storage. + Requirement 3.5: Protect keys used to protect stored account data. + """ + return self.cipher.encrypt(pan.encode()).decode() -### Recommended Tools -- Tool 1: Purpose -- Tool 2: Purpose -- Tool 3: Purpose + def decrypt_pan(self, encrypted_pan: str) -> str: + """Decrypt PAN (requires authorization logging).""" + return self.cipher.decrypt(encrypted_pan.encode()).decode() -### Further Reading -- Resource 1 -- Resource 2 -- Resource 3 + @staticmethod + def validate_pan(pan: str) -> bool: + """Validate PAN using Luhn algorithm.""" + digits = re.sub(r'\D', '', pan) + if len(digits) < 13 or len(digits) > 19: + return False -## Conclusion + # Luhn algorithm + total = 0 + for i, digit in enumerate(reversed(digits)): + d = int(digit) + if i % 2 == 1: + d *= 2 + if d > 9: + d -= 9 + total += d + return total % 10 == 0 -Key takeaways for using this reference guide effectively. + def sanitize_logs(self, log_message: str) -> str: + """ + Remove PAN from log messages. + Requirement 3.3: Mask PAN when displayed. 
+ """ + def replace_pan(match): + return self.mask_pan(match.group()) + + return self.PAN_PATTERN.sub(replace_pan, log_message) +``` + +### Network Segmentation + +```yaml +# PCI-DSS network segmentation example + +# Cardholder Data Environment (CDE) firewall rules +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: cde-isolation + namespace: payment-processing +spec: + podSelector: + matchLabels: + pci-zone: cde + policyTypes: + - Ingress + - Egress + ingress: + # Only allow from payment gateway + - from: + - namespaceSelector: + matchLabels: + pci-zone: dmz + - podSelector: + matchLabels: + app: payment-gateway + ports: + - protocol: TCP + port: 443 + egress: + # Only allow to payment processor + - to: + - ipBlock: + cidr: 10.0.100.0/24 # Payment processor network + ports: + - protocol: TCP + port: 443 + # Allow DNS + - to: + - namespaceSelector: {} + podSelector: + matchLabels: + k8s-app: kube-dns + ports: + - protocol: UDP + port: 53 +``` + +--- + +## HIPAA + +### HIPAA Security Rule Requirements + +| Safeguard | Standard | Implementation | +|-----------|----------|----------------| +| Administrative | Security Management | Risk analysis, sanctions, activity review | +| Administrative | Workforce Security | Authorization, clearance, termination | +| Administrative | Information Access | Access authorization, workstation use | +| Administrative | Security Awareness | Training, login monitoring, password management | +| Administrative | Security Incident | Response and reporting procedures | +| Administrative | Contingency Plan | Backup, disaster recovery, emergency mode | +| Physical | Facility Access | Access controls, maintenance records | +| Physical | Workstation | Use policies, security | +| Physical | Device and Media | Disposal, media re-use, accountability | +| Technical | Access Control | Unique user ID, emergency access, encryption | +| Technical | Audit Controls | Hardware, software, procedural mechanisms | +| Technical | 
Integrity | Mechanisms to ensure PHI not altered | +| Technical | Transmission | Encryption of PHI in transit | + +### PHI Handling + +```python +from dataclasses import dataclass +from datetime import datetime +from typing import Optional +import hashlib +import logging + +# Configure PHI audit logging +phi_logger = logging.getLogger('phi_access') +phi_logger.setLevel(logging.INFO) + +@dataclass +class PHIAccessLog: + """HIPAA-compliant PHI access logging.""" + timestamp: datetime + user_id: str + patient_id: str + action: str # view, create, update, delete, export + reason: str + data_elements: list + source_ip: str + success: bool + +def log_phi_access(access: PHIAccessLog): + """ + Log PHI access per HIPAA requirements. + 164.312(b): Audit controls. + """ + phi_logger.info( + f"PHI_ACCESS|" + f"timestamp={access.timestamp.isoformat()}|" + f"user={access.user_id}|" + f"patient={access.patient_id}|" + f"action={access.action}|" + f"reason={access.reason}|" + f"elements={','.join(access.data_elements)}|" + f"ip={access.source_ip}|" + f"success={access.success}" + ) + +class HIPAACompliantStorage: + """HIPAA-compliant PHI storage handler.""" + + # Minimum Necessary Standard - only access needed data + PHI_ELEMENTS = { + 'patient_name': 'high', + 'ssn': 'high', + 'medical_record_number': 'high', + 'diagnosis': 'medium', + 'treatment_plan': 'medium', + 'appointment_date': 'low', + 'provider_name': 'low' + } + + def __init__(self, encryption_service, user_context): + self.encryption = encryption_service + self.user = user_context + + def access_phi( + self, + patient_id: str, + elements: list, + reason: str + ) -> Optional[dict]: + """ + Access PHI with HIPAA controls. 
+ + Args: + patient_id: Patient identifier + elements: List of PHI elements to access + reason: Business reason for access + + Returns: + Requested PHI elements if authorized + """ + # Verify minimum necessary - user only gets needed elements + authorized_elements = self._check_authorization(elements) + + if not authorized_elements: + log_phi_access(PHIAccessLog( + timestamp=datetime.utcnow(), + user_id=self.user.id, + patient_id=patient_id, + action='view', + reason=reason, + data_elements=elements, + source_ip=self.user.ip_address, + success=False + )) + raise PermissionError("Not authorized for requested PHI elements") + + # Retrieve and decrypt PHI + phi_data = self._retrieve_phi(patient_id, authorized_elements) + + # Log successful access + log_phi_access(PHIAccessLog( + timestamp=datetime.utcnow(), + user_id=self.user.id, + patient_id=patient_id, + action='view', + reason=reason, + data_elements=authorized_elements, + source_ip=self.user.ip_address, + success=True + )) + + return phi_data + + def _check_authorization(self, requested_elements: list) -> list: + """Check user authorization for PHI elements.""" + user_clearance = self.user.hipaa_clearance_level + authorized = [] + + for element in requested_elements: + element_level = self.PHI_ELEMENTS.get(element, 'high') + if self._clearance_allows(user_clearance, element_level): + authorized.append(element) + + return authorized +``` + +--- + +## GDPR + +### GDPR Principles + +| Principle | Description | Implementation | +|-----------|-------------|----------------| +| Lawfulness | Legal basis for processing | Consent management, contract basis | +| Purpose Limitation | Specific, explicit purposes | Data use policies, access controls | +| Data Minimization | Adequate, relevant, limited | Collection limits, retention policies | +| Accuracy | Keep data accurate | Update procedures, validation | +| Storage Limitation | Time-limited retention | Retention schedules, deletion | +| Integrity & Confidentiality | 
Secure processing | Encryption, access controls | +| Accountability | Demonstrate compliance | Documentation, DPO, DPIA | + +### Data Subject Rights Implementation + +```python +from datetime import datetime, timedelta +from enum import Enum +from typing import Optional, List +import json + +class DSRType(Enum): + ACCESS = "access" # Article 15 + RECTIFICATION = "rectification" # Article 16 + ERASURE = "erasure" # Article 17 (Right to be forgotten) + RESTRICTION = "restriction" # Article 18 + PORTABILITY = "portability" # Article 20 + OBJECTION = "objection" # Article 21 + +class DataSubjectRequest: + """Handle GDPR Data Subject Requests.""" + + # GDPR requires response within 30 days + RESPONSE_DEADLINE_DAYS = 30 + + def __init__(self, db, notification_service): + self.db = db + self.notifications = notification_service + + def submit_request( + self, + subject_email: str, + request_type: DSRType, + details: str + ) -> dict: + """ + Submit a Data Subject Request. + + Args: + subject_email: Email of the data subject + request_type: Type of GDPR request + details: Additional request details + + Returns: + Request tracking information + """ + # Verify identity before processing + verification_token = self._send_verification(subject_email) + + request = { + 'id': self._generate_request_id(), + 'subject_email': subject_email, + 'type': request_type.value, + 'details': details, + 'status': 'pending_verification', + 'submitted_at': datetime.utcnow().isoformat(), + 'deadline': (datetime.utcnow() + timedelta(days=self.RESPONSE_DEADLINE_DAYS)).isoformat(), + 'verification_token': verification_token + } + + self.db.dsr_requests.insert(request) + + # Notify DPO + self.notifications.notify_dpo( + f"New DSR ({request_type.value}) received", + request + ) + + return { + 'request_id': request['id'], + 'deadline': request['deadline'], + 'status': 'verification_sent' + } + + def process_erasure_request(self, request_id: str) -> dict: + """ + Process Article 17 erasure request 
(Right to be Forgotten). + + Returns: + Erasure completion report + """ + request = self.db.dsr_requests.find_one({'id': request_id}) + subject_email = request['subject_email'] + + erasure_report = { + 'request_id': request_id, + 'subject': subject_email, + 'systems_processed': [], + 'data_deleted': [], + 'data_retained': [], # With legal basis + 'completed_at': None + } + + # Find all data for this subject + data_inventory = self._find_subject_data(subject_email) + + for data_item in data_inventory: + if self._can_delete(data_item): + self._delete_data(data_item) + erasure_report['data_deleted'].append({ + 'system': data_item['system'], + 'data_type': data_item['type'], + 'deleted_at': datetime.utcnow().isoformat() + }) + else: + erasure_report['data_retained'].append({ + 'system': data_item['system'], + 'data_type': data_item['type'], + 'retention_reason': data_item['legal_basis'] + }) + + erasure_report['completed_at'] = datetime.utcnow().isoformat() + + # Update request status + self.db.dsr_requests.update( + {'id': request_id}, + {'status': 'completed', 'completion_report': erasure_report} + ) + + return erasure_report + + def generate_portability_export(self, request_id: str) -> dict: + """ + Generate Article 20 data portability export. + + Returns machine-readable export in JSON format. 
+ """ + request = self.db.dsr_requests.find_one({'id': request_id}) + subject_email = request['subject_email'] + + export_data = { + 'export_date': datetime.utcnow().isoformat(), + 'data_subject': subject_email, + 'format': 'JSON', + 'data': {} + } + + # Collect data from all systems + systems = ['user_accounts', 'orders', 'preferences', 'communications'] + + for system in systems: + system_data = self._extract_portable_data(system, subject_email) + if system_data: + export_data['data'][system] = system_data + + return export_data +``` + +### Consent Management + +```python +class ConsentManager: + """GDPR-compliant consent management.""" + + def __init__(self, db): + self.db = db + + def record_consent( + self, + user_id: str, + purpose: str, + consent_given: bool, + consent_text: str + ) -> dict: + """ + Record consent per GDPR Article 7 requirements. + + Consent must be: + - Freely given + - Specific + - Informed + - Unambiguous + """ + consent_record = { + 'user_id': user_id, + 'purpose': purpose, + 'consent_given': consent_given, + 'consent_text': consent_text, + 'timestamp': datetime.utcnow().isoformat(), + 'method': 'explicit_checkbox', # Not pre-ticked + 'ip_address': self._get_user_ip(), + 'user_agent': self._get_user_agent(), + 'version': '1.0' # Track consent version + } + + self.db.consents.insert(consent_record) + + return consent_record + + def check_consent(self, user_id: str, purpose: str) -> bool: + """Check if user has given consent for specific purpose.""" + latest_consent = self.db.consents.find_one( + {'user_id': user_id, 'purpose': purpose}, + sort=[('timestamp', -1)] + ) + + return latest_consent and latest_consent.get('consent_given', False) + + def withdraw_consent(self, user_id: str, purpose: str) -> dict: + """ + Process consent withdrawal. + + GDPR Article 7(3): Withdrawal must be as easy as giving consent. 
+        """
+        withdrawal_record = {
+            'user_id': user_id,
+            'purpose': purpose,
+            'consent_given': False,
+            'timestamp': datetime.utcnow().isoformat(),
+            'action': 'withdrawal'
+        }
+
+        self.db.consents.insert(withdrawal_record)
+
+        # Trigger data processing stop for this purpose
+        self._stop_processing(user_id, purpose)
+
+        return withdrawal_record
+```
+
+---
+
+## Compliance Automation
+
+### Automated Compliance Checks
+
+```yaml
+# compliance-checks.yml - GitHub Actions
+
+name: Compliance Checks
+
+on:
+  push:
+    branches: [main]
+  pull_request:
+  schedule:
+    - cron: '0 0 * * *'  # Daily
+
+jobs:
+  soc2-checks:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Check for secrets in code
+        run: |
+          # gitleaks writes [] when no leaks are found, so check the
+          # finding count rather than whether the report file is non-empty
+          gitleaks detect --source . --report-format json --report-path gitleaks-report.json
+          if [ "$(jq length gitleaks-report.json)" -gt 0 ]; then
+            echo "Secrets detected in code!"
+            exit 1
+          fi
+
+      - name: Verify encryption at rest
+        run: |
+          # Check database encryption configuration
+          python scripts/compliance_checker.py --check encryption
+
+      - name: Verify access controls
+        run: |
+          # Check RBAC configuration
+          python scripts/compliance_checker.py --check access-control
+
+      - name: Check logging configuration
+        run: |
+          # Verify audit logging enabled
+          python scripts/compliance_checker.py --check audit-logging
+
+  pci-checks:
+    runs-on: ubuntu-latest
+    if: contains(github.event.head_commit.message, '[pci]')
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Scan for PAN in code
+        run: |
+          # Check for unencrypted card numbers
+          python scripts/compliance_checker.py --check pci-pan-exposure
+
+      - name: Verify TLS configuration
+        run: |
+          python scripts/compliance_checker.py --check tls-config
+
+  gdpr-checks:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Check data retention policies
+        run: |
+          python scripts/compliance_checker.py --check data-retention
+
+      - name: Verify consent mechanisms
+        run: |
+          python scripts/compliance_checker.py 
--check consent-management +``` + +--- + +## Audit Preparation + +### Audit Readiness Checklist + +```markdown +## Pre-Audit Checklist + +### 60 Days Before Audit +- [ ] Confirm audit scope and timeline +- [ ] Identify control owners +- [ ] Begin evidence collection +- [ ] Review previous audit findings +- [ ] Update policies and procedures + +### 30 Days Before Audit +- [ ] Complete evidence collection +- [ ] Perform internal control testing +- [ ] Remediate any gaps identified +- [ ] Prepare executive summary +- [ ] Brief stakeholders + +### 7 Days Before Audit +- [ ] Finalize evidence package +- [ ] Prepare interview schedules +- [ ] Set up secure evidence sharing +- [ ] Confirm auditor logistics +- [ ] Final gap assessment + +### During Audit +- [ ] Daily status meetings +- [ ] Timely evidence delivery +- [ ] Document all requests +- [ ] Escalate issues promptly +- [ ] Maintain communication log +``` + +### Evidence Repository Structure + +``` +evidence/ +├── period_YYYY-MM/ +│ ├── security/ +│ │ ├── access_reviews/ +│ │ ├── vulnerability_scans/ +│ │ ├── penetration_tests/ +│ │ └── security_training/ +│ ├── availability/ +│ │ ├── uptime_reports/ +│ │ ├── incident_reports/ +│ │ └── dr_tests/ +│ ├── change_management/ +│ │ ├── change_requests/ +│ │ ├── approval_records/ +│ │ └── deployment_logs/ +│ ├── policies/ +│ │ ├── current_policies/ +│ │ └── acknowledgments/ +│ └── index.json +``` diff --git a/engineering-team/senior-secops/references/security_standards.md b/engineering-team/senior-secops/references/security_standards.md index d3b591d..8b4c1f1 100644 --- a/engineering-team/senior-secops/references/security_standards.md +++ b/engineering-team/senior-secops/references/security_standards.md @@ -1,103 +1,718 @@ -# Security Standards +# Security Standards Reference -## Overview +Comprehensive security standards and secure coding practices for application security. -This reference guide provides comprehensive information for senior secops. 
+---

-## Patterns and Practices

+## Table of Contents

-### Pattern 1: Best Practice Implementation

+- [OWASP Top 10](#owasp-top-10)
+- [Secure Coding Practices](#secure-coding-practices)
+- [Authentication Standards](#authentication-standards)
+- [API Security](#api-security)
+- [Secrets Management](#secrets-management)
+- [Security Headers](#security-headers)

-**Description:**
-Detailed explanation of the pattern.

+---

-**When to Use:**
-- Scenario 1
-- Scenario 2
-- Scenario 3

+## OWASP Top 10

-**Implementation:**
-```typescript
-// Example code implementation
-export class Example {
-  // Implementation details
-}
-```

+### A01:2021 - Broken Access Control
+
+**Description:** Access control enforces policy such that users cannot act outside of their intended permissions.
+
+**Prevention:**
+
+```python
+# BAD - No authorization check
+@app.route('/admin/users/<int:user_id>')
+def get_user(user_id):
+    return User.query.get(user_id).to_dict()
+
+# GOOD - Authorization enforced
+@app.route('/admin/users/<int:user_id>')
+@requires_role('admin')
+def get_user(user_id):
+    user = User.query.get_or_404(user_id)
+    if not current_user.can_access(user):
+        abort(403)
+    return user.to_dict()
+```

-**Benefits:**
-- Benefit 1
-- Benefit 2
-- Benefit 3

+**Checklist:**
+- [ ] Deny access by default (allowlist approach)
+- [ ] Implement RBAC or ABAC consistently
+- [ ] Validate object-level authorization (IDOR prevention)
+- [ ] Disable directory listing
+- [ ] Log access control failures and alert on repeated failures

-**Trade-offs:**
-- Consider 1
-- Consider 2
-- Consider 3

+### A02:2021 - Cryptographic Failures

-### Pattern 2: Advanced Technique

+**Description:** Failures related to cryptography which often lead to exposure of sensitive data.

-**Description:**
-Another important pattern for senior secops.
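The failure mode is easy to demonstrate with the standard library alone: an unsalted fast digest maps equal passwords to equal outputs, while a salted, memory-hard KDF such as `hashlib.scrypt` does not (a minimal sketch; the cost parameters `n=2**14, r=8, p=1` are illustrative, not production-tuned):

```python
import hashlib
import os

password = b"correct horse battery staple"

# Unsalted fast digest: identical inputs always produce identical
# hashes, enabling rainbow-table and cross-user comparison attacks
assert hashlib.sha256(password).hexdigest() == hashlib.sha256(password).hexdigest()

def kdf(pw: bytes, salt: bytes) -> bytes:
    # scrypt is deliberately slow and memory-hard; n/r/p are illustrative
    return hashlib.scrypt(pw, salt=salt, n=2**14, r=8, p=1)

# Salted KDF: a fresh random salt makes every stored hash unique,
# even when two users pick the same password
salt_a, salt_b = os.urandom(16), os.urandom(16)
assert kdf(password, salt_a) != kdf(password, salt_b)

# Verification recomputes the KDF with the stored salt
assert kdf(password, salt_a) == kdf(password, salt_a)
```

In production, prefer the Argon2id pattern shown under Prevention; scrypt here just makes the salt-and-KDF point reproducible without third-party dependencies.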
+**Prevention:** -**Implementation:** -```typescript -// Advanced example -async function advancedExample() { - // Code here -} +```python +# BAD - Weak hashing +import hashlib +password_hash = hashlib.md5(password.encode()).hexdigest() + +# GOOD - Strong password hashing +from argon2 import PasswordHasher +ph = PasswordHasher( + time_cost=3, + memory_cost=65536, + parallelism=4 +) +password_hash = ph.hash(password) + +# Verify password +try: + ph.verify(stored_hash, password) +except argon2.exceptions.VerifyMismatchError: + raise InvalidCredentials() ``` -## Guidelines +**Checklist:** +- [ ] Use TLS 1.2+ for all data in transit +- [ ] Use AES-256-GCM for encryption at rest +- [ ] Use Argon2id, bcrypt, or scrypt for passwords +- [ ] Never use MD5, SHA1 for security purposes +- [ ] Rotate encryption keys regularly -### Code Organization -- Clear structure -- Logical separation -- Consistent naming -- Proper documentation +### A03:2021 - Injection -### Performance Considerations -- Optimization strategies -- Bottleneck identification -- Monitoring approaches -- Scaling techniques +**Description:** Untrusted data sent to an interpreter as part of a command or query. -### Security Best Practices -- Input validation -- Authentication -- Authorization -- Data protection +**SQL Injection Prevention:** -## Common Patterns +```python +# BAD - String concatenation (VULNERABLE) +query = f"SELECT * FROM users WHERE id = {user_id}" +cursor.execute(query) -### Pattern A -Implementation details and examples. +# GOOD - Parameterized queries +cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) -### Pattern B -Implementation details and examples. +# GOOD - ORM with parameter binding +user = User.query.filter_by(id=user_id).first() +``` -### Pattern C -Implementation details and examples. 
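The difference between the concatenated and parameterized forms can be verified end to end with the standard library's `sqlite3` (a self-contained sketch; the `users` table and rows are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'alice'), (2, 'bob')")

malicious = "1 OR 1=1"  # classic injection payload

# String interpolation: the payload becomes part of the SQL,
# turning a single-row lookup into a full-table dump
dumped = conn.execute(
    f"SELECT name FROM users WHERE id = {malicious}"
).fetchall()
assert dumped == [("alice",), ("bob",)]

# Parameter binding: the payload is treated as a value, not SQL;
# no id equals that literal string, so nothing leaks
safe = conn.execute(
    "SELECT name FROM users WHERE id = ?", (malicious,)
).fetchall()
assert safe == []
```

With binding, the driver compares the payload as a literal value against the `id` column; with interpolation, the payload rewrites the query itself.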
+**Command Injection Prevention:** -## Anti-Patterns to Avoid +```python +# BAD - Shell execution with user input (VULNERABLE) +# NEVER use: os.system(f"ping {user_input}") -### Anti-Pattern 1 -What not to do and why. +# GOOD - Use subprocess with shell=False and validated input +import subprocess -### Anti-Pattern 2 -What not to do and why. +def safe_ping(hostname: str) -> str: + # Validate hostname format first + if not is_valid_hostname(hostname): + raise ValueError("Invalid hostname") + result = subprocess.run( + ["ping", "-c", "4", hostname], + shell=False, + capture_output=True, + text=True + ) + return result.stdout +``` -## Tools and Resources +**XSS Prevention:** -### Recommended Tools -- Tool 1: Purpose -- Tool 2: Purpose -- Tool 3: Purpose +```python +# BAD - Direct HTML insertion (VULNERABLE) +return f"
<div>Welcome, {username}</div>"

-### Further Reading
-- Resource 1
-- Resource 2
-- Resource 3

+# GOOD - HTML escaping
+from markupsafe import escape
+return f"<div>Welcome, {escape(username)}</div>"

-## Conclusion

-Key takeaways for using this reference guide effectively.

+# GOOD - Template auto-escaping (Jinja2)
+# {{ username }} is auto-escaped by default
+```

+### A04:2021 - Insecure Design
+
+**Description:** Risks related to design and architectural flaws.
+
+**Prevention Patterns:**
+
+```python
+# Threat modeling categories (STRIDE)
+THREATS = {
+    'Spoofing': 'Authentication controls',
+    'Tampering': 'Integrity controls',
+    'Repudiation': 'Audit logging',
+    'Information Disclosure': 'Encryption, access control',
+    'Denial of Service': 'Rate limiting, resource limits',
+    'Elevation of Privilege': 'Authorization controls'
+}
+
+# Defense in depth - multiple layers
+class SecurePaymentFlow:
+    def process_payment(self, payment_data):
+        # Layer 1: Input validation
+        self.validate_input(payment_data)
+
+        # Layer 2: Authentication check
+        self.verify_user_authenticated()
+
+        # Layer 3: Authorization check
+        self.verify_user_can_pay(payment_data.amount)
+
+        # Layer 4: Rate limiting
+        self.check_rate_limit()
+
+        # Layer 5: Fraud detection
+        self.check_fraud_signals(payment_data)
+
+        # Layer 6: Secure processing
+        return self.execute_payment(payment_data)
+```
+
+### A05:2021 - Security Misconfiguration
+
+**Description:** Missing or incorrect security hardening.
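Many misconfigurations can be caught mechanically before deployment. A sketch of a small configuration auditor (the setting names mirror common Flask/session hardening flags; `audit_config` is a hypothetical helper, not a library API):

```python
# Settings that commonly ship with insecure defaults (illustrative set)
REQUIRED_SETTINGS = {
    "DEBUG": False,                   # never enable in production
    "SESSION_COOKIE_SECURE": True,    # cookies over HTTPS only
    "SESSION_COOKIE_HTTPONLY": True,  # no JavaScript access to session
}

def audit_config(config: dict) -> list:
    """Return one finding per setting that deviates from the baseline."""
    findings = []
    for key, expected in REQUIRED_SETTINGS.items():
        actual = config.get(key)
        if actual != expected:
            findings.append(f"{key}: expected {expected!r}, got {actual!r}")
    return findings

# Debug left on and cookie hardening missing -> two findings
bad = {"DEBUG": True, "SESSION_COOKIE_SECURE": True}
assert len(audit_config(bad)) == 2

# A fully hardened config produces no findings
assert audit_config(dict(REQUIRED_SETTINGS)) == []
```

The same allowlist-of-expected-values shape extends naturally to Kubernetes securityContext fields or web server directives.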
+ +**Prevention:** + +```yaml +# Kubernetes pod security +apiVersion: v1 +kind: Pod +spec: + securityContext: + runAsNonRoot: true + runAsUser: 1000 + fsGroup: 1000 + containers: + - name: app + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: + - ALL +``` + +```python +# Flask security configuration +app.config.update( + SESSION_COOKIE_SECURE=True, + SESSION_COOKIE_HTTPONLY=True, + SESSION_COOKIE_SAMESITE='Lax', + PERMANENT_SESSION_LIFETIME=timedelta(hours=1), +) +``` + +--- + +## Secure Coding Practices + +### Input Validation + +```python +from pydantic import BaseModel, validator, constr +from typing import Optional +import re + +class UserInput(BaseModel): + username: constr(min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_]+$') + email: str + age: Optional[int] = None + + @validator('email') + def validate_email(cls, v): + # Use proper email validation + pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + if not re.match(pattern, v): + raise ValueError('Invalid email format') + return v.lower() + + @validator('age') + def validate_age(cls, v): + if v is not None and (v < 0 or v > 150): + raise ValueError('Age must be between 0 and 150') + return v +``` + +### Output Encoding + +```python +import html +import json +from urllib.parse import quote + +def encode_for_html(data: str) -> str: + """Encode data for safe HTML output.""" + return html.escape(data) + +def encode_for_javascript(data: str) -> str: + """Encode data for safe JavaScript string.""" + return json.dumps(data) + +def encode_for_url(data: str) -> str: + """Encode data for safe URL parameter.""" + return quote(data, safe='') + +def encode_for_css(data: str) -> str: + """Encode data for safe CSS value.""" + return ''.join( + c if c.isalnum() else f'\\{ord(c):06x}' + for c in data + ) +``` + +### Error Handling + +```python +import logging +from typing import Dict, Any + +logger = logging.getLogger(__name__) + +class 
SecurityException(Exception): + """Base exception for security-related errors.""" + + def __init__(self, message: str, internal_details: str = None): + # User-facing message (safe to display) + self.message = message + # Internal details (for logging only) + self.internal_details = internal_details + super().__init__(message) + +def handle_request(): + try: + process_sensitive_data() + except DatabaseError as e: + # Log full details internally + logger.error(f"Database error: {e}", exc_info=True) + # Return generic message to user + raise SecurityException( + "An error occurred processing your request", + internal_details=str(e) + ) + except Exception as e: + logger.error(f"Unexpected error: {e}", exc_info=True) + raise SecurityException("An unexpected error occurred") +``` + +--- + +## Authentication Standards + +### Password Requirements + +```python +import re +from typing import Tuple + +def validate_password(password: str) -> Tuple[bool, str]: + """ + Validate password against security requirements. 
+ + Requirements: + - Minimum 12 characters + - At least one uppercase letter + - At least one lowercase letter + - At least one digit + - At least one special character + - Not in common password list + """ + if len(password) < 12: + return False, "Password must be at least 12 characters" + + if not re.search(r'[A-Z]', password): + return False, "Password must contain uppercase letter" + + if not re.search(r'[a-z]', password): + return False, "Password must contain lowercase letter" + + if not re.search(r'\d', password): + return False, "Password must contain a digit" + + if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): + return False, "Password must contain special character" + + # Check against common passwords (use haveibeenpwned API in production) + common_passwords = {'password123', 'qwerty123456', 'admin123456'} + if password.lower() in common_passwords: + return False, "Password is too common" + + return True, "Password meets requirements" +``` + +### JWT Best Practices + +```python +import jwt +from datetime import datetime, timedelta +from typing import Dict, Optional + +class JWTManager: + def __init__(self, secret_key: str, algorithm: str = 'HS256'): + self.secret_key = secret_key + self.algorithm = algorithm + self.access_token_expiry = timedelta(minutes=15) + self.refresh_token_expiry = timedelta(days=7) + + def create_access_token(self, user_id: str, roles: list) -> str: + payload = { + 'sub': user_id, + 'roles': roles, + 'type': 'access', + 'iat': datetime.utcnow(), + 'exp': datetime.utcnow() + self.access_token_expiry, + 'jti': self._generate_jti() # Unique token ID for revocation + } + return jwt.encode(payload, self.secret_key, algorithm=self.algorithm) + + def verify_token(self, token: str) -> Optional[Dict]: + try: + payload = jwt.decode( + token, + self.secret_key, + algorithms=[self.algorithm], + options={ + 'require': ['exp', 'iat', 'sub', 'jti'], + 'verify_exp': True + } + ) + + # Check if token is revoked + if 
self._is_token_revoked(payload['jti']): + return None + + return payload + except jwt.ExpiredSignatureError: + return None + except jwt.InvalidTokenError: + return None +``` + +### MFA Implementation + +```python +import pyotp +import qrcode +from io import BytesIO +import base64 + +class TOTPManager: + def __init__(self, issuer: str = "MyApp"): + self.issuer = issuer + + def generate_secret(self) -> str: + """Generate a new TOTP secret for a user.""" + return pyotp.random_base32() + + def get_provisioning_uri(self, secret: str, email: str) -> str: + """Generate URI for QR code.""" + totp = pyotp.TOTP(secret) + return totp.provisioning_uri(name=email, issuer_name=self.issuer) + + def generate_qr_code(self, provisioning_uri: str) -> str: + """Generate base64-encoded QR code image.""" + qr = qrcode.QRCode(version=1, box_size=10, border=5) + qr.add_data(provisioning_uri) + qr.make(fit=True) + + img = qr.make_image(fill_color="black", back_color="white") + buffer = BytesIO() + img.save(buffer, format='PNG') + return base64.b64encode(buffer.getvalue()).decode() + + def verify_totp(self, secret: str, code: str) -> bool: + """Verify TOTP code with time window tolerance.""" + totp = pyotp.TOTP(secret) + # Allow 1 period before/after for clock skew + return totp.verify(code, valid_window=1) +``` + +--- + +## API Security + +### Rate Limiting + +```python +from functools import wraps +from flask import request, jsonify +import time +from collections import defaultdict +import threading + +class RateLimiter: + def __init__(self, requests_per_minute: int = 60): + self.requests_per_minute = requests_per_minute + self.requests = defaultdict(list) + self.lock = threading.Lock() + + def is_rate_limited(self, identifier: str) -> bool: + with self.lock: + now = time.time() + minute_ago = now - 60 + + # Clean old requests + self.requests[identifier] = [ + req_time for req_time in self.requests[identifier] + if req_time > minute_ago + ] + + if len(self.requests[identifier]) >= 
self.requests_per_minute: + return True + + self.requests[identifier].append(now) + return False + +rate_limiter = RateLimiter(requests_per_minute=100) + +def rate_limit(f): + @wraps(f) + def decorated_function(*args, **kwargs): + identifier = request.remote_addr + + if rate_limiter.is_rate_limited(identifier): + return jsonify({ + 'error': 'Rate limit exceeded', + 'retry_after': 60 + }), 429 + + return f(*args, **kwargs) + return decorated_function +``` + +### API Key Validation + +```python +import hashlib +import secrets +from datetime import datetime +from typing import Optional, Dict + +class APIKeyManager: + def __init__(self, db): + self.db = db + + def generate_api_key(self, user_id: str, name: str, scopes: list) -> Dict: + """Generate a new API key.""" + # Generate key with prefix for identification + raw_key = f"sk_live_{secrets.token_urlsafe(32)}" + + # Store hash only + key_hash = hashlib.sha256(raw_key.encode()).hexdigest() + + api_key_record = { + 'id': secrets.token_urlsafe(16), + 'user_id': user_id, + 'name': name, + 'key_hash': key_hash, + 'key_prefix': raw_key[:12], # Store prefix for identification + 'scopes': scopes, + 'created_at': datetime.utcnow(), + 'last_used_at': None + } + + self.db.api_keys.insert(api_key_record) + + # Return raw key only once + return { + 'key': raw_key, + 'id': api_key_record['id'], + 'scopes': scopes + } + + def validate_api_key(self, raw_key: str) -> Optional[Dict]: + """Validate an API key and return associated data.""" + key_hash = hashlib.sha256(raw_key.encode()).hexdigest() + + api_key = self.db.api_keys.find_one({'key_hash': key_hash}) + + if not api_key: + return None + + # Update last used timestamp + self.db.api_keys.update( + {'id': api_key['id']}, + {'last_used_at': datetime.utcnow()} + ) + + return { + 'user_id': api_key['user_id'], + 'scopes': api_key['scopes'] + } +``` + +--- + +## Secrets Management + +### Environment Variables + +```python +import os +from typing import Optional +from dataclasses 
import dataclass + +@dataclass +class AppSecrets: + database_url: str + jwt_secret: str + api_key: str + encryption_key: str + +def load_secrets() -> AppSecrets: + """Load secrets from environment with validation.""" + + def get_required(name: str) -> str: + value = os.environ.get(name) + if not value: + raise ValueError(f"Required environment variable {name} is not set") + return value + + return AppSecrets( + database_url=get_required('DATABASE_URL'), + jwt_secret=get_required('JWT_SECRET'), + api_key=get_required('API_KEY'), + encryption_key=get_required('ENCRYPTION_KEY') + ) + +# Never log secrets +import logging + +class SecretFilter(logging.Filter): + """Filter to redact secrets from logs.""" + + def __init__(self, secrets: list): + super().__init__() + self.secrets = secrets + + def filter(self, record): + message = record.getMessage() + for secret in self.secrets: + if secret in message: + record.msg = record.msg.replace(secret, '[REDACTED]') + return True +``` + +### HashiCorp Vault Integration + +```python +import hvac +from typing import Dict, Optional + +class VaultClient: + def __init__(self, url: str, token: str = None, role_id: str = None, secret_id: str = None): + self.client = hvac.Client(url=url) + + if token: + self.client.token = token + elif role_id and secret_id: + # AppRole authentication + self.client.auth.approle.login( + role_id=role_id, + secret_id=secret_id + ) + + def get_secret(self, path: str, key: str) -> Optional[str]: + """Retrieve a secret from Vault.""" + try: + response = self.client.secrets.kv.v2.read_secret_version(path=path) + return response['data']['data'].get(key) + except hvac.exceptions.InvalidPath: + return None + + def get_database_credentials(self, role: str) -> Dict[str, str]: + """Get dynamic database credentials.""" + response = self.client.secrets.database.generate_credentials(name=role) + return { + 'username': response['data']['username'], + 'password': response['data']['password'], + 'lease_id': 
response['lease_id'], + 'lease_duration': response['lease_duration'] + } +``` + +--- + +## Security Headers + +### HTTP Security Headers + +```python +from flask import Flask, Response + +def add_security_headers(response: Response) -> Response: + """Add security headers to HTTP response.""" + + # Prevent clickjacking + response.headers['X-Frame-Options'] = 'DENY' + + # Enable XSS filter + response.headers['X-XSS-Protection'] = '1; mode=block' + + # Prevent MIME type sniffing + response.headers['X-Content-Type-Options'] = 'nosniff' + + # Referrer policy + response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' + + # Content Security Policy + response.headers['Content-Security-Policy'] = ( + "default-src 'self'; " + "script-src 'self' 'unsafe-inline'; " + "style-src 'self' 'unsafe-inline'; " + "img-src 'self' data: https:; " + "font-src 'self'; " + "frame-ancestors 'none'; " + "form-action 'self'" + ) + + # HSTS (enable only with valid HTTPS) + response.headers['Strict-Transport-Security'] = ( + 'max-age=31536000; includeSubDomains; preload' + ) + + # Permissions Policy + response.headers['Permissions-Policy'] = ( + 'geolocation=(), microphone=(), camera=()' + ) + + return response + +app = Flask(__name__) +app.after_request(add_security_headers) +``` + +--- + +## Quick Reference + +### Security Checklist + +| Category | Check | Priority | +|----------|-------|----------| +| Authentication | MFA enabled | Critical | +| Authentication | Password policy enforced | Critical | +| Authorization | RBAC implemented | Critical | +| Input | All inputs validated | Critical | +| Injection | Parameterized queries | Critical | +| Crypto | TLS 1.2+ enforced | Critical | +| Secrets | No hardcoded secrets | Critical | +| Headers | Security headers set | High | +| Logging | Security events logged | High | +| Dependencies | No known vulnerabilities | High | + +### Tool Recommendations + +| Purpose | Tool | Usage | +|---------|------|-------| +| SAST | Semgrep | 
`semgrep --config auto .` | +| SAST | Bandit (Python) | `bandit -r src/` | +| Secrets | Gitleaks | `gitleaks detect --source .` | +| Dependencies | Snyk | `snyk test` | +| Container | Trivy | `trivy image myapp:latest` | +| DAST | OWASP ZAP | Dynamic scanning | diff --git a/engineering-team/senior-secops/references/vulnerability_management_guide.md b/engineering-team/senior-secops/references/vulnerability_management_guide.md index 67fb057..1507163 100644 --- a/engineering-team/senior-secops/references/vulnerability_management_guide.md +++ b/engineering-team/senior-secops/references/vulnerability_management_guide.md @@ -1,103 +1,487 @@ # Vulnerability Management Guide -## Overview +Complete workflow for vulnerability identification, assessment, prioritization, and remediation. -This reference guide provides comprehensive information for senior secops. +--- -## Patterns and Practices +## Table of Contents -### Pattern 1: Best Practice Implementation +- [Vulnerability Lifecycle](#vulnerability-lifecycle) +- [CVE Triage Process](#cve-triage-process) +- [CVSS Scoring](#cvss-scoring) +- [Remediation Workflows](#remediation-workflows) +- [Dependency Scanning](#dependency-scanning) +- [Security Incident Response](#security-incident-response) -**Description:** -Detailed explanation of the pattern. 
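The CVSS severity bands and response SLAs used throughout this guide reduce to a small lookup (a sketch; the thresholds follow the severity classification table in the triage section):

```python
def classify_cvss(score: float) -> tuple:
    """Map a CVSS v3 base score to (severity, response SLA)."""
    if not 0.0 <= score <= 10.0:
        raise ValueError("CVSS base score must be between 0.0 and 10.0")
    if score >= 9.0:
        return "Critical", "24 hours"
    if score >= 7.0:
        return "High", "7 days"
    if score >= 4.0:
        return "Medium", "30 days"
    if score > 0.0:
        return "Low", "90 days"
    return "None", "Informational"

assert classify_cvss(9.8) == ("Critical", "24 hours")
assert classify_cvss(5.3) == ("Medium", "30 days")
assert classify_cvss(0.0) == ("None", "Informational")
```

Encoding the policy as code lets the triage scripts below apply SLAs consistently instead of re-reading the table by hand.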
+--- -**When to Use:** -- Scenario 1 -- Scenario 2 -- Scenario 3 +## Vulnerability Lifecycle -**Implementation:** -```typescript -// Example code implementation -export class Example { - // Implementation details -} +### Overview + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ DISCOVER │ → │ ASSESS │ → │ PRIORITIZE │ → │ REMEDIATE │ +│ │ │ │ │ │ │ │ +│ - Scanning │ │ - CVSS │ │ - Risk │ │ - Patch │ +│ - Reports │ │ - Context │ │ - Business │ │ - Mitigate │ +│ - Audits │ │ - Impact │ │ - SLA │ │ - Accept │ +└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ + │ + ▼ + ┌─────────────┐ + │ VERIFY │ + │ │ + │ - Retest │ + │ - Close │ + └─────────────┘ ``` -**Benefits:** -- Benefit 1 -- Benefit 2 -- Benefit 3 +### State Definitions -**Trade-offs:** -- Consider 1 -- Consider 2 -- Consider 3 +| State | Description | Owner | +|-------|-------------|-------| +| New | Vulnerability discovered, not yet triaged | Security Team | +| Triaging | Under assessment for severity and impact | Security Team | +| Assigned | Assigned to development team for fix | Dev Team | +| In Progress | Fix being developed | Dev Team | +| In Review | Fix in code review | Dev Team | +| Testing | Fix being tested | QA Team | +| Deployed | Fix deployed to production | DevOps Team | +| Verified | Fix confirmed effective | Security Team | +| Closed | Vulnerability resolved | Security Team | +| Accepted Risk | Risk accepted with justification | CISO | -### Pattern 2: Advanced Technique +--- -**Description:** -Another important pattern for senior secops. +## CVE Triage Process -**Implementation:** -```typescript -// Advanced example -async function advancedExample() { - // Code here -} +### Step 1: Initial Assessment + +```python +def triage_cve(cve_id: str, affected_systems: list) -> dict: + """ + Perform initial triage of a CVE. + + Returns triage assessment with severity and recommended actions. 
+ """ + # Fetch CVE details from NVD + cve_data = fetch_nvd_data(cve_id) + + assessment = { + 'cve_id': cve_id, + 'published': cve_data['published'], + 'base_cvss': cve_data['cvss_v3']['base_score'], + 'vector': cve_data['cvss_v3']['vector_string'], + 'description': cve_data['description'], + 'affected_systems': [], + 'exploitability': check_exploitability(cve_id), + 'recommendation': None + } + + # Check which systems are actually affected + for system in affected_systems: + if is_system_vulnerable(system, cve_data): + assessment['affected_systems'].append({ + 'name': system.name, + 'version': system.version, + 'exposure': assess_exposure(system) + }) + + # Determine recommendation + assessment['recommendation'] = determine_action(assessment) + + return assessment ``` -## Guidelines +### Step 2: Severity Classification -### Code Organization -- Clear structure -- Logical separation -- Consistent naming -- Proper documentation +| CVSS Score | Severity | Response SLA | +|------------|----------|--------------| +| 9.0 - 10.0 | Critical | 24 hours | +| 7.0 - 8.9 | High | 7 days | +| 4.0 - 6.9 | Medium | 30 days | +| 0.1 - 3.9 | Low | 90 days | +| 0.0 | None | Informational | -### Performance Considerations -- Optimization strategies -- Bottleneck identification -- Monitoring approaches -- Scaling techniques +### Step 3: Context Analysis -### Security Best Practices -- Input validation -- Authentication -- Authorization -- Data protection +```markdown +## CVE Context Checklist -## Common Patterns +### Exposure Assessment +- [ ] Is the vulnerable component internet-facing? +- [ ] Is the vulnerable component in a DMZ? +- [ ] Does the component process sensitive data? +- [ ] Are there compensating controls in place? -### Pattern A -Implementation details and examples. +### Exploitability Assessment +- [ ] Is there a public exploit available? +- [ ] Is exploitation being observed in the wild? +- [ ] What privileges are required to exploit? 
+- [ ] Does exploit require user interaction? -### Pattern B -Implementation details and examples. +### Business Impact +- [ ] What business processes depend on affected systems? +- [ ] What is the potential data exposure? +- [ ] What are regulatory implications? +- [ ] What is the reputational risk? +``` -### Pattern C -Implementation details and examples. +### Step 4: Triage Decision Matrix -## Anti-Patterns to Avoid +| Exposure | Exploitability | Business Impact | Priority | +|----------|----------------|-----------------|----------| +| Internet | Active Exploit | High | P0 - Immediate | +| Internet | PoC Available | High | P1 - Critical | +| Internet | Theoretical | Medium | P2 - High | +| Internal | Active Exploit | High | P1 - Critical | +| Internal | PoC Available | Medium | P2 - High | +| Internal | Theoretical | Low | P3 - Medium | +| Isolated | Any | Low | P4 - Low | -### Anti-Pattern 1 -What not to do and why. +--- -### Anti-Pattern 2 -What not to do and why. +## CVSS Scoring -## Tools and Resources +### CVSS v3.1 Vector Components -### Recommended Tools -- Tool 1: Purpose -- Tool 2: Purpose -- Tool 3: Purpose +``` +CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H + │ │ │ │ │ │ │ │ + │ │ │ │ │ │ │ └── Availability Impact (H/L/N) + │ │ │ │ │ │ └────── Integrity Impact (H/L/N) + │ │ │ │ │ └────────── Confidentiality Impact (H/L/N) + │ │ │ │ └────────────── Scope (C/U) + │ │ │ └─────────────────── User Interaction (R/N) + │ │ └──────────────────────── Privileges Required (H/L/N) + │ └───────────────────────────── Attack Complexity (H/L) + └─────────────────────────────────── Attack Vector (N/A/L/P) +``` -### Further Reading -- Resource 1 -- Resource 2 -- Resource 3 +### Environmental Score Adjustments -## Conclusion +```python +def calculate_environmental_score(base_cvss: float, environment: dict) -> float: + """ + Adjust CVSS base score based on environmental factors. -Key takeaways for using this reference guide effectively. 
+
+    Args:
+        base_cvss: Base CVSS score from NVD
+        environment: Dictionary with environmental modifiers
+
+    Returns:
+        Adjusted CVSS score for this environment
+
+    Example:
+        >>> calculate_environmental_score(9.8, {'internet_facing': False})
+        8.3
+    """
+    # Confidentiality Requirement (CR)
+    cr_modifier = {
+        'high': 1.5,
+        'medium': 1.0,
+        'low': 0.5
+    }.get(environment.get('confidentiality_requirement', 'medium'), 1.0)
+
+    # Integrity Requirement (IR)
+    ir_modifier = {
+        'high': 1.5,
+        'medium': 1.0,
+        'low': 0.5
+    }.get(environment.get('integrity_requirement', 'medium'), 1.0)
+
+    # Availability Requirement (AR)
+    ar_modifier = {
+        'high': 1.5,
+        'medium': 1.0,
+        'low': 0.5
+    }.get(environment.get('availability_requirement', 'medium'), 1.0)
+
+    # Apply the averaged requirement modifier to the base score
+    # (simplified: full CVSS environmental scoring re-weights the
+    # C/I/A sub-scores individually)
+    base_cvss = base_cvss * ((cr_modifier + ir_modifier + ar_modifier) / 3)
+
+    # Modified Attack Vector (reduce if not internet-facing)
+    if not environment.get('internet_facing', True):
+        base_cvss = max(0, base_cvss - 1.5)
+
+    # Compensating controls reduce score
+    if environment.get('waf_protected', False):
+        base_cvss = max(0, base_cvss - 0.5)
+
+    if environment.get('network_segmented', False):
+        base_cvss = max(0, base_cvss - 0.5)
+
+    return round(min(10.0, base_cvss), 1)
+```
+
+---
+
+## Remediation Workflows
+
+### Workflow 1: Emergency Patch (P0/Critical)
+
+```
+Timeline: 24 hours
+Stakeholders: Security, DevOps, Engineering Lead, CISO
+
+Hour 0-2: ASSESS
+├── Confirm vulnerability affects production
+├── Identify all affected systems
+├── Assess active exploitation
+└── Notify stakeholders
+
+Hour 2-8: MITIGATE
+├── Apply temporary mitigations (WAF rules, network blocks)
+├── Enable enhanced monitoring
+├── Prepare rollback plan
+└── Begin patch development/testing
+
+Hour 8-20: REMEDIATE
+├── Test patch in staging
+├── Security team validates fix
+├── Change approval (emergency CAB)
+└── Deploy to production (rolling)
+
+Hour 20-24: VERIFY
+├── Confirm vulnerability resolved
+├── Monitor for issues
+├── Update vulnerability tracker
+└── Post-incident review scheduled
+```
+
+### Workflow 2: Standard Patch (P1-P2)
+
+```python
+# Remediation ticket template
+REMEDIATION_TICKET = """
+## Vulnerability Remediation
+
+**CVE:** {cve_id}
+**Severity:** {severity}
+**CVSS:** {cvss_score}
+**SLA:** {sla_date}
+
+### Affected Components
+{affected_components}
+
+### Root Cause
+{root_cause}
+
+### Remediation Steps
+1. Update {package} from {current_version} to {fixed_version}
+2. Run security regression tests
+3. Deploy to staging for validation
+4. Security team approval required before production
+
+### Testing Requirements
+- [ ] Unit tests pass
+- [ ] Integration tests pass
+- [ ] Security scan shows vulnerability resolved
+- [ ] No new vulnerabilities introduced
+
+### Rollback Plan
+{rollback_steps}
+
+### Acceptance Criteria
+- Vulnerability scan shows CVE resolved
+- No functional regression
+- Performance baseline maintained
+"""
+```
+
+### Workflow 3: Risk Acceptance
+
+```markdown
+## Risk Acceptance Request
+
+**Vulnerability:** CVE-XXXX-XXXXX
+**Affected System:** [System Name]
+**Requested By:** [Name]
+**Date:** [Date]
+
+### Business Justification
+[Explain why the vulnerability cannot be remediated]
+
+### Compensating Controls
+- [ ] Control 1: [Description]
+- [ ] Control 2: [Description]
+- [ ] Control 3: [Description]
+
+### Residual Risk Assessment
+- **Likelihood:** [High/Medium/Low]
+- **Impact:** [High/Medium/Low]
+- **Residual Risk:** [Critical/High/Medium/Low]
+
+### Review Schedule
+- Next review date: [Date]
+- Review frequency: [Monthly/Quarterly]
+
+### Approvals
+- [ ] Security Team Lead
+- [ ] Engineering Manager
+- [ ] CISO
+- [ ] Business Owner
+```
+
+---
+
+## Dependency Scanning
+
+### Automated Scanning Pipeline
+
+```yaml
+# .github/workflows/security-scan.yml
+name: Security Scan
+
+on:
+  push:
+    branches: [main, develop]
+  pull_request:
+    branches: [main]
+  schedule:
+    - cron: '0 6 * * *'  # Daily at 6 AM
+
+jobs:
+  dependency-scan:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Run Snyk vulnerability scan
+        uses: snyk/actions/node@master
+        env:
+          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
+ with: + args: --severity-threshold=high + + - name: Run npm audit + run: npm audit --audit-level=high + + - name: Run Trivy filesystem scan + uses: aquasecurity/trivy-action@master + with: + scan-type: 'fs' + scan-ref: '.' + severity: 'CRITICAL,HIGH' + exit-code: '1' + + sast-scan: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Run Semgrep + uses: returntocorp/semgrep-action@v1 + with: + config: >- + p/security-audit + p/secrets + p/owasp-top-ten +``` + +### Manual Dependency Review + +```bash +# Node.js - Check for vulnerabilities +npm audit +npm audit --json > audit-report.json + +# Python - Check for vulnerabilities +pip-audit +safety check -r requirements.txt + +# Go - Check for vulnerabilities +govulncheck ./... + +# Container images +trivy image myapp:latest +grype myapp:latest +``` + +### Dependency Update Strategy + +| Update Type | Automation | Review Required | +|-------------|------------|-----------------| +| Security patch (same minor) | Auto-merge | No | +| Minor version | Auto-PR | Yes | +| Major version | Manual PR | Yes + Testing | +| Breaking change | Manual | Yes + Migration plan | + +--- + +## Security Incident Response + +### Incident Severity Levels + +| Level | Description | Response Time | Escalation | +|-------|-------------|---------------|------------| +| SEV-1 | Active breach, data exfiltration | Immediate | CISO, Legal, Exec | +| SEV-2 | Confirmed intrusion, no data loss | 1 hour | Security Lead, Engineering | +| SEV-3 | Suspicious activity, potential breach | 4 hours | Security Team | +| SEV-4 | Policy violation, no immediate risk | 24 hours | Security Team | + +### Incident Response Checklist + +```markdown +## Incident Response Checklist + +### 1. DETECT & IDENTIFY (0-15 min) +- [ ] Alert received and acknowledged +- [ ] Initial severity assessment +- [ ] Incident commander assigned +- [ ] Communication channel established + +### 2. 
CONTAIN (15-60 min)
+- [ ] Affected systems identified
+- [ ] Network isolation if needed
+- [ ] Credentials rotated if compromised
+- [ ] Preserve evidence (logs, memory dumps)
+
+### 3. ERADICATE (1-4 hours)
+- [ ] Root cause identified
+- [ ] Malware/backdoors removed
+- [ ] Vulnerabilities patched
+- [ ] Systems hardened
+
+### 4. RECOVER (4-24 hours)
+- [ ] Systems restored from clean backup
+- [ ] Services brought back online
+- [ ] Enhanced monitoring enabled
+- [ ] User access restored
+
+### 5. POST-INCIDENT (24-72 hours)
+- [ ] Incident timeline documented
+- [ ] Root cause analysis complete
+- [ ] Lessons learned documented
+- [ ] Preventive measures implemented
+- [ ] Report to stakeholders
+```
+
+---
+
+## Quick Reference
+
+### Vulnerability Response SLAs
+
+| Severity | Detection to Triage | Triage to Remediation |
+|----------|--------------------|-----------------------|
+| Critical | 4 hours | 24 hours |
+| High | 24 hours | 7 days |
+| Medium | 3 days | 30 days |
+| Low | 7 days | 90 days |
+
+### Common Vulnerability Databases
+
+| Database | URL | Use Case |
+|----------|-----|----------|
+| NVD | nvd.nist.gov | CVE details, CVSS |
+| MITRE CVE | cve.mitre.org | CVE registry |
+| OSV | osv.dev | Open source vulns |
+| GitHub Advisory | github.com/advisories | Package vulns |
+| Snyk DB | snyk.io/vuln | Package vulns |
+
+### Remediation Priority Formula
+
+```
+Priority Score = CVSS × Exposure × Business_Impact × Compensating_Controls
+
+Where:
+- CVSS: 0-10 (from NVD)
+- Exposure: 1.0 (internal) to 2.0 (internet-facing)
+- Business_Impact: 1.0 (low) to 2.0 (critical)
+- Compensating_Controls: 1.0 (no controls) down to 0.5 (multiple controls)
+
+Example: CVSS 9.8, internet-facing (2.0), critical system (2.0),
+multiple compensating controls (0.5):
+9.8 × 2.0 × 2.0 × 0.5 = 19.6
+```
diff --git a/engineering-team/senior-secops/scripts/compliance_checker.py b/engineering-team/senior-secops/scripts/compliance_checker.py
index 6a1cd31..18797ad 100755
--- a/engineering-team/senior-secops/scripts/compliance_checker.py
+++ b/engineering-team/senior-secops/scripts/compliance_checker.py
@@ -1,114 
+1,1107 @@ #!/usr/bin/env python3 """ -Compliance Checker -Automated tool for senior secops tasks +Compliance Checker - Verify security compliance against SOC 2, PCI-DSS, HIPAA, GDPR. + +Table of Contents: + ComplianceChecker - Main class for compliance verification + __init__ - Initialize with target path and framework + check() - Run compliance checks for selected framework + check_soc2() - Check SOC 2 Type II controls + check_pci_dss() - Check PCI-DSS v4.0 requirements + check_hipaa() - Check HIPAA security rule requirements + check_gdpr() - Check GDPR data protection requirements + _check_encryption_at_rest() - Verify data encryption + _check_access_controls() - Verify access control implementation + _check_logging() - Verify audit logging + _check_secrets_management() - Verify secrets handling + _calculate_compliance_score() - Calculate overall compliance score + main() - CLI entry point + +Usage: + python compliance_checker.py /path/to/project + python compliance_checker.py /path/to/project --framework soc2 + python compliance_checker.py /path/to/project --framework pci-dss --output report.json """ import os import sys import json +import re import argparse from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass, asdict +from datetime import datetime + + +@dataclass +class ComplianceControl: + """Represents a compliance control check result.""" + control_id: str + framework: str + category: str + title: str + description: str + status: str # passed, failed, warning, not_applicable + evidence: List[str] + recommendation: str + severity: str # critical, high, medium, low + class ComplianceChecker: - """Main class for compliance checker functionality""" - - def __init__(self, target_path: str, verbose: bool = False): + """Verify security compliance against industry frameworks.""" + + FRAMEWORKS = ['soc2', 'pci-dss', 'hipaa', 'gdpr', 'all'] + + def __init__( + self, + 
target_path: str, + framework: str = "all", + verbose: bool = False + ): + """ + Initialize the compliance checker. + + Args: + target_path: Directory to scan + framework: Compliance framework to check (soc2, pci-dss, hipaa, gdpr, all) + verbose: Enable verbose output + """ self.target_path = Path(target_path) + self.framework = framework.lower() self.verbose = verbose - self.results = {} - - def run(self) -> Dict: - """Execute the main functionality""" - print(f"🚀 Running {self.__class__.__name__}...") - print(f"📁 Target: {self.target_path}") - - try: - self.validate_target() - self.analyze() - self.generate_report() - - print("✅ Completed successfully!") - return self.results - - except Exception as e: - print(f"❌ Error: {e}") - sys.exit(1) - - def validate_target(self): - """Validate the target path exists and is accessible""" + self.controls: List[ComplianceControl] = [] + self.files_scanned = 0 + + def check(self) -> Dict: + """ + Run compliance checks for selected framework. + + Returns: + Dict with compliance results + """ + print(f"Compliance Checker - Scanning: {self.target_path}") + print(f"Framework: {self.framework.upper()}") + print() + if not self.target_path.exists(): - raise ValueError(f"Target path does not exist: {self.target_path}") - + return {"status": "error", "message": f"Path not found: {self.target_path}"} + + start_time = datetime.now() + + # Run framework-specific checks + if self.framework in ('soc2', 'all'): + self.check_soc2() + if self.framework in ('pci-dss', 'all'): + self.check_pci_dss() + if self.framework in ('hipaa', 'all'): + self.check_hipaa() + if self.framework in ('gdpr', 'all'): + self.check_gdpr() + + end_time = datetime.now() + scan_duration = (end_time - start_time).total_seconds() + + # Calculate statistics + passed = len([c for c in self.controls if c.status == 'passed']) + failed = len([c for c in self.controls if c.status == 'failed']) + warnings = len([c for c in self.controls if c.status == 'warning']) + na = 
len([c for c in self.controls if c.status == 'not_applicable']) + + compliance_score = self._calculate_compliance_score() + + result = { + "status": "completed", + "target": str(self.target_path), + "framework": self.framework, + "scan_duration_seconds": round(scan_duration, 2), + "compliance_score": compliance_score, + "compliance_level": self._get_compliance_level(compliance_score), + "summary": { + "passed": passed, + "failed": failed, + "warnings": warnings, + "not_applicable": na, + "total": len(self.controls) + }, + "controls": [asdict(c) for c in self.controls] + } + + self._print_summary(result) + + return result + + def check_soc2(self): + """Check SOC 2 Type II controls.""" if self.verbose: - print(f"✓ Target validated: {self.target_path}") - - def analyze(self): - """Perform the main analysis or operation""" + print(" Checking SOC 2 Type II controls...") + + # CC1: Control Environment - Access Controls + self._check_access_controls_soc2() + + # CC2: Communication and Information + self._check_documentation() + + # CC3: Risk Assessment + self._check_risk_assessment() + + # CC6: Logical and Physical Access Controls + self._check_authentication() + + # CC7: System Operations + self._check_logging() + + # CC8: Change Management + self._check_change_management() + + def check_pci_dss(self): + """Check PCI-DSS v4.0 requirements.""" if self.verbose: - print("📊 Analyzing...") - - # Main logic here - self.results['status'] = 'success' - self.results['target'] = str(self.target_path) - self.results['findings'] = [] - - # Add analysis results + print(" Checking PCI-DSS v4.0 requirements...") + + # Requirement 3: Protect stored cardholder data + self._check_data_encryption() + + # Requirement 4: Encrypt transmission of cardholder data + self._check_transmission_encryption() + + # Requirement 6: Develop and maintain secure systems + self._check_secure_development() + + # Requirement 8: Identify users and authenticate access + self._check_strong_authentication() + + # 
Requirement 10: Log and monitor all access + self._check_audit_logging() + + # Requirement 11: Test security of systems regularly + self._check_security_testing() + + def check_hipaa(self): + """Check HIPAA security rule requirements.""" if self.verbose: - print(f"✓ Analysis complete: {len(self.results.get('findings', []))} findings") - - def generate_report(self): - """Generate and display the report""" - print("\n" + "="*50) - print("REPORT") - print("="*50) - print(f"Target: {self.results.get('target')}") - print(f"Status: {self.results.get('status')}") - print(f"Findings: {len(self.results.get('findings', []))}") - print("="*50 + "\n") + print(" Checking HIPAA Security Rule requirements...") + + # 164.312(a)(1): Access Control + self._check_hipaa_access_control() + + # 164.312(b): Audit Controls + self._check_hipaa_audit() + + # 164.312(c)(1): Integrity Controls + self._check_hipaa_integrity() + + # 164.312(d): Person or Entity Authentication + self._check_hipaa_authentication() + + # 164.312(e)(1): Transmission Security + self._check_hipaa_transmission() + + def check_gdpr(self): + """Check GDPR data protection requirements.""" + if self.verbose: + print(" Checking GDPR requirements...") + + # Article 25: Data protection by design + self._check_privacy_by_design() + + # Article 32: Security of processing + self._check_gdpr_security() + + # Article 33/34: Breach notification + self._check_breach_notification() + + # Article 17: Right to erasure + self._check_data_deletion() + + # Article 20: Data portability + self._check_data_export() + + def _check_access_controls_soc2(self): + """SOC 2 CC1/CC6: Check access control implementation.""" + evidence = [] + status = 'failed' + + # Look for authentication middleware + auth_patterns = [ + r'authMiddleware', + r'requireAuth', + r'isAuthenticated', + r'@login_required', + r'@authenticated', + r'passport\.authenticate', + r'jwt\.verify', + r'verifyToken' + ] + + for pattern in auth_patterns: + files = 
self._search_files(pattern) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + # Check for RBAC implementation + rbac_patterns = [r'role', r'permission', r'authorize', r'can\(', r'hasRole'] + for pattern in rbac_patterns: + files = self._search_files(pattern) + if files: + evidence.extend(files[:2]) + if status == 'failed': + status = 'warning' + break + + self.controls.append(ComplianceControl( + control_id='SOC2-CC6.1', + framework='SOC 2', + category='Logical Access Controls', + title='Access Control Implementation', + description='Verify authentication and authorization controls are implemented', + status=status, + evidence=evidence[:5], + recommendation='Implement authentication middleware and role-based access control (RBAC)', + severity='high' if status == 'failed' else 'low' + )) + + def _check_documentation(self): + """SOC 2 CC2: Check security documentation.""" + evidence = [] + status = 'failed' + + doc_files = [ + 'SECURITY.md', + 'docs/security.md', + 'CONTRIBUTING.md', + 'docs/security-policy.md', + '.github/SECURITY.md' + ] + + for doc in doc_files: + doc_path = self.target_path / doc + if doc_path.exists(): + evidence.append(str(doc)) + status = 'passed' if 'security' in doc.lower() else 'warning' + break + + self.controls.append(ComplianceControl( + control_id='SOC2-CC2.1', + framework='SOC 2', + category='Communication and Information', + title='Security Documentation', + description='Verify security policies and procedures are documented', + status=status, + evidence=evidence, + recommendation='Create SECURITY.md documenting security policies, incident response, and vulnerability reporting', + severity='medium' if status == 'failed' else 'low' + )) + + def _check_risk_assessment(self): + """SOC 2 CC3: Check risk assessment artifacts.""" + evidence = [] + status = 'failed' + + # Look for security scanning configuration + scan_configs = [ + '.snyk', + '.github/workflows/security.yml', + '.github/workflows/codeql.yml', + 
'trivy.yaml', + '.semgrep.yml', + 'sonar-project.properties' + ] + + for config in scan_configs: + config_path = self.target_path / config + if config_path.exists(): + evidence.append(str(config)) + status = 'passed' + break + + # Check for dependabot/renovate + dep_configs = [ + '.github/dependabot.yml', + 'renovate.json', + '.github/renovate.json' + ] + + for config in dep_configs: + config_path = self.target_path / config + if config_path.exists(): + evidence.append(str(config)) + if status == 'failed': + status = 'warning' + break + + self.controls.append(ComplianceControl( + control_id='SOC2-CC3.1', + framework='SOC 2', + category='Risk Assessment', + title='Automated Security Scanning', + description='Verify automated vulnerability scanning is configured', + status=status, + evidence=evidence, + recommendation='Configure automated security scanning (Snyk, CodeQL, Trivy) and dependency updates (Dependabot)', + severity='high' if status == 'failed' else 'low' + )) + + def _check_authentication(self): + """SOC 2 CC6: Check authentication strength.""" + evidence = [] + status = 'failed' + + # Check for MFA/2FA + mfa_patterns = [r'mfa', r'2fa', r'totp', r'authenticator', r'twoFactor'] + for pattern in mfa_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:2]) + status = 'passed' + break + + # Check for password hashing + hash_patterns = [r'bcrypt', r'argon2', r'scrypt', r'pbkdf2'] + for pattern in hash_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:2]) + if status == 'failed': + status = 'warning' + break + + self.controls.append(ComplianceControl( + control_id='SOC2-CC6.2', + framework='SOC 2', + category='Authentication', + title='Strong Authentication', + description='Verify multi-factor authentication and secure password storage', + status=status, + evidence=evidence[:5], + recommendation='Implement MFA/2FA and use bcrypt/argon2 for password 
hashing', + severity='critical' if status == 'failed' else 'low' + )) + + def _check_logging(self): + """SOC 2 CC7: Check audit logging implementation.""" + evidence = [] + status = 'failed' + + # Check for logging configuration + log_patterns = [ + r'winston', + r'pino', + r'bunyan', + r'logging\.getLogger', + r'log\.info', + r'logger\.', + r'audit.*log' + ] + + for pattern in log_patterns: + files = self._search_files(pattern) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + # Check for structured logging + struct_patterns = [r'json.*log', r'structured.*log', r'log.*format'] + for pattern in struct_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:2]) + break + + self.controls.append(ComplianceControl( + control_id='SOC2-CC7.1', + framework='SOC 2', + category='System Operations', + title='Audit Logging', + description='Verify comprehensive audit logging is implemented', + status=status, + evidence=evidence[:5], + recommendation='Implement structured audit logging with security events (auth, access, changes)', + severity='high' if status == 'failed' else 'low' + )) + + def _check_change_management(self): + """SOC 2 CC8: Check change management controls.""" + evidence = [] + status = 'failed' + + # Check for CI/CD configuration + ci_configs = [ + '.github/workflows', + '.gitlab-ci.yml', + 'Jenkinsfile', + '.circleci/config.yml', + 'azure-pipelines.yml' + ] + + for config in ci_configs: + config_path = self.target_path / config + if config_path.exists(): + evidence.append(str(config)) + status = 'passed' + break + + # Check for branch protection indicators + branch_patterns = [r'protected.*branch', r'require.*review', r'pull.*request'] + for pattern in branch_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:2]) + break + + self.controls.append(ComplianceControl( + control_id='SOC2-CC8.1', + framework='SOC 2', + category='Change 
Management', + title='CI/CD and Code Review', + description='Verify automated deployment pipeline and code review process', + status=status, + evidence=evidence[:5], + recommendation='Implement CI/CD pipeline with required code reviews and branch protection', + severity='medium' if status == 'failed' else 'low' + )) + + def _check_data_encryption(self): + """PCI-DSS Req 3: Check encryption at rest.""" + evidence = [] + status = 'failed' + + encryption_patterns = [ + r'AES', + r'encrypt', + r'crypto\.createCipher', + r'Fernet', + r'KMS', + r'encryptedField' + ] + + for pattern in encryption_patterns: + files = self._search_files(pattern) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='PCI-DSS-3.5', + framework='PCI-DSS', + category='Protect Stored Data', + title='Encryption at Rest', + description='Verify sensitive data is encrypted at rest', + status=status, + evidence=evidence[:5], + recommendation='Implement AES-256 encryption for sensitive data storage using approved libraries', + severity='critical' if status == 'failed' else 'low' + )) + + def _check_transmission_encryption(self): + """PCI-DSS Req 4: Check encryption in transit.""" + evidence = [] + status = 'failed' + + tls_patterns = [ + r'https://', + r'TLS', + r'SSL', + r'secure.*cookie', + r'HSTS' + ] + + for pattern in tls_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='PCI-DSS-4.1', + framework='PCI-DSS', + category='Encrypt Transmissions', + title='TLS/HTTPS Enforcement', + description='Verify TLS 1.2+ is enforced for all transmissions', + status=status, + evidence=evidence[:5], + recommendation='Enforce HTTPS with TLS 1.2+, enable HSTS, use secure cookies', + severity='critical' if status == 'failed' else 'low' + )) + + def _check_secure_development(self): + """PCI-DSS Req 6: 
Check secure development practices.""" + evidence = [] + status = 'failed' + + # Check for input validation + validation_patterns = [ + r'validator', + r'sanitize', + r'escape', + r'zod', + r'yup', + r'joi' + ] + + for pattern in validation_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='PCI-DSS-6.5', + framework='PCI-DSS', + category='Secure Development', + title='Input Validation', + description='Verify input validation and sanitization is implemented', + status=status, + evidence=evidence[:5], + recommendation='Use validation libraries (Joi, Zod, validator.js) for all user input', + severity='high' if status == 'failed' else 'low' + )) + + def _check_strong_authentication(self): + """PCI-DSS Req 8: Check authentication requirements.""" + evidence = [] + status = 'failed' + + # Check for session management + session_patterns = [ + r'session.*timeout', + r'maxAge', + r'expiresIn', + r'session.*expire' + ] + + for pattern in session_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='PCI-DSS-8.6', + framework='PCI-DSS', + category='Authentication', + title='Session Management', + description='Verify session timeout and management controls', + status=status, + evidence=evidence[:5], + recommendation='Implement 15-minute session timeout, secure session tokens, and session invalidation on logout', + severity='high' if status == 'failed' else 'low' + )) + + def _check_audit_logging(self): + """PCI-DSS Req 10: Check audit logging.""" + # Reuse SOC 2 logging check logic + evidence = [] + status = 'failed' + + log_patterns = [r'audit', r'log.*event', r'security.*log'] + for pattern in log_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + 
evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='PCI-DSS-10.2', + framework='PCI-DSS', + category='Logging and Monitoring', + title='Security Event Logging', + description='Verify security events are logged with sufficient detail', + status=status, + evidence=evidence[:5], + recommendation='Log all authentication events, access to cardholder data, and administrative actions', + severity='high' if status == 'failed' else 'low' + )) + + def _check_security_testing(self): + """PCI-DSS Req 11: Check security testing.""" + evidence = [] + status = 'failed' + + # Check for test configuration + test_patterns = [ + r'security.*test', + r'penetration.*test', + r'vulnerability.*scan' + ] + + for pattern in test_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + # Check for SAST/DAST configuration + sast_configs = ['.snyk', '.semgrep.yml', 'sonar-project.properties'] + for config in sast_configs: + if (self.target_path / config).exists(): + evidence.append(config) + if status == 'failed': + status = 'warning' + break + + self.controls.append(ComplianceControl( + control_id='PCI-DSS-11.3', + framework='PCI-DSS', + category='Security Testing', + title='Vulnerability Assessment', + description='Verify regular security testing is performed', + status=status, + evidence=evidence[:5], + recommendation='Configure SAST/DAST scanning and schedule quarterly penetration tests', + severity='high' if status == 'failed' else 'low' + )) + + def _check_hipaa_access_control(self): + """HIPAA 164.312(a)(1): Access Control.""" + evidence = [] + status = 'failed' + + # Check for user identification + auth_patterns = [r'user.*id', r'authentication', r'identity'] + for pattern in auth_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + 
self.controls.append(ComplianceControl( + control_id='HIPAA-164.312(a)(1)', + framework='HIPAA', + category='Access Control', + title='Unique User Identification', + description='Verify unique user identification for accessing PHI', + status=status, + evidence=evidence[:5], + recommendation='Implement unique user accounts with individual credentials for all PHI access', + severity='critical' if status == 'failed' else 'low' + )) + + def _check_hipaa_audit(self): + """HIPAA 164.312(b): Audit Controls.""" + evidence = [] + status = 'failed' + + audit_patterns = [r'audit.*trail', r'access.*log', r'phi.*log'] + for pattern in audit_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='HIPAA-164.312(b)', + framework='HIPAA', + category='Audit Controls', + title='PHI Access Audit Trail', + description='Verify audit trails for PHI access are maintained', + status=status, + evidence=evidence[:5], + recommendation='Implement comprehensive audit logging for all PHI access with who/what/when/where', + severity='critical' if status == 'failed' else 'low' + )) + + def _check_hipaa_integrity(self): + """HIPAA 164.312(c)(1): Integrity Controls.""" + evidence = [] + status = 'failed' + + integrity_patterns = [r'checksum', r'hash', r'signature', r'integrity'] + for pattern in integrity_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='HIPAA-164.312(c)(1)', + framework='HIPAA', + category='Integrity', + title='Data Integrity Controls', + description='Verify mechanisms to protect PHI from improper alteration', + status=status, + evidence=evidence[:5], + recommendation='Implement checksums, digital signatures, or hashing for PHI integrity verification', + severity='high' if status == 'failed' else 
'low' + )) + + def _check_hipaa_authentication(self): + """HIPAA 164.312(d): Authentication.""" + evidence = [] + status = 'failed' + + auth_patterns = [r'mfa', r'two.*factor', r'biometric', r'token.*auth'] + for pattern in auth_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='HIPAA-164.312(d)', + framework='HIPAA', + category='Authentication', + title='Person Authentication', + description='Verify mechanisms to authenticate person or entity accessing PHI', + status=status, + evidence=evidence[:5], + recommendation='Implement multi-factor authentication for all PHI access', + severity='critical' if status == 'failed' else 'low' + )) + + def _check_hipaa_transmission(self): + """HIPAA 164.312(e)(1): Transmission Security.""" + evidence = [] + status = 'failed' + + transmission_patterns = [r'tls', r'ssl', r'https', r'encrypt.*transit'] + for pattern in transmission_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='HIPAA-164.312(e)(1)', + framework='HIPAA', + category='Transmission Security', + title='PHI Transmission Encryption', + description='Verify PHI is encrypted during transmission', + status=status, + evidence=evidence[:5], + recommendation='Enforce TLS 1.2+ for all PHI transmissions, implement end-to-end encryption', + severity='critical' if status == 'failed' else 'low' + )) + + def _check_privacy_by_design(self): + """GDPR Article 25: Privacy by design.""" + evidence = [] + status = 'failed' + + privacy_patterns = [ + r'data.*minimization', + r'privacy.*config', + r'consent', + r'gdpr' + ] + + for pattern in privacy_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + 
self.controls.append(ComplianceControl( + control_id='GDPR-25', + framework='GDPR', + category='Privacy by Design', + title='Data Minimization', + description='Verify data collection is limited to necessary purposes', + status=status, + evidence=evidence[:5], + recommendation='Implement data minimization, purpose limitation, and privacy-by-default configurations', + severity='high' if status == 'failed' else 'low' + )) + + def _check_gdpr_security(self): + """GDPR Article 32: Security of processing.""" + evidence = [] + status = 'failed' + + security_patterns = [r'encrypt', r'pseudonymization', r'anonymization'] + for pattern in security_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='GDPR-32', + framework='GDPR', + category='Security', + title='Pseudonymization and Encryption', + description='Verify appropriate security measures for personal data', + status=status, + evidence=evidence[:5], + recommendation='Implement encryption and pseudonymization for personal data processing', + severity='high' if status == 'failed' else 'low' + )) + + def _check_breach_notification(self): + """GDPR Article 33/34: Breach notification.""" + evidence = [] + status = 'failed' + + breach_patterns = [ + r'breach.*notification', + r'incident.*response', + r'security.*incident' + ] + + for pattern in breach_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + # Check for incident response documentation + incident_docs = ['SECURITY.md', 'docs/incident-response.md', '.github/SECURITY.md'] + for doc in incident_docs: + if (self.target_path / doc).exists(): + evidence.append(doc) + if status == 'failed': + status = 'warning' + break + + self.controls.append(ComplianceControl( + control_id='GDPR-33', + framework='GDPR', + category='Breach Notification', + 
title='Incident Response Procedure', + description='Verify breach notification procedures are documented', + status=status, + evidence=evidence[:5], + recommendation='Document incident response procedures with 72-hour notification capability', + severity='high' if status == 'failed' else 'low' + )) + + def _check_data_deletion(self): + """GDPR Article 17: Right to erasure.""" + evidence = [] + status = 'failed' + + deletion_patterns = [ + r'delete.*user', + r'erasure', + r'right.*forgotten', + r'data.*deletion', + r'gdpr.*delete' + ] + + for pattern in deletion_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='GDPR-17', + framework='GDPR', + category='Data Subject Rights', + title='Right to Erasure', + description='Verify data deletion capability is implemented', + status=status, + evidence=evidence[:5], + recommendation='Implement complete user data deletion including all backups and third-party systems', + severity='high' if status == 'failed' else 'low' + )) + + def _check_data_export(self): + """GDPR Article 20: Data portability.""" + evidence = [] + status = 'failed' + + export_patterns = [ + r'export.*data', + r'data.*portability', + r'download.*data', + r'gdpr.*export' + ] + + for pattern in export_patterns: + files = self._search_files(pattern, case_sensitive=False) + if files: + evidence.extend(files[:3]) + status = 'passed' + break + + self.controls.append(ComplianceControl( + control_id='GDPR-20', + framework='GDPR', + category='Data Subject Rights', + title='Data Portability', + description='Verify data export capability is implemented', + status=status, + evidence=evidence[:5], + recommendation='Implement data export in machine-readable format (JSON, CSV)', + severity='medium' if status == 'failed' else 'low' + )) + + def _search_files(self, pattern: str, case_sensitive: bool = True) -> List[str]: + 
"""Search files for pattern matches.""" + matches = [] + flags = 0 if case_sensitive else re.IGNORECASE + + try: + for root, dirs, files in os.walk(self.target_path): + # Skip common non-relevant directories + dirs[:] = [d for d in dirs if d not in { + 'node_modules', '.git', '__pycache__', 'venv', '.venv', + 'dist', 'build', 'coverage', '.next' + }] + + for filename in files: + if filename.endswith(('.js', '.ts', '.py', '.go', '.java', '.md', '.yml', '.yaml', '.json')): + file_path = Path(root) / filename + try: + content = file_path.read_text(encoding='utf-8', errors='ignore') + if re.search(pattern, content, flags): + rel_path = str(file_path.relative_to(self.target_path)) + matches.append(rel_path) + self.files_scanned += 1 + except Exception: + pass + except Exception: + pass + + return matches[:10] # Limit results + + def _calculate_compliance_score(self) -> float: + """Calculate overall compliance score (0-100).""" + if not self.controls: + return 0.0 + + # Weight by severity + severity_weights = {'critical': 4.0, 'high': 3.0, 'medium': 2.0, 'low': 1.0} + status_scores = {'passed': 1.0, 'warning': 0.5, 'failed': 0.0, 'not_applicable': None} + + total_weight = 0.0 + total_score = 0.0 + + for control in self.controls: + score = status_scores.get(control.status) + if score is not None: # Skip N/A + weight = severity_weights.get(control.severity, 1.0) + total_weight += weight + total_score += score * weight + + return round((total_score / total_weight) * 100, 1) if total_weight > 0 else 0.0 + + def _get_compliance_level(self, score: float) -> str: + """Get compliance level from score.""" + if score >= 90: + return "COMPLIANT" + elif score >= 70: + return "PARTIALLY_COMPLIANT" + elif score >= 50: + return "NON_COMPLIANT" + return "CRITICAL_GAPS" + + def _print_summary(self, result: Dict): + """Print compliance summary.""" + print("\n" + "=" * 60) + print("COMPLIANCE CHECK SUMMARY") + print("=" * 60) + print(f"Target: {result['target']}") + print(f"Framework: 
{result['framework'].upper()}") + print(f"Scan duration: {result['scan_duration_seconds']}s") + print(f"Compliance score: {result['compliance_score']}% ({result['compliance_level']})") + print() + + summary = result['summary'] + print(f"Controls checked: {summary['total']}") + print(f" Passed: {summary['passed']}") + print(f" Failed: {summary['failed']}") + print(f" Warning: {summary['warnings']}") + print(f" N/A: {summary['not_applicable']}") + print("=" * 60) + + # Show failed controls + failed = [c for c in result['controls'] if c['status'] == 'failed'] + if failed: + print("\nFailed controls requiring remediation:") + for control in failed[:5]: + print(f"\n [{control['severity'].upper()}] {control['control_id']}") + print(f" {control['title']}") + print(f" Recommendation: {control['recommendation']}") + def main(): - """Main entry point""" + """Main entry point for CLI.""" parser = argparse.ArgumentParser( - description="Compliance Checker" + description="Check compliance against SOC 2, PCI-DSS, HIPAA, GDPR", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s /path/to/project + %(prog)s /path/to/project --framework soc2 + %(prog)s /path/to/project --framework pci-dss --output report.json + %(prog)s . 
--framework all --verbose + """ + ) + + parser.add_argument( + "target", + help="Directory to check for compliance" ) parser.add_argument( - 'target', - help='Target path to analyze or process' + "--framework", "-f", + choices=["soc2", "pci-dss", "hipaa", "gdpr", "all"], + default="all", + help="Compliance framework to check (default: all)" ) parser.add_argument( - '--verbose', '-v', - action='store_true', - help='Enable verbose output' + "--verbose", "-v", + action="store_true", + help="Enable verbose output" ) parser.add_argument( - '--json', - action='store_true', - help='Output results as JSON' + "--json", + action="store_true", + help="Output results as JSON" ) parser.add_argument( - '--output', '-o', - help='Output file path' + "--output", "-o", + help="Output file path" ) - + args = parser.parse_args() - - tool = ComplianceChecker( - args.target, + + checker = ComplianceChecker( + target_path=args.target, + framework=args.framework, verbose=args.verbose ) - - results = tool.run() - + + result = checker.check() + if args.json: - output = json.dumps(results, indent=2) + output = json.dumps(result, indent=2) if args.output: with open(args.output, 'w') as f: f.write(output) - print(f"Results written to {args.output}") + print(f"\nResults written to {args.output}") else: print(output) + elif args.output: + with open(args.output, 'w') as f: + json.dump(result, f, indent=2) + print(f"\nResults written to {args.output}") -if __name__ == '__main__': + # Exit with error code based on compliance level + if result.get('compliance_level') == 'CRITICAL_GAPS': + sys.exit(2) + if result.get('compliance_level') == 'NON_COMPLIANT': + sys.exit(1) + + +if __name__ == "__main__": main() diff --git a/engineering-team/senior-secops/scripts/security_scanner.py b/engineering-team/senior-secops/scripts/security_scanner.py index 66e2e63..9403734 100755 --- a/engineering-team/senior-secops/scripts/security_scanner.py +++ b/engineering-team/senior-secops/scripts/security_scanner.py @@ 
-1,114 +1,471 @@ #!/usr/bin/env python3 """ -Security Scanner -Automated tool for senior secops tasks +Security Scanner - Scan source code for security vulnerabilities. + +Table of Contents: + SecurityScanner - Main class for security scanning + __init__ - Initialize with target path and options + scan() - Run all security scans + scan_secrets() - Detect hardcoded secrets + scan_sql_injection() - Detect SQL injection patterns + scan_xss() - Detect XSS vulnerabilities + scan_command_injection() - Detect command injection + scan_path_traversal() - Detect path traversal + _scan_file() - Scan individual file for patterns + _calculate_severity() - Calculate finding severity + main() - CLI entry point + +Usage: + python security_scanner.py /path/to/project + python security_scanner.py /path/to/project --severity high + python security_scanner.py /path/to/project --output report.json --json """ import os import sys import json +import re import argparse from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass, asdict +from datetime import datetime + + +@dataclass +class SecurityFinding: + """Represents a security finding.""" + rule_id: str + severity: str # critical, high, medium, low, info + category: str + title: str + description: str + file_path: str + line_number: int + code_snippet: str + recommendation: str + class SecurityScanner: - """Main class for security scanner functionality""" - - def __init__(self, target_path: str, verbose: bool = False): + """Scan source code for security vulnerabilities.""" + + # File extensions to scan + SCAN_EXTENSIONS = { + '.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go', + '.rb', '.php', '.cs', '.rs', '.swift', '.kt', + '.yml', '.yaml', '.json', '.xml', '.env', '.conf', '.config' + } + + # Directories to skip + SKIP_DIRS = { + 'node_modules', '.git', '__pycache__', '.venv', 'venv', + 'vendor', 'dist', 'build', '.next', 'coverage' + } + + 
# Secret patterns + SECRET_PATTERNS = [ + (r'(?i)(api[_-]?key|apikey)\s*[:=]\s*["\']?([a-zA-Z0-9_\-]{20,})["\']?', + 'API Key', 'Hardcoded API key detected'), + (r'(?i)(secret[_-]?key|secretkey)\s*[:=]\s*["\']?([a-zA-Z0-9_\-]{16,})["\']?', + 'Secret Key', 'Hardcoded secret key detected'), + (r'(?i)(password|passwd|pwd)\s*[:=]\s*["\']([^"\']{4,})["\']', + 'Password', 'Hardcoded password detected'), + (r'(?i)(aws[_-]?access[_-]?key[_-]?id)\s*[:=]\s*["\']?(AKIA[A-Z0-9]{16})["\']?', + 'AWS Access Key', 'Hardcoded AWS access key detected'), + (r'(?i)(aws[_-]?secret[_-]?access[_-]?key)\s*[:=]\s*["\']?([a-zA-Z0-9/+=]{40})["\']?', + 'AWS Secret Key', 'Hardcoded AWS secret access key detected'), + (r'ghp_[a-zA-Z0-9]{36}', + 'GitHub Token', 'GitHub personal access token detected'), + (r'sk-[a-zA-Z0-9]{48}', + 'OpenAI API Key', 'OpenAI API key detected'), + (r'-----BEGIN\s+(RSA|DSA|EC|OPENSSH)?\s*PRIVATE KEY-----', + 'Private Key', 'Private key detected in source code'), + ] + + # SQL injection patterns + SQL_INJECTION_PATTERNS = [ + (r'execute\s*\(\s*["\']?\s*SELECT.*\+.*\+', + 'Dynamic SQL query with string concatenation'), + (r'execute\s*\(\s*f["\']SELECT', + 'F-string SQL query (Python)'), + (r'cursor\.execute\s*\(\s*["\'].*%s.*%\s*\(', + 'Unsafe string formatting in SQL'), + (r'query\s*\(\s*[`"\']SELECT.*\$\{', + 'Template literal SQL injection (JavaScript)'), + (r'\.query\s*\(\s*["\'].*\+.*\+', + 'String concatenation in SQL query'), + ] + + # XSS patterns + XSS_PATTERNS = [ + (r'innerHTML\s*=\s*[^;]+(?:user|input|param|query)', + 'User input assigned to innerHTML'), + (r'document\.write\s*\([^;]*(?:user|input|param|query)', + 'User input in document.write'), + (r'\.html\s*\(\s*[^)]*(?:user|input|param|query)', + 'User input in jQuery .html()'), + (r'dangerouslySetInnerHTML', + 'React dangerouslySetInnerHTML usage'), + (r'\|safe\s*}}', + 'Django safe filter may disable escaping'), + ] + + # Command injection patterns (detection rules for finding unsafe patterns) + 
COMMAND_INJECTION_PATTERNS = [ + (r'subprocess\.(?:call|run|Popen)\s*\([^)]*shell\s*=\s*True', + 'Subprocess with shell=True'), + (r'exec\s*\(\s*[^)]*(?:user|input|param|request)', + 'exec() with potential user input'), + (r'eval\s*\(\s*[^)]*(?:user|input|param|request)', + 'eval() with potential user input'), + ] + + # Path traversal patterns + PATH_TRAVERSAL_PATTERNS = [ + (r'open\s*\(\s*[^)]*(?:user|input|param|request)', + 'File open with potential user input'), + (r'readFile\s*\(\s*[^)]*(?:user|input|param|req\.|query)', + 'File read with potential user input'), + (r'path\.join\s*\([^)]*(?:user|input|param|req\.|query)', + 'Path.join with user input without validation'), + ] + + def __init__( + self, + target_path: str, + severity_threshold: str = "low", + verbose: bool = False + ): + """ + Initialize the security scanner. + + Args: + target_path: Directory or file to scan + severity_threshold: Minimum severity to report (critical, high, medium, low) + verbose: Enable verbose output + """ self.target_path = Path(target_path) + self.severity_threshold = severity_threshold self.verbose = verbose - self.results = {} - - def run(self) -> Dict: - """Execute the main functionality""" - print(f"🚀 Running {self.__class__.__name__}...") - print(f"📁 Target: {self.target_path}") - - try: - self.validate_target() - self.analyze() - self.generate_report() - - print("✅ Completed successfully!") - return self.results - - except Exception as e: - print(f"❌ Error: {e}") - sys.exit(1) - - def validate_target(self): - """Validate the target path exists and is accessible""" + self.findings: List[SecurityFinding] = [] + self.files_scanned = 0 + self.severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3, 'info': 4} + + def scan(self) -> Dict: + """ + Run all security scans. 
+ + Returns: + Dict with scan results and findings + """ + print(f"Security Scanner - Scanning: {self.target_path}") + print(f"Severity threshold: {self.severity_threshold}") + print() + if not self.target_path.exists(): - raise ValueError(f"Target path does not exist: {self.target_path}") - - if self.verbose: - print(f"✓ Target validated: {self.target_path}") - - def analyze(self): - """Perform the main analysis or operation""" - if self.verbose: - print("📊 Analyzing...") - - # Main logic here - self.results['status'] = 'success' - self.results['target'] = str(self.target_path) - self.results['findings'] = [] - - # Add analysis results - if self.verbose: - print(f"✓ Analysis complete: {len(self.results.get('findings', []))} findings") - - def generate_report(self): - """Generate and display the report""" - print("\n" + "="*50) - print("REPORT") - print("="*50) - print(f"Target: {self.results.get('target')}") - print(f"Status: {self.results.get('status')}") - print(f"Findings: {len(self.results.get('findings', []))}") - print("="*50 + "\n") + return {"status": "error", "message": f"Path not found: {self.target_path}"} + + start_time = datetime.now() + + # Collect files to scan + files_to_scan = self._collect_files() + print(f"Files to scan: {len(files_to_scan)}") + + # Run scans + for file_path in files_to_scan: + self._scan_file(file_path) + self.files_scanned += 1 + + # Filter by severity threshold + threshold_level = self.severity_order.get(self.severity_threshold, 3) + filtered_findings = [ + f for f in self.findings + if self.severity_order.get(f.severity, 3) <= threshold_level + ] + + end_time = datetime.now() + scan_duration = (end_time - start_time).total_seconds() + + # Group findings by severity + severity_counts = {} + for finding in filtered_findings: + severity_counts[finding.severity] = severity_counts.get(finding.severity, 0) + 1 + + result = { + "status": "completed", + "target": str(self.target_path), + "files_scanned": self.files_scanned, + 
"scan_duration_seconds": round(scan_duration, 2), + "total_findings": len(filtered_findings), + "severity_counts": severity_counts, + "findings": [asdict(f) for f in filtered_findings] + } + + self._print_summary(result) + + return result + + def _collect_files(self) -> List[Path]: + """Collect files to scan.""" + files = [] + + if self.target_path.is_file(): + return [self.target_path] + + for root, dirs, filenames in os.walk(self.target_path): + # Skip directories + dirs[:] = [d for d in dirs if d not in self.SKIP_DIRS] + + for filename in filenames: + file_path = Path(root) / filename + if file_path.suffix.lower() in self.SCAN_EXTENSIONS: + files.append(file_path) + + return files + + def _scan_file(self, file_path: Path): + """Scan a single file for security issues.""" + try: + content = file_path.read_text(encoding='utf-8', errors='ignore') + lines = content.split('\n') + + relative_path = str(file_path.relative_to(self.target_path) if self.target_path.is_dir() else file_path.name) + + # Scan for secrets + self._scan_patterns( + lines, relative_path, + self.SECRET_PATTERNS, + 'secrets', + 'Hardcoded Secret', + 'critical' + ) + + # Scan for SQL injection + self._scan_patterns( + lines, relative_path, + [(p[0], p[1]) for p in self.SQL_INJECTION_PATTERNS], + 'injection', + 'SQL Injection', + 'high' + ) + + # Scan for XSS + self._scan_patterns( + lines, relative_path, + [(p[0], p[1]) for p in self.XSS_PATTERNS], + 'xss', + 'Cross-Site Scripting (XSS)', + 'high' + ) + + # Scan for command injection + self._scan_patterns( + lines, relative_path, + [(p[0], p[1]) for p in self.COMMAND_INJECTION_PATTERNS], + 'injection', + 'Command Injection', + 'critical' + ) + + # Scan for path traversal + self._scan_patterns( + lines, relative_path, + [(p[0], p[1]) for p in self.PATH_TRAVERSAL_PATTERNS], + 'path-traversal', + 'Path Traversal', + 'medium' + ) + + if self.verbose: + print(f" Scanned: {relative_path}") + + except Exception as e: + if self.verbose: + print(f" Error 
scanning {file_path}: {e}")
+
+    def _scan_patterns(
+        self,
+        lines: List[str],
+        file_path: str,
+        patterns: List[Tuple],
+        category: str,
+        title: str,
+        default_severity: str
+    ):
+        """Scan lines for patterns."""
+        for line_num, line in enumerate(lines, 1):
+            for pattern_tuple in patterns:
+                pattern = pattern_tuple[0]
+                description = pattern_tuple[-1] if len(pattern_tuple) > 1 else title
+
+                match = re.search(pattern, line, re.IGNORECASE)
+                if match:
+                    # Check for false positives (comments, test files)
+                    if self._is_false_positive(line, file_path):
+                        continue
+
+                    # Determine severity based on context
+                    severity = self._calculate_severity(
+                        default_severity,
+                        file_path,
+                        category
+                    )
+
+                    finding = SecurityFinding(
+                        rule_id=f"{category}-{len(self.findings) + 1:04d}",
+                        severity=severity,
+                        category=category,
+                        title=title,
+                        description=description,
+                        file_path=file_path,
+                        line_number=line_num,
+                        code_snippet=line.strip()[:100],
+                        recommendation=self._get_recommendation(category)
+                    )
+
+                    self.findings.append(finding)
+
+    def _is_false_positive(self, line: str, file_path: str) -> bool:
+        """Check if finding is likely a false positive."""
+        # Skip comments
+        stripped = line.strip()
+        if stripped.startswith('#') or stripped.startswith('//') or stripped.startswith('*'):
+            return True
+
+        # Skip test files for some patterns
+        if 'test' in file_path.lower() or 'spec' in file_path.lower():
+            return True
+
+        # Skip example/sample values
+        lower_line = line.lower()
+        if any(skip in lower_line for skip in ['example', 'sample', 'placeholder', 'xxx', 'your_']):
+            return True
+
+        return False
+
+    def _calculate_severity(self, default: str, file_path: str, category: str) -> str:
+        """Calculate severity based on context."""
+        # Increase severity for production-related files
+        if any(prod in file_path.lower() for prod in ['prod', 'production', 'deploy']):
+            if default == 'high':
+                return 'critical'
+            if default == 'medium':
+                return 'high'
+
+        # Decrease severity for config 
examples + if 'example' in file_path.lower() or 'sample' in file_path.lower(): + if default == 'critical': + return 'high' + if default == 'high': + return 'medium' + + return default + + def _get_recommendation(self, category: str) -> str: + """Get remediation recommendation for category.""" + recommendations = { + 'secrets': 'Remove hardcoded secrets. Use environment variables or a secrets manager (HashiCorp Vault, AWS Secrets Manager).', + 'injection': 'Use parameterized queries or prepared statements. Never concatenate user input into queries.', + 'xss': 'Always escape or sanitize user input before rendering. Use framework-provided escaping functions.', + 'path-traversal': 'Validate and sanitize file paths. Use allowlists for permitted directories.', + } + return recommendations.get(category, 'Review and remediate the security issue.') + + def _print_summary(self, result: Dict): + """Print scan summary.""" + print("\n" + "=" * 60) + print("SECURITY SCAN SUMMARY") + print("=" * 60) + print(f"Target: {result['target']}") + print(f"Files scanned: {result['files_scanned']}") + print(f"Scan duration: {result['scan_duration_seconds']}s") + print(f"Total findings: {result['total_findings']}") + print() + + if result['severity_counts']: + print("Findings by severity:") + for severity in ['critical', 'high', 'medium', 'low', 'info']: + count = result['severity_counts'].get(severity, 0) + if count > 0: + print(f" {severity.upper()}: {count}") + print("=" * 60) + + if result['total_findings'] > 0: + print("\nTop findings:") + for finding in result['findings'][:5]: + print(f"\n [{finding['severity'].upper()}] {finding['title']}") + print(f" File: {finding['file_path']}:{finding['line_number']}") + print(f" {finding['description']}") + def main(): - """Main entry point""" + """Main entry point for CLI.""" parser = argparse.ArgumentParser( - description="Security Scanner" + description="Scan source code for security vulnerabilities", + 
formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s /path/to/project + %(prog)s /path/to/project --severity high + %(prog)s /path/to/project --output report.json --json + %(prog)s /path/to/file.py --verbose + """ + ) + + parser.add_argument( + "target", + help="Directory or file to scan" ) parser.add_argument( - 'target', - help='Target path to analyze or process' + "--severity", "-s", + choices=["critical", "high", "medium", "low", "info"], + default="low", + help="Minimum severity to report (default: low)" ) parser.add_argument( - '--verbose', '-v', - action='store_true', - help='Enable verbose output' + "--verbose", "-v", + action="store_true", + help="Enable verbose output" ) parser.add_argument( - '--json', - action='store_true', - help='Output results as JSON' + "--json", + action="store_true", + help="Output results as JSON" ) parser.add_argument( - '--output', '-o', - help='Output file path' + "--output", "-o", + help="Output file path" ) - + args = parser.parse_args() - - tool = SecurityScanner( - args.target, + + scanner = SecurityScanner( + target_path=args.target, + severity_threshold=args.severity, verbose=args.verbose ) - - results = tool.run() - + + result = scanner.scan() + if args.json: - output = json.dumps(results, indent=2) + output = json.dumps(result, indent=2) if args.output: with open(args.output, 'w') as f: f.write(output) - print(f"Results written to {args.output}") + print(f"\nResults written to {args.output}") else: print(output) + elif args.output: + with open(args.output, 'w') as f: + json.dump(result, f, indent=2) + print(f"\nResults written to {args.output}") -if __name__ == '__main__': + # Exit with error code if critical/high findings + if result.get('severity_counts', {}).get('critical', 0) > 0: + sys.exit(2) + if result.get('severity_counts', {}).get('high', 0) > 0: + sys.exit(1) + + +if __name__ == "__main__": main() diff --git 
a/engineering-team/senior-secops/scripts/vulnerability_assessor.py b/engineering-team/senior-secops/scripts/vulnerability_assessor.py index 3aff66e..c594522 100755 --- a/engineering-team/senior-secops/scripts/vulnerability_assessor.py +++ b/engineering-team/senior-secops/scripts/vulnerability_assessor.py @@ -1,114 +1,563 @@ #!/usr/bin/env python3 """ -Vulnerability Assessor -Automated tool for senior secops tasks +Vulnerability Assessor - Scan dependencies for known CVEs and security issues. + +Table of Contents: + VulnerabilityAssessor - Main class for dependency vulnerability assessment + __init__ - Initialize with target path and options + assess() - Run complete vulnerability assessment + scan_npm() - Scan package.json for npm vulnerabilities + scan_python() - Scan requirements.txt for Python vulnerabilities + scan_go() - Scan go.mod for Go vulnerabilities + _parse_package_json() - Parse npm package.json + _parse_requirements() - Parse Python requirements.txt + _parse_go_mod() - Parse Go go.mod + _check_vulnerability() - Check package against CVE database + _calculate_risk_score() - Calculate overall risk score + main() - CLI entry point + +Usage: + python vulnerability_assessor.py /path/to/project + python vulnerability_assessor.py /path/to/project --severity high + python vulnerability_assessor.py /path/to/project --output report.json --json """ import os import sys import json +import re import argparse from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass, asdict +from datetime import datetime + + +@dataclass +class Vulnerability: + """Represents a dependency vulnerability.""" + cve_id: str + package: str + installed_version: str + fixed_version: str + severity: str # critical, high, medium, low + cvss_score: float + description: str + ecosystem: str # npm, pypi, go + recommendation: str + class VulnerabilityAssessor: - """Main class for vulnerability assessor 
functionality"""
-
-    def __init__(self, target_path: str, verbose: bool = False):
+    """Assess project dependencies for known vulnerabilities."""
+
+    # Known CVE database (simplified - real implementation would query NVD/OSV)
+    KNOWN_CVES = {
+        # npm packages
+        'lodash': [
+            {'version_lt': '4.17.21', 'cve': 'CVE-2021-23337', 'cvss': 7.2,
+             'severity': 'high', 'desc': 'Command injection in lodash',
+             'fixed': '4.17.21'},
+            {'version_lt': '4.17.19', 'cve': 'CVE-2020-8203', 'cvss': 7.4,
+             'severity': 'high', 'desc': 'Prototype pollution in lodash',
+             'fixed': '4.17.19'},
+        ],
+        'axios': [
+            {'version_lt': '1.6.0', 'cve': 'CVE-2023-45857', 'cvss': 6.5,
+             'severity': 'medium', 'desc': 'CSRF token exposure in axios',
+             'fixed': '1.6.0'},
+        ],
+        'express': [
+            {'version_lt': '4.17.3', 'cve': 'CVE-2022-24999', 'cvss': 7.5,
+             'severity': 'high', 'desc': 'Prototype pollution via qs in express',
+             'fixed': '4.17.3'},
+        ],
+        'jsonwebtoken': [
+            {'version_lt': '9.0.0', 'cve': 'CVE-2022-23529', 'cvss': 9.8,
+             'severity': 'critical', 'desc': 'JWT algorithm confusion attack',
+             'fixed': '9.0.0'},
+        ],
+        'minimist': [
+            {'version_lt': '1.2.6', 'cve': 'CVE-2021-44906', 'cvss': 9.8,
+             'severity': 'critical', 'desc': 'Prototype pollution in minimist',
+             'fixed': '1.2.6'},
+        ],
+        'node-fetch': [
+            {'version_lt': '2.6.7', 'cve': 'CVE-2022-0235', 'cvss': 8.8,
+             'severity': 'high', 'desc': 'Information exposure in node-fetch',
+             'fixed': '2.6.7'},
+        ],
+        # Python packages
+        'django': [
+            {'version_lt': '4.2.8', 'cve': 'CVE-2023-46695', 'cvss': 7.5,
+             'severity': 'high', 'desc': 'DoS via file uploads in Django',
+             'fixed': '4.2.8'},
+        ],
+        'requests': [
+            {'version_lt': '2.31.0', 'cve': 'CVE-2023-32681', 'cvss': 6.1,
+             'severity': 'medium', 'desc': 'Proxy-Auth header leak in requests',
+             'fixed': '2.31.0'},
+        ],
+        'pillow': [
+            {'version_lt': '10.0.1', 'cve': 'CVE-2023-44271', 'cvss': 7.5,
+             'severity': 'high', 'desc': 'DoS via crafted image in Pillow',
+             'fixed': '10.0.1'},
+        ],
+        'cryptography': [
+            {'version_lt': '41.0.6', 'cve': 'CVE-2023-49083', 'cvss': 7.5,
+             'severity': 'high', 'desc': 'NULL pointer dereference in cryptography',
+             'fixed': '41.0.6'},
+        ],
+        'pyyaml': [
+            {'version_lt': '5.4', 'cve': 'CVE-2020-14343', 'cvss': 9.8,
+             'severity': 'critical', 'desc': 'Arbitrary code execution in PyYAML',
+             'fixed': '5.4'},
+        ],
+        'urllib3': [
+            {'version_lt': '2.0.6', 'cve': 'CVE-2023-43804', 'cvss': 8.1,
+             'severity': 'high', 'desc': 'Cookie header leak in urllib3',
+             'fixed': '2.0.6'},
+        ],
+        # Go packages
+        'golang.org/x/crypto': [
+            {'version_lt': 'v0.17.0', 'cve': 'CVE-2023-48795', 'cvss': 5.9,
+             'severity': 'medium', 'desc': 'SSH prefix truncation attack',
+             'fixed': 'v0.17.0'},
+        ],
+        'golang.org/x/net': [
+            {'version_lt': 'v0.17.0', 'cve': 'CVE-2023-44487', 'cvss': 7.5,
+             'severity': 'high', 'desc': 'HTTP/2 rapid reset attack',
+             'fixed': 'v0.17.0'},
+        ],
+    }
+
+    SEVERITY_ORDER = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
+
+    def __init__(
+        self,
+        target_path: str,
+        severity_threshold: str = "low",
+        verbose: bool = False
+    ):
+        """
+        Initialize the vulnerability assessor. 
+ + Args: + target_path: Directory to scan for dependency files + severity_threshold: Minimum severity to report + verbose: Enable verbose output + """ self.target_path = Path(target_path) + self.severity_threshold = severity_threshold self.verbose = verbose - self.results = {} - - def run(self) -> Dict: - """Execute the main functionality""" - print(f"🚀 Running {self.__class__.__name__}...") - print(f"📁 Target: {self.target_path}") - - try: - self.validate_target() - self.analyze() - self.generate_report() - - print("✅ Completed successfully!") - return self.results - - except Exception as e: - print(f"❌ Error: {e}") - sys.exit(1) - - def validate_target(self): - """Validate the target path exists and is accessible""" + self.vulnerabilities: List[Vulnerability] = [] + self.packages_scanned = 0 + self.files_scanned = 0 + + def assess(self) -> Dict: + """ + Run complete vulnerability assessment. + + Returns: + Dict with assessment results + """ + print(f"Vulnerability Assessor - Scanning: {self.target_path}") + print(f"Severity threshold: {self.severity_threshold}") + print() + if not self.target_path.exists(): - raise ValueError(f"Target path does not exist: {self.target_path}") - + return {"status": "error", "message": f"Path not found: {self.target_path}"} + + start_time = datetime.now() + + # Scan npm dependencies + package_json = self.target_path / "package.json" + if package_json.exists(): + self.scan_npm(package_json) + self.files_scanned += 1 + + # Scan Python dependencies + requirements_files = [ + "requirements.txt", + "requirements-dev.txt", + "requirements-prod.txt", + "pyproject.toml" + ] + for req_file in requirements_files: + req_path = self.target_path / req_file + if req_path.exists(): + self.scan_python(req_path) + self.files_scanned += 1 + + # Scan Go dependencies + go_mod = self.target_path / "go.mod" + if go_mod.exists(): + self.scan_go(go_mod) + self.files_scanned += 1 + + # Scan package-lock.json for transitive dependencies + package_lock = 
self.target_path / "package-lock.json" + if package_lock.exists(): + self.scan_npm_lock(package_lock) + self.files_scanned += 1 + + # Filter by severity + threshold_level = self.SEVERITY_ORDER.get(self.severity_threshold, 3) + filtered_vulns = [ + v for v in self.vulnerabilities + if self.SEVERITY_ORDER.get(v.severity, 3) <= threshold_level + ] + + end_time = datetime.now() + scan_duration = (end_time - start_time).total_seconds() + + # Group by severity + severity_counts = {} + for vuln in filtered_vulns: + severity_counts[vuln.severity] = severity_counts.get(vuln.severity, 0) + 1 + + # Calculate risk score + risk_score = self._calculate_risk_score(filtered_vulns) + + result = { + "status": "completed", + "target": str(self.target_path), + "files_scanned": self.files_scanned, + "packages_scanned": self.packages_scanned, + "scan_duration_seconds": round(scan_duration, 2), + "total_vulnerabilities": len(filtered_vulns), + "risk_score": risk_score, + "risk_level": self._get_risk_level(risk_score), + "severity_counts": severity_counts, + "vulnerabilities": [asdict(v) for v in filtered_vulns] + } + + self._print_summary(result) + + return result + + def scan_npm(self, package_json_path: Path): + """Scan package.json for npm vulnerabilities.""" if self.verbose: - print(f"✓ Target validated: {self.target_path}") - - def analyze(self): - """Perform the main analysis or operation""" + print(f" Scanning: {package_json_path}") + + try: + with open(package_json_path, 'r') as f: + data = json.load(f) + + deps = {} + deps.update(data.get('dependencies', {})) + deps.update(data.get('devDependencies', {})) + + for package, version_spec in deps.items(): + self.packages_scanned += 1 + version = self._normalize_version(version_spec) + self._check_vulnerability(package.lower(), version, 'npm') + + except Exception as e: + if self.verbose: + print(f" Error scanning {package_json_path}: {e}") + + def scan_npm_lock(self, package_lock_path: Path): + """Scan package-lock.json for 
transitive dependencies.""" if self.verbose: - print("📊 Analyzing...") - - # Main logic here - self.results['status'] = 'success' - self.results['target'] = str(self.target_path) - self.results['findings'] = [] - - # Add analysis results + print(f" Scanning: {package_lock_path}") + + try: + with open(package_lock_path, 'r') as f: + data = json.load(f) + + # Handle npm v2/v3 lockfile format + packages = data.get('packages', {}) + if not packages: + # npm v1 format + packages = data.get('dependencies', {}) + + for pkg_path, pkg_info in packages.items(): + if not pkg_path: # Skip root + continue + + # Extract package name from path + package = pkg_path.split('node_modules/')[-1] + version = pkg_info.get('version', '') + + if package and version: + self.packages_scanned += 1 + self._check_vulnerability(package.lower(), version, 'npm') + + except Exception as e: + if self.verbose: + print(f" Error scanning {package_lock_path}: {e}") + + def scan_python(self, requirements_path: Path): + """Scan requirements.txt for Python vulnerabilities.""" if self.verbose: - print(f"✓ Analysis complete: {len(self.results.get('findings', []))} findings") - - def generate_report(self): - """Generate and display the report""" - print("\n" + "="*50) - print("REPORT") - print("="*50) - print(f"Target: {self.results.get('target')}") - print(f"Status: {self.results.get('status')}") - print(f"Findings: {len(self.results.get('findings', []))}") - print("="*50 + "\n") + print(f" Scanning: {requirements_path}") + + try: + content = requirements_path.read_text() + + # Handle pyproject.toml + if requirements_path.name == 'pyproject.toml': + self._scan_pyproject(content) + return + + # Parse requirements.txt + for line in content.split('\n'): + line = line.strip() + if not line or line.startswith('#') or line.startswith('-'): + continue + + # Parse package==version or package>=version + match = re.match(r'^([a-zA-Z0-9_-]+)\s*([=<>!~]+)\s*([0-9.]+)', line) + if match: + package = 
match.group(1).lower() + version = match.group(3) + self.packages_scanned += 1 + self._check_vulnerability(package, version, 'pypi') + + except Exception as e: + if self.verbose: + print(f" Error scanning {requirements_path}: {e}") + + def _scan_pyproject(self, content: str): + """Parse pyproject.toml for dependencies.""" + # Simple parsing - real implementation would use toml library + in_deps = False + for line in content.split('\n'): + line = line.strip() + if '[project.dependencies]' in line or '[tool.poetry.dependencies]' in line: + in_deps = True + continue + if line.startswith('[') and in_deps: + in_deps = False + continue + if in_deps and '=' in line: + match = re.match(r'"?([a-zA-Z0-9_-]+)"?\s*[=:]\s*"?([^"]+)"?', line) + if match: + package = match.group(1).lower() + version_spec = match.group(2) + version = self._normalize_version(version_spec) + self.packages_scanned += 1 + self._check_vulnerability(package, version, 'pypi') + + def scan_go(self, go_mod_path: Path): + """Scan go.mod for Go vulnerabilities.""" + if self.verbose: + print(f" Scanning: {go_mod_path}") + + try: + content = go_mod_path.read_text() + + # Parse require blocks + in_require = False + for line in content.split('\n'): + line = line.strip() + + if line.startswith('require ('): + in_require = True + continue + if in_require and line == ')': + in_require = False + continue + + # Parse single require or block require + if line.startswith('require ') or in_require: + parts = line.replace('require ', '').split() + if len(parts) >= 2: + package = parts[0] + version = parts[1] + self.packages_scanned += 1 + self._check_vulnerability(package, version, 'go') + + except Exception as e: + if self.verbose: + print(f" Error scanning {go_mod_path}: {e}") + + def _normalize_version(self, version_spec: str) -> str: + """Extract version number from version specification.""" + # Remove prefixes like ^, ~, >=, etc. 
+ version = re.sub(r'^[\^~>=<]+', '', version_spec) + # Remove suffixes like -alpha, -beta, etc. + version = re.split(r'[-+]', version)[0] + return version.strip() + + def _check_vulnerability(self, package: str, version: str, ecosystem: str): + """Check if package version has known vulnerabilities.""" + cves = self.KNOWN_CVES.get(package, []) + + for cve_info in cves: + if self._version_lt(version, cve_info['version_lt']): + vuln = Vulnerability( + cve_id=cve_info['cve'], + package=package, + installed_version=version, + fixed_version=cve_info['fixed'], + severity=cve_info['severity'], + cvss_score=cve_info['cvss'], + description=cve_info['desc'], + ecosystem=ecosystem, + recommendation=f"Upgrade {package} to {cve_info['fixed']} or later" + ) + # Avoid duplicates + if not any(v.cve_id == vuln.cve_id and v.package == vuln.package + for v in self.vulnerabilities): + self.vulnerabilities.append(vuln) + + def _version_lt(self, version: str, threshold: str) -> bool: + """Compare version strings (simplified).""" + try: + # Remove 'v' prefix for Go versions + v1 = version.lstrip('v') + v2 = threshold.lstrip('v') + + parts1 = [int(x) for x in re.split(r'[.\-]', v1) if x.isdigit()] + parts2 = [int(x) for x in re.split(r'[.\-]', v2) if x.isdigit()] + + # Pad shorter version + while len(parts1) < len(parts2): + parts1.append(0) + while len(parts2) < len(parts1): + parts2.append(0) + + return parts1 < parts2 + except (ValueError, AttributeError): + return False + + def _calculate_risk_score(self, vulnerabilities: List[Vulnerability]) -> float: + """Calculate overall risk score (0-100).""" + if not vulnerabilities: + return 0.0 + + # Weight by severity and CVSS + severity_weights = {'critical': 4.0, 'high': 3.0, 'medium': 2.0, 'low': 1.0} + total_weight = 0.0 + + for vuln in vulnerabilities: + weight = severity_weights.get(vuln.severity, 1.0) + total_weight += (vuln.cvss_score * weight) + + # Normalize to 0-100 + max_possible = len(vulnerabilities) * 10.0 * 4.0 + score = 
(total_weight / max_possible) * 100 if max_possible > 0 else 0 + + return min(100.0, round(score, 1)) + + def _get_risk_level(self, score: float) -> str: + """Get risk level from score.""" + if score >= 70: + return "CRITICAL" + elif score >= 50: + return "HIGH" + elif score >= 25: + return "MEDIUM" + elif score > 0: + return "LOW" + return "NONE" + + def _print_summary(self, result: Dict): + """Print assessment summary.""" + print("\n" + "=" * 60) + print("VULNERABILITY ASSESSMENT SUMMARY") + print("=" * 60) + print(f"Target: {result['target']}") + print(f"Files scanned: {result['files_scanned']}") + print(f"Packages scanned: {result['packages_scanned']}") + print(f"Scan duration: {result['scan_duration_seconds']}s") + print(f"Total vulnerabilities: {result['total_vulnerabilities']}") + print(f"Risk score: {result['risk_score']}/100 ({result['risk_level']})") + print() + + if result['severity_counts']: + print("Vulnerabilities by severity:") + for severity in ['critical', 'high', 'medium', 'low']: + count = result['severity_counts'].get(severity, 0) + if count > 0: + print(f" {severity.upper()}: {count}") + print("=" * 60) + + if result['total_vulnerabilities'] > 0: + print("\nTop vulnerabilities:") + # Sort by CVSS score + sorted_vulns = sorted( + result['vulnerabilities'], + key=lambda x: x['cvss_score'], + reverse=True + ) + for vuln in sorted_vulns[:5]: + print(f"\n [{vuln['severity'].upper()}] {vuln['cve_id']}") + print(f" Package: {vuln['package']}@{vuln['installed_version']}") + print(f" CVSS: {vuln['cvss_score']}") + print(f" Fix: Upgrade to {vuln['fixed_version']}") + def main(): - """Main entry point""" + """Main entry point for CLI.""" parser = argparse.ArgumentParser( - description="Vulnerability Assessor" + description="Scan dependencies for known vulnerabilities", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s /path/to/project + %(prog)s /path/to/project --severity high + %(prog)s /path/to/project --output 
report.json --json + %(prog)s . --verbose + """ + ) + + parser.add_argument( + "target", + help="Directory containing dependency files" ) parser.add_argument( - 'target', - help='Target path to analyze or process' + "--severity", "-s", + choices=["critical", "high", "medium", "low"], + default="low", + help="Minimum severity to report (default: low)" ) parser.add_argument( - '--verbose', '-v', - action='store_true', - help='Enable verbose output' + "--verbose", "-v", + action="store_true", + help="Enable verbose output" ) parser.add_argument( - '--json', - action='store_true', - help='Output results as JSON' + "--json", + action="store_true", + help="Output results as JSON" ) parser.add_argument( - '--output', '-o', - help='Output file path' + "--output", "-o", + help="Output file path" ) - + args = parser.parse_args() - - tool = VulnerabilityAssessor( - args.target, + + assessor = VulnerabilityAssessor( + target_path=args.target, + severity_threshold=args.severity, verbose=args.verbose ) - - results = tool.run() - + + result = assessor.assess() + if args.json: - output = json.dumps(results, indent=2) + output = json.dumps(result, indent=2) if args.output: with open(args.output, 'w') as f: f.write(output) - print(f"Results written to {args.output}") + print(f"\nResults written to {args.output}") else: print(output) + elif args.output: + with open(args.output, 'w') as f: + json.dump(result, f, indent=2) + print(f"\nResults written to {args.output}") -if __name__ == '__main__': + # Exit with error code if critical/high vulnerabilities + if result.get('severity_counts', {}).get('critical', 0) > 0: + sys.exit(2) + if result.get('severity_counts', {}).get('high', 0) > 0: + sys.exit(1) + + +if __name__ == "__main__": main()
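Reviewer note: the `_version_lt` helper added in this diff does a purely numeric, zero-padded positional comparison (pre-release suffixes like `-beta` are dropped before it is called). A minimal standalone sketch of that same logic, for sanity-checking the comparison behavior (the free function name here is illustrative, not part of the diff):

```python
import re

def version_lt(version: str, threshold: str) -> bool:
    """Simplified version comparison mirroring _version_lt above."""
    # Strip the 'v' prefix used by Go module versions
    v1 = version.lstrip('v')
    v2 = threshold.lstrip('v')

    # Keep only the numeric components of each version
    parts1 = [int(x) for x in re.split(r'[.\-]', v1) if x.isdigit()]
    parts2 = [int(x) for x in re.split(r'[.\-]', v2) if x.isdigit()]

    # Pad the shorter list with zeros so '2.6' compares as '2.6.0'
    while len(parts1) < len(parts2):
        parts1.append(0)
    while len(parts2) < len(parts1):
        parts2.append(0)

    # Lists of ints compare element-by-element, left to right
    return parts1 < parts2

print(version_lt('4.17.19', '4.17.21'))  # True: below the fixed version
print(version_lt('v0.17.0', 'v0.17.0'))  # False: already at the fix
```

One consequence of the zero-padding design is that `'2.6'` is treated as `'2.6.0'` and therefore flagged as below `'2.6.7'`, which is the conservative (report-it) behavior you want in a scanner.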