PROBLEM: Issue #87 feedback - senior-security scored 40/100 - Placeholder reference files with template content only - Generic scripts without actual security functionality - Missing trigger phrases, no TOC, no validation workflows SOLUTION: Complete rewrite with comprehensive security content SKILL.md (210→436 lines): - Added 12 triggers: security architecture, threat modeling, STRIDE analysis, penetration testing, vulnerability assessment, secure coding, OWASP, etc. - Added TOC with 7 sections - 5 numbered workflows with validation checkpoints: 1. Threat Modeling (STRIDE methodology) 2. Security Architecture (Zero Trust, Defense-in-Depth) 3. Vulnerability Assessment (OWASP Top 10 mapping) 4. Secure Code Review (checklist with risk categories) 5. Incident Response (severity levels, response phases) - Added STRIDE per element matrix, DREAD scoring, severity matrices - Authentication pattern selection guide - Security tools reference (SAST, DAST, dependency scanning) - Cryptographic algorithm selection guide - Compliance frameworks reference (OWASP ASVS, CIS, NIST, PCI-DSS) - Security headers checklist References (rebuilt from scratch): - security-architecture-patterns.md (~615 lines): Zero Trust implementation, Defense-in-Depth layers, OAuth 2.0 + PKCE flows, JWT patterns, API security - threat-modeling-guide.md (~518 lines): STRIDE framework with element matrix, attack trees with probability calculations, DREAD scoring, DFD creation - cryptography-implementation.md (~775 lines): AES-256-GCM, ChaCha20-Poly1305, envelope encryption, RSA, Ed25519 signatures, X25519 key exchange, Argon2id password hashing, key management strategies Scripts (rebuilt with real functionality): - threat_modeler.py (~675 lines): Interactive STRIDE analysis for any system component, DREAD risk scoring, comprehensive threat database with 70+ threats, mitigation recommendations, JSON/text output - secret_scanner.py (~516 lines): Detects AWS/GCP/Azure credentials, GitHub/Slack/ Stripe tokens, private keys (RSA/EC/OpenSSH/PGP), generic API keys, database connection strings, 20+ secret patterns, CI/CD integration ready Deleted placeholder files: - references/cryptography_implementation.md (template) - references/penetration_testing_guide.md (template) - references/security_architecture_patterns.md (template) - scripts/pentest_automator.py (placeholder) - scripts/security_auditor.py (placeholder) Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
22 KiB
22 KiB
Security Architecture Patterns
Proven security architecture patterns for designing resilient systems.
Table of Contents
- Zero Trust Architecture
- Defense in Depth
- Secure Authentication Patterns
- API Security Patterns
- Data Protection Patterns
- Security Anti-Patterns
Zero Trust Architecture
Never trust, always verify. Every request authenticated and authorized regardless of network location.
Core Principles
| Principle | Implementation |
|---|---|
| Verify explicitly | Authenticate every request with identity, location, device health |
| Least privilege | Just-in-time and just-enough access (JIT/JEA) |
| Assume breach | Segment access, encrypt end-to-end, use analytics |
Implementation Components
ZERO TRUST ARCHITECTURE
┌─────────────────────────────────────────────────────────────┐
│ CONTROL PLANE │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Identity │ │ Policy │ │ Threat │ │
│ │ Provider │ │ Engine │ │ Intelligence│ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
┌─────────┴─────────┐
│ Policy Decision │
│ Point (PDP) │
└─────────┬─────────┘
│
┌─────────────────────────────┴───────────────────────────────┐
│ DATA PLANE │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ User │──── PEP ────────────▶│ Resource │ │
│ │ Device │ │ │ (App/Data) │ │
│ └──────────────┘ │ └──────────────┘ │
│ Policy Enforcement │
│ Point (PEP) │
└─────────────────────────────────────────────────────────────┘
Authentication Flow
# Zero Trust authentication middleware
import jwt
from functools import wraps
def zero_trust_auth(required_claims=None):
"""
Verify every request against identity, device, and context.
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
# 1. Verify token signature and expiration
try:
payload = jwt.decode(token, PUBLIC_KEY, algorithms=['RS256'])
except jwt.InvalidTokenError:
return {'error': 'Invalid token'}, 401
# 2. Verify device compliance
device_id = request.headers.get('X-Device-ID')
if not verify_device_compliance(device_id, payload['user_id']):
return {'error': 'Device not compliant'}, 403
# 3. Verify location/network context
client_ip = request.remote_addr
if not verify_network_context(client_ip, payload['allowed_networks']):
return {'error': 'Network context invalid'}, 403
# 4. Verify required claims
if required_claims:
for claim in required_claims:
if claim not in payload:
return {'error': f'Missing claim: {claim}'}, 403
# 5. Log access for analytics
log_access_attempt(payload, request, 'allowed')
return f(*args, **kwargs)
return decorated
return decorator
@app.route('/api/sensitive-data')
@zero_trust_auth(required_claims=['data:read', 'clearance:secret'])
def get_sensitive_data():
return fetch_data()
Network Segmentation
| Segment | Access Level | Controls |
|---|---|---|
| DMZ | Public | WAF, DDoS protection, rate limiting |
| Application | Authenticated users | mTLS, service mesh, RBAC |
| Data | Authorized services only | Encryption, audit logging, DLP |
| Management | Privileged admins | PAM, MFA, session recording |
Defense in Depth
Multiple layers of security controls so failure of one doesn't compromise the system.
Security Layers
DEFENSE IN DEPTH LAYERS
┌─────────────────────────────────────────────────────────────┐
│ Layer 1: PERIMETER │
│ - Firewall, WAF, DDoS mitigation, DNS filtering │
├─────────────────────────────────────────────────────────────┤
│ Layer 2: NETWORK │
│ - Segmentation, IDS/IPS, network monitoring, VPN │
├─────────────────────────────────────────────────────────────┤
│ Layer 3: HOST │
│ - Endpoint protection, hardening, patching, logging │
├─────────────────────────────────────────────────────────────┤
│ Layer 4: APPLICATION │
│ - Input validation, authentication, secure coding, SAST │
├─────────────────────────────────────────────────────────────┤
│ Layer 5: DATA │
│ - Encryption at rest/transit, access controls, DLP, backup │
└─────────────────────────────────────────────────────────────┘
Implementation Checklist
| Layer | Control | Priority |
|---|---|---|
| Perimeter | Web Application Firewall | Critical |
| Perimeter | Rate limiting | Critical |
| Network | Network segmentation (VLANs) | Critical |
| Network | Intrusion detection system | High |
| Host | Automated patching | Critical |
| Host | Endpoint Detection & Response | High |
| Application | Input validation | Critical |
| Application | Parameterized queries | Critical |
| Data | Encryption at rest (AES-256) | Critical |
| Data | TLS 1.3 for transit | Critical |
Fail-Safe Defaults
# Secure default configuration
class SecurityConfig:
# Authentication
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Strict'
# Headers
CONTENT_SECURITY_POLICY = "default-src 'self'; script-src 'self'"
X_FRAME_OPTIONS = 'DENY'
X_CONTENT_TYPE_OPTIONS = 'nosniff'
REFERRER_POLICY = 'strict-origin-when-cross-origin'
# Timeouts
SESSION_LIFETIME = 3600 # 1 hour
TOKEN_EXPIRY = 900 # 15 minutes
# Rate limiting
RATE_LIMIT_DEFAULT = '100/hour'
RATE_LIMIT_AUTH = '10/minute'
Secure Authentication Patterns
OAuth 2.0 + PKCE Flow
OAUTH 2.0 AUTHORIZATION CODE FLOW WITH PKCE
┌──────────┐ ┌──────────────┐
│ Client │ │ Auth │
│ (SPA) │ │ Server │
└────┬─────┘ └──────┬───────┘
│ │
│ 1. Generate code_verifier (random string) │
│ code_challenge = SHA256(code_verifier) │
│ │
│ 2. /authorize? │
│ response_type=code& │
│ client_id=xxx& │
│ code_challenge=xxx& │
│ code_challenge_method=S256 │
│──────────────────────────────────────────────▶│
│ │
│◀──────────────────────────────────────────────│
│ 3. Redirect with authorization_code │
│ │
│ 4. POST /token │
│ grant_type=authorization_code& │
│ code=xxx& │
│ code_verifier=xxx (proves possession) │
│──────────────────────────────────────────────▶│
│ │
│◀──────────────────────────────────────────────│
│ 5. { access_token, refresh_token, id_token } │
│ │
JWT Token Structure
# Secure JWT implementation
import jwt
import secrets
from datetime import datetime, timedelta
class JWTService:
def __init__(self, private_key, public_key, issuer):
self.private_key = private_key
self.public_key = public_key
self.issuer = issuer
def create_access_token(self, user_id, roles, expires_minutes=15):
"""Create short-lived access token."""
now = datetime.utcnow()
payload = {
'iss': self.issuer,
'sub': str(user_id),
'iat': now,
'exp': now + timedelta(minutes=expires_minutes),
'jti': secrets.token_hex(16), # Unique token ID
'roles': roles,
'type': 'access'
}
return jwt.encode(payload, self.private_key, algorithm='RS256')
def create_refresh_token(self, user_id, expires_days=7):
"""Create longer-lived refresh token (stored server-side)."""
now = datetime.utcnow()
jti = secrets.token_hex(32)
payload = {
'iss': self.issuer,
'sub': str(user_id),
'iat': now,
'exp': now + timedelta(days=expires_days),
'jti': jti,
'type': 'refresh'
}
# Store jti in database for revocation capability
store_refresh_token(jti, user_id, now + timedelta(days=expires_days))
return jwt.encode(payload, self.private_key, algorithm='RS256')
def verify_token(self, token, token_type='access'):
"""Verify token with all security checks."""
try:
payload = jwt.decode(
token,
self.public_key,
algorithms=['RS256'],
issuer=self.issuer
)
# Verify token type
if payload.get('type') != token_type:
raise jwt.InvalidTokenError('Invalid token type')
# For refresh tokens, check revocation
if token_type == 'refresh':
if is_token_revoked(payload['jti']):
raise jwt.InvalidTokenError('Token revoked')
return payload
except jwt.ExpiredSignatureError:
raise AuthError('Token expired')
except jwt.InvalidTokenError as e:
raise AuthError(f'Invalid token: {e}')
Multi-Factor Authentication
| Factor | Examples | Strength |
|---|---|---|
| Knowledge | Password, PIN, security questions | Low-Medium |
| Possession | TOTP app, hardware key, SMS | Medium-High |
| Inherence | Fingerprint, face, voice | High |
# TOTP implementation
import pyotp
import qrcode
class TOTPService:
def __init__(self, issuer_name):
self.issuer = issuer_name
def generate_secret(self):
"""Generate a new TOTP secret for user."""
return pyotp.random_base32()
def get_provisioning_uri(self, secret, user_email):
"""Generate QR code URI for authenticator app."""
totp = pyotp.TOTP(secret)
return totp.provisioning_uri(
name=user_email,
issuer_name=self.issuer
)
def verify_code(self, secret, code, valid_window=1):
"""Verify TOTP code with time drift tolerance."""
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=valid_window)
API Security Patterns
Input Validation
from pydantic import BaseModel, validator, constr
import re
class UserCreateRequest(BaseModel):
"""Strict input validation for user creation."""
email: constr(max_length=255)
username: constr(min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_]+$')
password: constr(min_length=12, max_length=128)
@validator('email')
def validate_email(cls, v):
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_regex, v):
raise ValueError('Invalid email format')
return v.lower()
@validator('password')
def validate_password_strength(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain digit')
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
raise ValueError('Password must contain special character')
return v
Rate Limiting
from redis import Redis
from functools import wraps
import time
class RateLimiter:
"""Token bucket rate limiter with Redis backend."""
def __init__(self, redis_client):
self.redis = redis_client
def is_allowed(self, key, limit, window_seconds):
"""Check if request is within rate limit."""
pipe = self.redis.pipeline()
now = time.time()
window_start = now - window_seconds
# Remove old entries
pipe.zremrangebyscore(key, 0, window_start)
# Count current entries
pipe.zcard(key)
# Add new entry
pipe.zadd(key, {str(now): now})
# Set expiry
pipe.expire(key, window_seconds)
results = pipe.execute()
current_count = results[1]
return current_count < limit
def rate_limit(limit=100, window=3600, key_func=None):
"""Rate limiting decorator."""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
if key_func:
key = f"rate_limit:{key_func()}"
else:
key = f"rate_limit:{request.remote_addr}:{f.__name__}"
if not rate_limiter.is_allowed(key, limit, window):
return {
'error': 'Rate limit exceeded',
'retry_after': window
}, 429
return f(*args, **kwargs)
return decorated
return decorator
SQL Injection Prevention
# NEVER: String concatenation
# query = f"SELECT * FROM users WHERE id = {user_id}"
# ALWAYS: Parameterized queries
from sqlalchemy import text
def get_user_secure(user_id):
"""Safe parameterized query."""
query = text("SELECT * FROM users WHERE id = :user_id")
result = db.execute(query, {'user_id': user_id})
return result.fetchone()
# For dynamic queries, use ORM
def search_users(filters):
"""Safe dynamic query with ORM."""
query = User.query
if 'name' in filters:
# ORM handles escaping
query = query.filter(User.name.ilike(f"%{filters['name']}%"))
if 'role' in filters:
query = query.filter(User.role == filters['role'])
return query.all()
Data Protection Patterns
Encryption at Rest
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
class FieldEncryption:
"""Encrypt sensitive database fields."""
def __init__(self, master_key):
self.fernet = Fernet(master_key)
@staticmethod
def derive_key(password, salt):
"""Derive encryption key from password."""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=480000,
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key
def encrypt(self, plaintext):
"""Encrypt a field value."""
if isinstance(plaintext, str):
plaintext = plaintext.encode()
return self.fernet.encrypt(plaintext).decode()
def decrypt(self, ciphertext):
"""Decrypt a field value."""
if isinstance(ciphertext, str):
ciphertext = ciphertext.encode()
return self.fernet.decrypt(ciphertext).decode()
# Usage in ORM
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(255)) # Not sensitive
_ssn = db.Column('ssn', db.String(500)) # Encrypted
@property
def ssn(self):
if self._ssn:
return field_encryption.decrypt(self._ssn)
return None
@ssn.setter
def ssn(self, value):
if value:
self._ssn = field_encryption.encrypt(value)
else:
self._ssn = None
Secret Management
| Storage Type | Use Case | Example |
|---|---|---|
| Environment variables | Container config | DATABASE_URL |
| Secret manager | Application secrets | AWS Secrets Manager, HashiCorp Vault |
| Hardware Security Module | Cryptographic keys | AWS CloudHSM |
# HashiCorp Vault integration
import hvac
class VaultClient:
def __init__(self, url, token):
self.client = hvac.Client(url=url, token=token)
def get_secret(self, path):
"""Retrieve secret from Vault."""
secret = self.client.secrets.kv.v2.read_secret_version(path=path)
return secret['data']['data']
def get_database_credentials(self, role):
"""Get dynamic database credentials."""
creds = self.client.secrets.database.generate_credentials(role)
return {
'username': creds['data']['username'],
'password': creds['data']['password'],
'ttl': creds['lease_duration']
}
Security Anti-Patterns
Anti-Pattern: Security Through Obscurity
| Bad Practice | Why It's Wrong | Correct Approach |
|---|---|---|
| Custom encryption algorithm | Untested, likely breakable | Use AES-256-GCM, ChaCha20-Poly1305 |
| Hidden admin URLs | Discovery via fuzzing | Proper authentication + authorization |
| Encoded (not encrypted) secrets | Base64 is reversible | Use proper encryption |
Anti-Pattern: Trusting Client Input
# BAD: Trusting client-provided data
@app.route('/admin')
def admin_panel():
# Client can forge this header!
if request.headers.get('X-Is-Admin') == 'true':
return render_admin()
# GOOD: Server-side verification
@app.route('/admin')
@login_required
def admin_panel():
if not current_user.has_role('admin'):
abort(403)
return render_admin()
Anti-Pattern: Hardcoded Secrets
# BAD: Hardcoded credentials
DATABASE_URL = "postgresql://admin:SuperSecret123@localhost/db"
API_KEY = "sk-1234567890abcdef"
# GOOD: Environment variables + secret management
import os
DATABASE_URL = os.environ['DATABASE_URL']
API_KEY = vault_client.get_secret('api/keys')['api_key']
Anti-Pattern: Verbose Error Messages
# BAD: Reveals internal information
except Exception as e:
return {'error': str(e), 'stack_trace': traceback.format_exc()}, 500
# GOOD: Generic message, detailed logging
except Exception as e:
logger.exception(f"Internal error: {e}")
return {'error': 'An internal error occurred', 'request_id': request_id}, 500
Security Tools Reference
| Category | Tools |
|---|---|
| SAST (Static Analysis) | Semgrep, SonarQube, Bandit (Python), ESLint security plugins |
| DAST (Dynamic Analysis) | OWASP ZAP, Burp Suite, Nikto |
| Dependency Scanning | Snyk, Dependabot, npm audit, pip-audit |
| Secret Detection | GitLeaks, TruffleHog, detect-secrets |
| Container Security | Trivy, Clair, Anchore |
| Infrastructure | Terraform Sentinel, Checkov, tfsec |