diff --git a/.codex/skills-index.json b/.codex/skills-index.json index 1ef46a1..3eea243 100644 --- a/.codex/skills-index.json +++ b/.codex/skills-index.json @@ -87,7 +87,7 @@ "name": "senior-ml-engineer", "source": "../../engineering-team/senior-ml-engineer", "category": "engineering", - "description": "World-class ML engineering skill for productionizing ML models, MLOps, and building scalable ML systems. Expertise in PyTorch, TensorFlow, model deployment, feature stores, model monitoring, and ML infrastructure. Includes LLM integration, fine-tuning, RAG systems, and agentic AI. Use when deploying ML models, building ML platforms, implementing MLOps, or integrating LLMs into production systems." + "description": "ML engineering skill for productionizing models, building MLOps pipelines, and integrating LLMs. Covers model deployment, feature stores, drift monitoring, RAG systems, and cost optimization." }, { "name": "senior-prompt-engineer", diff --git a/engineering-team/senior-ml-engineer/SKILL.md b/engineering-team/senior-ml-engineer/SKILL.md index 57f88a4..ef30296 100644 --- a/engineering-team/senior-ml-engineer/SKILL.md +++ b/engineering-team/senior-ml-engineer/SKILL.md @@ -1,226 +1,304 @@ --- name: senior-ml-engineer -description: World-class ML engineering skill for productionizing ML models, MLOps, and building scalable ML systems. Expertise in PyTorch, TensorFlow, model deployment, feature stores, model monitoring, and ML infrastructure. Includes LLM integration, fine-tuning, RAG systems, and agentic AI. Use when deploying ML models, building ML platforms, implementing MLOps, or integrating LLMs into production systems. +description: ML engineering skill for productionizing models, building MLOps pipelines, and integrating LLMs. Covers model deployment, feature stores, drift monitoring, RAG systems, and cost optimization. 
+triggers: + - MLOps pipeline + - model deployment + - feature store + - model monitoring + - drift detection + - RAG system + - LLM integration + - model serving + - A/B testing ML + - automated retraining --- -# Senior ML/AI Engineer +# Senior ML Engineer -World-class senior ml/ai engineer skill for production-grade AI/ML/Data systems. +Production ML engineering patterns for model deployment, MLOps infrastructure, and LLM integration. -## Quick Start +--- -### Main Capabilities +## Table of Contents -```bash -# Core Tool 1 -python scripts/model_deployment_pipeline.py --input data/ --output results/ +- [Model Deployment Workflow](#model-deployment-workflow) +- [MLOps Pipeline Setup](#mlops-pipeline-setup) +- [LLM Integration Workflow](#llm-integration-workflow) +- [RAG System Implementation](#rag-system-implementation) +- [Model Monitoring](#model-monitoring) +- [Reference Documentation](#reference-documentation) +- [Tools](#tools) -# Core Tool 2 -python scripts/rag_system_builder.py --target project/ --analyze +--- -# Core Tool 3 -python scripts/ml_monitoring_suite.py --config config.yaml --deploy +## Model Deployment Workflow + +Deploy a trained model to production with monitoring: + +1. Export model to standardized format (ONNX, TorchScript, SavedModel) +2. Package model with dependencies in Docker container +3. Deploy to staging environment +4. Run integration tests against staging +5. Deploy canary (5% traffic) to production +6. Monitor latency and error rates for 1 hour +7. Promote to full production if metrics pass +8. **Validation:** p95 latency < 100ms, error rate < 0.1% + +### Container Template + +```dockerfile +FROM python:3.11-slim + +COPY requirements.txt . 
+RUN pip install --no-cache-dir -r requirements.txt + +COPY model/ /app/model/ +COPY src/ /app/src/ + +HEALTHCHECK CMD curl -f http://localhost:8080/health || exit 1 + +EXPOSE 8080 +CMD ["uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "8080"] ``` -## Core Expertise +### Serving Options -This skill covers world-class capabilities in: +| Option | Latency | Throughput | Use Case | +|--------|---------|------------|----------| +| FastAPI + Uvicorn | Low | Medium | REST APIs, small models | +| Triton Inference Server | Very Low | Very High | GPU inference, batching | +| TensorFlow Serving | Low | High | TensorFlow models | +| TorchServe | Low | High | PyTorch models | +| Ray Serve | Medium | High | Complex pipelines, multi-model | -- Advanced production patterns and architectures -- Scalable system design and implementation -- Performance optimization at scale -- MLOps and DataOps best practices -- Real-time processing and inference -- Distributed computing frameworks -- Model deployment and monitoring -- Security and compliance -- Cost optimization -- Team leadership and mentoring +--- -## Tech Stack +## MLOps Pipeline Setup -**Languages:** Python, SQL, R, Scala, Go -**ML Frameworks:** PyTorch, TensorFlow, Scikit-learn, XGBoost -**Data Tools:** Spark, Airflow, dbt, Kafka, Databricks -**LLM Frameworks:** LangChain, LlamaIndex, DSPy -**Deployment:** Docker, Kubernetes, AWS/GCP/Azure -**Monitoring:** MLflow, Weights & Biases, Prometheus -**Databases:** PostgreSQL, BigQuery, Snowflake, Pinecone +Establish automated training and deployment: + +1. Configure feature store (Feast, Tecton) for training data +2. Set up experiment tracking (MLflow, Weights & Biases) +3. Create training pipeline with hyperparameter logging +4. Register model in model registry with version metadata +5. Configure staging deployment triggered by registry events +6. Set up A/B testing infrastructure for model comparison +7. Enable drift monitoring with alerting +8. 
**Validation:** New models automatically evaluated against baseline + +### Feature Store Pattern + +```python +from datetime import timedelta +from feast import Entity, Feature, FeatureView, FileSource, ValueType +user = Entity(name="user_id", value_type=ValueType.INT64) + +user_features = FeatureView( + name="user_features", + entities=["user_id"], + ttl=timedelta(days=1), + features=[ + Feature(name="purchase_count_30d", dtype=ValueType.INT64), + Feature(name="avg_order_value", dtype=ValueType.FLOAT), + ], + online=True, + source=FileSource(path="data/user_features.parquet"), +) +``` + +### Retraining Triggers + +| Trigger | Detection | Action | +|---------|-----------|--------| +| Scheduled | Cron (weekly/monthly) | Full retrain | +| Performance drop | Accuracy < threshold | Immediate retrain | +| Data drift | PSI > 0.2 | Evaluate, then retrain | +| New data volume | X new samples | Incremental update | + +--- + +## LLM Integration Workflow + +Integrate LLM APIs into production applications: + +1. Create provider abstraction layer for vendor flexibility +2. Implement retry logic with exponential backoff +3. Configure fallback to secondary provider +4. Set up token counting and context truncation +5. Add response caching for repeated queries +6. Implement cost tracking per request +7. Add structured output validation with Pydantic +8. 
**Validation:** Response parses correctly, cost within budget + +### Provider Abstraction + +```python +from abc import ABC, abstractmethod +from tenacity import retry, stop_after_attempt, wait_exponential + +class LLMProvider(ABC): + @abstractmethod + def complete(self, prompt: str, **kwargs) -> str: + pass + +@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10)) +def call_llm_with_retry(provider: LLMProvider, prompt: str) -> str: + return provider.complete(prompt) +``` + +### Cost Management + +| Provider | Input Cost | Output Cost | +|----------|------------|-------------| +| GPT-4 | $0.03/1K | $0.06/1K | +| GPT-3.5 | $0.0005/1K | $0.0015/1K | +| Claude 3 Opus | $0.015/1K | $0.075/1K | +| Claude 3 Haiku | $0.00025/1K | $0.00125/1K | + +--- + +## RAG System Implementation + +Build retrieval-augmented generation pipeline: + +1. Choose vector database (Pinecone, Qdrant, Weaviate) +2. Select embedding model based on quality/cost tradeoff +3. Implement document chunking strategy +4. Create ingestion pipeline with metadata extraction +5. Build retrieval with query embedding +6. Add reranking for relevance improvement +7. Format context and send to LLM +8. 
**Validation:** Response references retrieved context, no hallucinations + +### Vector Database Selection + +| Database | Hosting | Scale | Latency | Best For | +|----------|---------|-------|---------|----------| +| Pinecone | Managed | High | Low | Production, managed | +| Qdrant | Both | High | Very Low | Performance-critical | +| Weaviate | Both | High | Low | Hybrid search | +| Chroma | Self-hosted | Medium | Low | Prototyping | +| pgvector | Self-hosted | Medium | Medium | Existing Postgres | + +### Chunking Strategies + +| Strategy | Chunk Size | Overlap | Best For | +|----------|------------|---------|----------| +| Fixed | 500-1000 tokens | 50-100 | General text | +| Sentence | 3-5 sentences | 1 sentence | Structured text | +| Semantic | Variable | Based on meaning | Research papers | +| Recursive | Hierarchical | Parent-child | Long documents | + +--- + +## Model Monitoring + +Monitor production models for drift and degradation: + +1. Set up latency tracking (p50, p95, p99) +2. Configure error rate alerting +3. Implement input data drift detection +4. Track prediction distribution shifts +5. Log ground truth when available +6. Compare model versions with A/B metrics +7. Set up automated retraining triggers +8. **Validation:** Alerts fire before user-visible degradation + +### Drift Detection + +```python +from scipy.stats import ks_2samp + +def detect_drift(reference, current, threshold=0.05): + statistic, p_value = ks_2samp(reference, current) + return { + "drift_detected": p_value < threshold, + "ks_statistic": statistic, + "p_value": p_value + } +``` + +### Alert Thresholds + +| Metric | Warning | Critical | +|--------|---------|----------| +| p95 latency | > 100ms | > 200ms | +| Error rate | > 0.1% | > 1% | +| PSI (drift) | > 0.1 | > 0.2 | +| Accuracy drop | > 2% | > 5% | + +--- ## Reference Documentation -### 1. 
Mlops Production Patterns +### MLOps Production Patterns -Comprehensive guide available in `references/mlops_production_patterns.md` covering: +`references/mlops_production_patterns.md` contains: -- Advanced patterns and best practices -- Production implementation strategies -- Performance optimization techniques -- Scalability considerations -- Security and compliance -- Real-world case studies +- Model deployment pipeline with Kubernetes manifests +- Feature store architecture with Feast examples +- Model monitoring with drift detection code +- A/B testing infrastructure with traffic splitting +- Automated retraining pipeline with MLflow -### 2. Llm Integration Guide +### LLM Integration Guide -Complete workflow documentation in `references/llm_integration_guide.md` including: +`references/llm_integration_guide.md` contains: -- Step-by-step processes -- Architecture design patterns -- Tool integration guides -- Performance tuning strategies -- Troubleshooting procedures +- Provider abstraction layer pattern +- Retry and fallback strategies with tenacity +- Prompt engineering templates (few-shot, CoT) +- Token optimization with tiktoken +- Cost calculation and tracking -### 3. 
Rag System Architecture +### RAG System Architecture -Technical reference guide in `references/rag_system_architecture.md` with: +`references/rag_system_architecture.md` contains: -- System design principles -- Implementation examples -- Configuration best practices -- Deployment strategies -- Monitoring and observability +- RAG pipeline implementation with code +- Vector database comparison and integration +- Chunking strategies (fixed, semantic, recursive) +- Embedding model selection guide +- Hybrid search and reranking patterns -## Production Patterns +--- -### Pattern 1: Scalable Data Processing +## Tools -Enterprise-scale data processing with distributed computing: - -- Horizontal scaling architecture -- Fault-tolerant design -- Real-time and batch processing -- Data quality validation -- Performance monitoring - -### Pattern 2: ML Model Deployment - -Production ML system with high availability: - -- Model serving with low latency -- A/B testing infrastructure -- Feature store integration -- Model monitoring and drift detection -- Automated retraining pipelines - -### Pattern 3: Real-Time Inference - -High-throughput inference system: - -- Batching and caching strategies -- Load balancing -- Auto-scaling -- Latency optimization -- Cost optimization - -## Best Practices - -### Development - -- Test-driven development -- Code reviews and pair programming -- Documentation as code -- Version control everything -- Continuous integration - -### Production - -- Monitor everything critical -- Automate deployments -- Feature flags for releases -- Canary deployments -- Comprehensive logging - -### Team Leadership - -- Mentor junior engineers -- Drive technical decisions -- Establish coding standards -- Foster learning culture -- Cross-functional collaboration - -## Performance Targets - -**Latency:** -- P50: < 50ms -- P95: < 100ms -- P99: < 200ms - -**Throughput:** -- Requests/second: > 1000 -- Concurrent users: > 10,000 - -**Availability:** -- Uptime: 99.9% -- Error 
rate: < 0.1% - -## Security & Compliance - -- Authentication & authorization -- Data encryption (at rest & in transit) -- PII handling and anonymization -- GDPR/CCPA compliance -- Regular security audits -- Vulnerability management - -## Common Commands +### Model Deployment Pipeline ```bash -# Development -python -m pytest tests/ -v --cov -python -m black src/ -python -m pylint src/ - -# Training -python scripts/train.py --config prod.yaml -python scripts/evaluate.py --model best.pth - -# Deployment -docker build -t service:v1 . -kubectl apply -f k8s/ -helm upgrade service ./charts/ - -# Monitoring -kubectl logs -f deployment/service -python scripts/health_check.py +python scripts/model_deployment_pipeline.py --model model.pkl --target staging ``` -## Resources +Generates deployment artifacts: Dockerfile, Kubernetes manifests, health checks. -- Advanced Patterns: `references/mlops_production_patterns.md` -- Implementation Guide: `references/llm_integration_guide.md` -- Technical Reference: `references/rag_system_architecture.md` -- Automation Scripts: `scripts/` directory +### RAG System Builder -## Senior-Level Responsibilities +```bash +python scripts/rag_system_builder.py --config rag_config.yaml --analyze +``` -As a world-class senior professional: +Scaffolds RAG pipeline with vector store integration and retrieval logic. -1. **Technical Leadership** - - Drive architectural decisions - - Mentor team members - - Establish best practices - - Ensure code quality +### ML Monitoring Suite -2. **Strategic Thinking** - - Align with business goals - - Evaluate trade-offs - - Plan for scale - - Manage technical debt +```bash +python scripts/ml_monitoring_suite.py --config monitoring.yaml --deploy +``` -3. **Collaboration** - - Work across teams - - Communicate effectively - - Build consensus - - Share knowledge +Sets up drift detection, alerting, and performance dashboards. -4. 
**Innovation** - - Stay current with research - - Experiment with new approaches - - Contribute to community - - Drive continuous improvement +--- -5. **Production Excellence** - - Ensure high availability - - Monitor proactively - - Optimize performance - - Respond to incidents +## Tech Stack + +| Category | Tools | +|----------|-------| +| ML Frameworks | PyTorch, TensorFlow, Scikit-learn, XGBoost | +| LLM Frameworks | LangChain, LlamaIndex, DSPy | +| MLOps | MLflow, Weights & Biases, Kubeflow | +| Data | Spark, Airflow, dbt, Kafka | +| Deployment | Docker, Kubernetes, Triton | +| Databases | PostgreSQL, BigQuery, Pinecone, Redis | diff --git a/engineering-team/senior-ml-engineer/references/llm_integration_guide.md b/engineering-team/senior-ml-engineer/references/llm_integration_guide.md index 723fee8..6e715ab 100644 --- a/engineering-team/senior-ml-engineer/references/llm_integration_guide.md +++ b/engineering-team/senior-ml-engineer/references/llm_integration_guide.md @@ -1,80 +1,317 @@ -# Llm Integration Guide +# LLM Integration Guide -## Overview +Production patterns for integrating Large Language Models into applications. -World-class llm integration guide for senior ml/ai engineer. 
+--- -## Core Principles +## Table of Contents -### Production-First Design +- [API Integration Patterns](#api-integration-patterns) +- [Prompt Engineering](#prompt-engineering) +- [Token Optimization](#token-optimization) +- [Cost Management](#cost-management) +- [Error Handling](#error-handling) -Always design with production in mind: -- Scalability: Handle 10x current load -- Reliability: 99.9% uptime target -- Maintainability: Clear, documented code -- Observability: Monitor everything +--- -### Performance by Design +## API Integration Patterns -Optimize from the start: -- Efficient algorithms -- Resource awareness -- Strategic caching -- Batch processing +### Provider Abstraction Layer -### Security & Privacy +```python +from abc import ABC, abstractmethod +from typing import List, Dict, Any -Build security in: -- Input validation -- Data encryption -- Access control -- Audit logging +class LLMProvider(ABC): + """Abstract base class for LLM providers.""" -## Advanced Patterns + @abstractmethod + def complete(self, prompt: str, **kwargs) -> str: + pass -### Pattern 1: Distributed Processing + @abstractmethod + def chat(self, messages: List[Dict], **kwargs) -> str: + pass -Enterprise-scale data processing with fault tolerance. +class OpenAIProvider(LLMProvider): + def __init__(self, api_key: str, model: str = "gpt-4"): + self.client = OpenAI(api_key=api_key) + self.model = model -### Pattern 2: Real-Time Systems + def complete(self, prompt: str, **kwargs) -> str: + response = self.client.completions.create( + model=self.model, + prompt=prompt, + **kwargs + ) + return response.choices[0].text -Low-latency, high-throughput systems. 
+class AnthropicProvider(LLMProvider): + def __init__(self, api_key: str, model: str = "claude-3-opus"): + self.client = Anthropic(api_key=api_key) + self.model = model -### Pattern 3: ML at Scale + def chat(self, messages: List[Dict], **kwargs) -> str: + response = self.client.messages.create( + model=self.model, + messages=messages, + **kwargs + ) + return response.content[0].text +``` -Production ML with monitoring and automation. +### Retry and Fallback Strategy -## Best Practices +```python +import time +from tenacity import retry, stop_after_attempt, wait_exponential -### Code Quality -- Comprehensive testing -- Clear documentation -- Code reviews -- Type hints +@retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10) +) +def call_llm_with_retry(provider: LLMProvider, prompt: str) -> str: + """Call LLM with exponential backoff retry.""" + return provider.complete(prompt) -### Performance -- Profile before optimizing -- Monitor continuously -- Cache strategically -- Batch operations +def call_with_fallback( + primary: LLMProvider, + fallback: LLMProvider, + prompt: str +) -> str: + """Try primary provider, fall back on failure.""" + try: + return call_llm_with_retry(primary, prompt) + except Exception as e: + logger.warning(f"Primary provider failed: {e}, using fallback") + return call_llm_with_retry(fallback, prompt) +``` -### Reliability -- Design for failure -- Implement retries -- Use circuit breakers -- Monitor health +--- -## Tools & Technologies +## Prompt Engineering -Essential tools for this domain: -- Development frameworks -- Testing libraries -- Deployment platforms -- Monitoring solutions +### Prompt Templates -## Further Reading +| Pattern | Use Case | Structure | +|---------|----------|-----------| +| Zero-shot | Simple tasks | Task description + input | +| Few-shot | Complex tasks | Examples + task + input | +| Chain-of-thought | Reasoning | "Think step by step" + task | +| Role-based | Specialized output | 
System role + task | -- Research papers -- Industry blogs -- Conference talks -- Open source projects +### Few-Shot Template + +```python +FEW_SHOT_TEMPLATE = """ +You are a sentiment classifier. Classify the sentiment as positive, negative, or neutral. + +Examples: +Input: "This product is amazing, I love it!" +Output: positive + +Input: "Terrible experience, waste of money." +Output: negative + +Input: "The product arrived on time." +Output: neutral + +Now classify: +Input: "{user_input}" +Output:""" + +def classify_sentiment(text: str, provider: LLMProvider) -> str: + prompt = FEW_SHOT_TEMPLATE.format(user_input=text) + response = provider.complete(prompt, max_tokens=10, temperature=0) + return response.strip().lower() +``` + +### System Prompts for Consistency + +```python +SYSTEM_PROMPT = """You are a helpful assistant that answers questions about our product. + +Guidelines: +- Be concise and direct +- Use bullet points for lists +- If unsure, say "I don't have that information" +- Never make up information +- Keep responses under 200 words + +Product context: +{product_context} +""" + +def create_chat_messages(user_query: str, context: str) -> List[Dict]: + return [ + {"role": "system", "content": SYSTEM_PROMPT.format(product_context=context)}, + {"role": "user", "content": user_query} + ] +``` + +--- + +## Token Optimization + +### Token Counting + +```python +import tiktoken + +def count_tokens(text: str, model: str = "gpt-4") -> int: + """Count tokens for a given text and model.""" + encoding = tiktoken.encoding_for_model(model) + return len(encoding.encode(text)) + +def truncate_to_token_limit(text: str, max_tokens: int, model: str = "gpt-4") -> str: + """Truncate text to fit within token limit.""" + encoding = tiktoken.encoding_for_model(model) + tokens = encoding.encode(text) + + if len(tokens) <= max_tokens: + return text + + return encoding.decode(tokens[:max_tokens]) +``` + +### Context Window Management + +| Model | Context Window | Effective Limit 
| +|-------|----------------|-----------------| +| GPT-4 | 8,192 | ~6,000 (leave room for response) | +| GPT-4-32k | 32,768 | ~28,000 | +| Claude 3 | 200,000 | ~180,000 | +| Llama 3 | 8,192 | ~6,000 | + +### Chunking Strategy + +```python +def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]: + """Split text into overlapping chunks.""" + chunks = [] + start = 0 + + while start < len(text): + end = start + chunk_size + chunk = text[start:end] + chunks.append(chunk) + start = end - overlap + + return chunks +``` + +--- + +## Cost Management + +### Cost Calculation + +| Provider | Input Cost | Output Cost | Example (1K tokens) | +|----------|------------|-------------|---------------------| +| GPT-4 | $0.03/1K | $0.06/1K | $0.09 | +| GPT-3.5 | $0.0005/1K | $0.0015/1K | $0.002 | +| Claude 3 Opus | $0.015/1K | $0.075/1K | $0.09 | +| Claude 3 Haiku | $0.00025/1K | $0.00125/1K | $0.0015 | + +### Cost Tracking + +```python +from dataclasses import dataclass +from typing import Optional + +@dataclass +class LLMUsage: + input_tokens: int + output_tokens: int + model: str + cost: float + +def calculate_cost( + input_tokens: int, + output_tokens: int, + model: str +) -> float: + """Calculate cost based on token usage.""" + PRICING = { + "gpt-4": {"input": 0.03, "output": 0.06}, + "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015}, + "claude-3-opus": {"input": 0.015, "output": 0.075}, + } + + prices = PRICING.get(model, {"input": 0.01, "output": 0.03}) + + input_cost = (input_tokens / 1000) * prices["input"] + output_cost = (output_tokens / 1000) * prices["output"] + + return input_cost + output_cost +``` + +### Cost Optimization Strategies + +1. **Use smaller models for simple tasks** - GPT-3.5 for classification, GPT-4 for reasoning +2. **Cache common responses** - Store results for repeated queries +3. **Batch requests** - Combine multiple items in single prompt +4. **Truncate context** - Only include relevant information +5. 
**Set max_tokens limit** - Prevent runaway responses + +--- + +## Error Handling + +### Common Error Types + +| Error | Cause | Handling | +|-------|-------|----------| +| RateLimitError | Too many requests | Exponential backoff | +| InvalidRequestError | Bad input | Validate before sending | +| AuthenticationError | Invalid API key | Check credentials | +| ServiceUnavailable | Provider down | Fallback to alternative | +| ContextLengthExceeded | Input too long | Truncate or chunk | + +### Error Handling Pattern + +```python +from openai import RateLimitError, APIError + +def safe_llm_call(provider: LLMProvider, prompt: str, max_retries: int = 3) -> str: + """Safely call LLM with comprehensive error handling.""" + for attempt in range(max_retries): + try: + return provider.complete(prompt) + + except RateLimitError: + wait_time = 2 ** attempt + logger.warning(f"Rate limited, waiting {wait_time}s") + time.sleep(wait_time) + + except APIError as e: + if e.status_code >= 500: + logger.warning(f"Server error: {e}, retrying...") + time.sleep(1) + else: + raise + + raise Exception(f"Failed after {max_retries} attempts") +``` + +### Response Validation + +```python +import json +from pydantic import BaseModel, ValidationError + +class StructuredResponse(BaseModel): + answer: str + confidence: float + sources: List[str] + +def parse_structured_response(response: str) -> StructuredResponse: + """Parse and validate LLM JSON response.""" + try: + data = json.loads(response) + return StructuredResponse(**data) + except json.JSONDecodeError: + raise ValueError("Response is not valid JSON") + except ValidationError as e: + raise ValueError(f"Response validation failed: {e}") +``` diff --git a/engineering-team/senior-ml-engineer/references/mlops_production_patterns.md b/engineering-team/senior-ml-engineer/references/mlops_production_patterns.md index a7eb2a8..2ba220f 100644 --- a/engineering-team/senior-ml-engineer/references/mlops_production_patterns.md +++ 
b/engineering-team/senior-ml-engineer/references/mlops_production_patterns.md @@ -1,80 +1,265 @@ -# Mlops Production Patterns +# MLOps Production Patterns -## Overview +Production ML infrastructure patterns for model deployment, monitoring, and lifecycle management. -World-class mlops production patterns for senior ml/ai engineer. +--- -## Core Principles +## Table of Contents -### Production-First Design +- [Model Deployment Pipeline](#model-deployment-pipeline) +- [Feature Store Architecture](#feature-store-architecture) +- [Model Monitoring](#model-monitoring) +- [A/B Testing Infrastructure](#ab-testing-infrastructure) +- [Automated Retraining](#automated-retraining) -Always design with production in mind: -- Scalability: Handle 10x current load -- Reliability: 99.9% uptime target -- Maintainability: Clear, documented code -- Observability: Monitor everything +--- -### Performance by Design +## Model Deployment Pipeline -Optimize from the start: -- Efficient algorithms -- Resource awareness -- Strategic caching -- Batch processing +### Deployment Workflow -### Security & Privacy +1. Export trained model to standardized format (ONNX, TorchScript, SavedModel) +2. Package model with dependencies in Docker container +3. Deploy to staging environment +4. Run integration tests against staging +5. Deploy canary (5% traffic) to production +6. Monitor latency and error rates for 1 hour +7. Promote to full production if metrics pass +8. **Validation:** p95 latency < 100ms, error rate < 0.1% -Build security in: -- Input validation -- Data encryption -- Access control -- Audit logging +### Container Structure -## Advanced Patterns +```dockerfile +FROM python:3.11-slim -### Pattern 1: Distributed Processing +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt -Enterprise-scale data processing with fault tolerance. 
+# Copy model artifacts +COPY model/ /app/model/ +COPY src/ /app/src/ -### Pattern 2: Real-Time Systems +# Health check endpoint +HEALTHCHECK CMD curl -f http://localhost:8080/health || exit 1 -Low-latency, high-throughput systems. +EXPOSE 8080 +CMD ["uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "8080"] +``` -### Pattern 3: ML at Scale +### Model Serving Options -Production ML with monitoring and automation. +| Option | Latency | Throughput | Use Case | +|--------|---------|------------|----------| +| FastAPI + Uvicorn | Low | Medium | REST APIs, small models | +| Triton Inference Server | Very Low | Very High | GPU inference, batching | +| TensorFlow Serving | Low | High | TensorFlow models | +| TorchServe | Low | High | PyTorch models | +| Ray Serve | Medium | High | Complex pipelines, multi-model | -## Best Practices +### Kubernetes Deployment -### Code Quality -- Comprehensive testing -- Clear documentation -- Code reviews -- Type hints +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: model-serving +spec: + replicas: 3 + selector: + matchLabels: + app: model-serving + template: + spec: + containers: + - name: model + image: model:v1.0.0 + resources: + requests: + memory: "2Gi" + cpu: "1" + limits: + memory: "4Gi" + cpu: "2" + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 5 +``` -### Performance -- Profile before optimizing -- Monitor continuously -- Cache strategically -- Batch operations +--- -### Reliability -- Design for failure -- Implement retries -- Use circuit breakers -- Monitor health +## Feature Store Architecture -## Tools & Technologies +### Feature Store Components -Essential tools for this domain: -- Development frameworks -- Testing libraries -- Deployment platforms -- Monitoring solutions +| Component | Purpose | Tools | +|-----------|---------|-------| +| Offline Store | Training data, batch features | BigQuery, Snowflake, S3 | +| Online Store | Low-latency 
serving | Redis, DynamoDB, Feast | +| Feature Registry | Metadata, lineage | Feast, Tecton, Hopsworks | +| Transformation | Feature engineering | Spark, Flink, dbt | -## Further Reading +### Feature Pipeline Workflow -- Research papers -- Industry blogs -- Conference talks -- Open source projects +1. Define feature schema in registry +2. Implement transformation logic (SQL or Python) +3. Backfill historical features to offline store +4. Schedule incremental updates +5. Materialize to online store for serving +6. Monitor feature freshness and quality +7. **Validation:** Feature values within expected ranges, no nulls in required fields + +### Feature Definition Example + +```python +from datetime import timedelta +from feast import Entity, Feature, FeatureView, FileSource, ValueType +user = Entity(name="user_id", value_type=ValueType.INT64) + +user_features = FeatureView( + name="user_features", + entities=["user_id"], + ttl=timedelta(days=1), + features=[ + Feature(name="purchase_count_30d", dtype=ValueType.INT64), + Feature(name="avg_order_value", dtype=ValueType.FLOAT), + Feature(name="days_since_last_purchase", dtype=ValueType.INT64), + ], + online=True, + source=FileSource(path="data/user_features.parquet"), +) +``` + +--- + +## Model Monitoring + +### Monitoring Dimensions + +| Dimension | Metrics | Alert Threshold | +|-----------|---------|-----------------| +| Latency | p50, p95, p99 | p95 > 100ms | +| Throughput | requests/sec | < 80% baseline | +| Errors | error rate, 5xx count | > 0.1% | +| Data Drift | PSI, KS statistic | PSI > 0.2 | +| Model Drift | accuracy, AUC decay | > 5% drop | + +### Data Drift Detection + +```python +from scipy.stats import ks_2samp +import numpy as np + +def detect_drift(reference: np.ndarray, current: np.ndarray, threshold: float = 0.05): + """Detect distribution drift using Kolmogorov-Smirnov test.""" + statistic, p_value = ks_2samp(reference, current) + + drift_detected = p_value < threshold + + return { + "drift_detected": drift_detected, + 
"p_value": p_value, + "threshold": threshold + } +``` + +### Monitoring Dashboard Metrics + +**Infrastructure:** +- Request latency (p50, p95, p99) +- Requests per second +- Error rate by type +- CPU/memory utilization +- GPU utilization (if applicable) + +**Model Performance:** +- Prediction distribution +- Feature value distributions +- Model output confidence +- Ground truth vs predictions (when available) + +--- + +## A/B Testing Infrastructure + +### Experiment Workflow + +1. Define experiment hypothesis and success metrics +2. Calculate required sample size for statistical power +3. Configure traffic split (control vs treatment) +4. Deploy treatment model alongside control +5. Route traffic based on user/session hash +6. Collect metrics for both variants +7. Run statistical significance test +8. **Validation:** p-value < 0.05, minimum sample size reached + +### Traffic Splitting + +```python +import hashlib + +def get_variant(user_id: str, experiment: str, control_pct: float = 0.5) -> str: + """Deterministic traffic splitting based on user ID.""" + hash_input = f"{user_id}:{experiment}" + hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16) + bucket = (hash_value % 100) / 100.0 + + return "control" if bucket < control_pct else "treatment" +``` + +### Metrics Collection + +| Metric Type | Examples | Collection Method | +|-------------|----------|-------------------| +| Primary | Conversion rate, revenue | Event logging | +| Secondary | Latency, engagement | Request logs | +| Guardrail | Error rate, crashes | Monitoring system | + +--- + +## Automated Retraining + +### Retraining Triggers + +| Trigger | Detection Method | Action | +|---------|------------------|--------| +| Scheduled | Cron (weekly/monthly) | Full retrain | +| Performance drop | Accuracy < threshold | Immediate retrain | +| Data drift | PSI > 0.2 | Evaluate, then retrain | +| New data volume | X new samples | Incremental update | + +### Retraining Pipeline + +1. 
Trigger detection (schedule, drift, performance) +2. Fetch latest training data from feature store +3. Run training job with hyperparameter config +4. Evaluate model on holdout set +5. Compare against production model +6. If improved: register new model version +7. Deploy to staging for validation +8. Promote to production via canary +9. **Validation:** New model outperforms baseline on key metrics + +### MLflow Model Registry Integration + +```python +import mlflow + +def register_model(model, metrics: dict, model_name: str): + """Register trained model with MLflow.""" + with mlflow.start_run(): + # Log metrics + for name, value in metrics.items(): + mlflow.log_metric(name, value) + + # Log model + mlflow.sklearn.log_model(model, "model") + + # Register in model registry + model_uri = f"runs:/{mlflow.active_run().info.run_id}/model" + mlflow.register_model(model_uri, model_name) +``` diff --git a/engineering-team/senior-ml-engineer/references/rag_system_architecture.md b/engineering-team/senior-ml-engineer/references/rag_system_architecture.md index fe26280..f1e9fe5 100644 --- a/engineering-team/senior-ml-engineer/references/rag_system_architecture.md +++ b/engineering-team/senior-ml-engineer/references/rag_system_architecture.md @@ -1,80 +1,371 @@ -# Rag System Architecture +# RAG System Architecture -## Overview +Retrieval-Augmented Generation patterns for production applications. -World-class rag system architecture for senior ml/ai engineer. 
+--- -## Core Principles +## Table of Contents -### Production-First Design +- [RAG Pipeline Architecture](#rag-pipeline-architecture) +- [Vector Database Selection](#vector-database-selection) +- [Chunking Strategies](#chunking-strategies) +- [Embedding Models](#embedding-models) +- [Retrieval Optimization](#retrieval-optimization) -Always design with production in mind: -- Scalability: Handle 10x current load -- Reliability: 99.9% uptime target -- Maintainability: Clear, documented code -- Observability: Monitor everything +--- -### Performance by Design +## RAG Pipeline Architecture -Optimize from the start: -- Efficient algorithms -- Resource awareness -- Strategic caching -- Batch processing +### Basic RAG Flow -### Security & Privacy +1. Receive user query +2. Generate query embedding +3. Search vector database for relevant chunks +4. Rerank retrieved chunks by relevance +5. Format context with retrieved chunks +6. Send prompt to LLM with context +7. Return generated response +8. **Validation:** Response references retrieved context, no hallucinations -Build security in: -- Input validation -- Data encryption -- Access control -- Audit logging +### Pipeline Components -## Advanced Patterns +```python +from dataclasses import dataclass +from typing import List -### Pattern 1: Distributed Processing +@dataclass +class Document: + content: str + metadata: dict + embedding: List[float] = None -Enterprise-scale data processing with fault tolerance. +@dataclass +class RetrievalResult: + document: Document + score: float -### Pattern 2: Real-Time Systems +class RAGPipeline: + def __init__( + self, + embedder: Embedder, + vector_store: VectorStore, + llm: LLMProvider, + reranker: Reranker = None + ): + self.embedder = embedder + self.vector_store = vector_store + self.llm = llm + self.reranker = reranker -Low-latency, high-throughput systems. + def query(self, question: str, top_k: int = 5) -> str: + # 1. 
Embed query + query_embedding = self.embedder.embed(question) -### Pattern 3: ML at Scale + # 2. Retrieve relevant documents + results = self.vector_store.search(query_embedding, top_k=top_k * 2) -Production ML with monitoring and automation. + # 3. Rerank if available + if self.reranker: + results = self.reranker.rerank(question, results)[:top_k] + else: + results = results[:top_k] -## Best Practices + # 4. Build context + context = self._build_context(results) -### Code Quality -- Comprehensive testing -- Clear documentation -- Code reviews -- Type hints + # 5. Generate response + prompt = self._build_prompt(question, context) + return self.llm.complete(prompt) -### Performance -- Profile before optimizing -- Monitor continuously -- Cache strategically -- Batch operations + def _build_context(self, results: List[RetrievalResult]) -> str: + return "\n\n".join([ + f"[Source {i+1}]: {r.document.content}" + for i, r in enumerate(results) + ]) -### Reliability -- Design for failure -- Implement retries -- Use circuit breakers -- Monitor health + def _build_prompt(self, question: str, context: str) -> str: + return f"""Answer the question based on the context provided. 
-## Tools & Technologies +Context: +{context} -Essential tools for this domain: -- Development frameworks -- Testing libraries -- Deployment platforms -- Monitoring solutions +Question: {question} -## Further Reading +Answer:""" +``` -- Research papers -- Industry blogs -- Conference talks -- Open source projects +--- + +## Vector Database Selection + +### Comparison Matrix + +| Database | Hosting | Scale | Latency | Cost | Best For | +|----------|---------|-------|---------|------|----------| +| Pinecone | Managed | High | Low | $$ | Production, managed | +| Weaviate | Both | High | Low | $ | Hybrid search | +| Qdrant | Both | High | Very Low | $ | Performance-critical | +| Chroma | Self-hosted | Medium | Low | Free | Prototyping | +| pgvector | Self-hosted | Medium | Medium | Free | Existing Postgres | +| Milvus | Both | Very High | Low | $ | Large-scale | + +### Pinecone Integration + +```python +import pinecone + +class PineconeVectorStore: + def __init__(self, api_key: str, environment: str, index_name: str): + pinecone.init(api_key=api_key, environment=environment) + self.index = pinecone.Index(index_name) + + def upsert(self, documents: List[Document], batch_size: int = 100): + """Upsert documents in batches.""" + vectors = [ + (doc.metadata["id"], doc.embedding, doc.metadata) + for doc in documents + ] + + for i in range(0, len(vectors), batch_size): + batch = vectors[i:i + batch_size] + self.index.upsert(vectors=batch) + + def search(self, embedding: List[float], top_k: int = 5) -> List[RetrievalResult]: + """Search for similar vectors.""" + results = self.index.query( + vector=embedding, + top_k=top_k, + include_metadata=True + ) + + return [ + RetrievalResult( + document=Document( + content=match.metadata.get("content", ""), + metadata=match.metadata + ), + score=match.score + ) + for match in results.matches + ] +``` + +--- + +## Chunking Strategies + +### Strategy Comparison + +| Strategy | Chunk Size | Overlap | Best For | 
+|----------|------------|---------|----------| +| Fixed | 500-1000 tokens | 50-100 | General text | +| Sentence | 3-5 sentences | 1 sentence | Structured text | +| Paragraph | Natural breaks | None | Documents with clear structure | +| Semantic | Variable | Based on meaning | Research papers | +| Recursive | Hierarchical | Parent-child | Long documents | + +### Recursive Character Splitter + +```python +from langchain.text_splitter import RecursiveCharacterTextSplitter + +def create_chunks( + text: str, + chunk_size: int = 1000, + chunk_overlap: int = 100 +) -> List[str]: + """Split text using recursive character splitting.""" + splitter = RecursiveCharacterTextSplitter( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + separators=["\n\n", "\n", ". ", " ", ""] + ) + + return splitter.split_text(text) +``` + +### Semantic Chunking + +```python +from sentence_transformers import SentenceTransformer +import numpy as np + +def semantic_chunk( + sentences: List[str], + embedder: SentenceTransformer, + threshold: float = 0.7 +) -> List[List[str]]: + """Group sentences by semantic similarity.""" + embeddings = embedder.encode(sentences) + + chunks = [] + current_chunk = [sentences[0]] + current_embedding = embeddings[0] + + for i in range(1, len(sentences)): + similarity = np.dot(current_embedding, embeddings[i]) / ( + np.linalg.norm(current_embedding) * np.linalg.norm(embeddings[i]) + ) + + if similarity >= threshold: + current_chunk.append(sentences[i]) + current_embedding = np.mean( + [current_embedding, embeddings[i]], axis=0 + ) + else: + chunks.append(current_chunk) + current_chunk = [sentences[i]] + current_embedding = embeddings[i] + + chunks.append(current_chunk) + return chunks +``` + +--- + +## Embedding Models + +### Model Comparison + +| Model | Dimensions | Quality | Speed | Cost | +|-------|------------|---------|-------|------| +| text-embedding-3-large | 3072 | Excellent | Medium | $0.13/1M | +| text-embedding-3-small | 1536 | Good | Fast | 
$0.02/1M |
+| BGE-large | 1024 | Excellent | Medium | Free |
+| all-MiniLM-L6-v2 | 384 | Good | Very Fast | Free |
+| Cohere embed-v3 | 1024 | Excellent | Medium | $0.10/1M |
+
+### Embedding with Caching
+
+```python
+import hashlib
+from openai import OpenAI
+
+class CachedEmbedder:
+    def __init__(self, model_name: str = "text-embedding-3-small"):
+        self.client = OpenAI()
+        self.model = model_name
+        self._cache = {}
+
+    def embed(self, text: str) -> List[float]:
+        """Embed text with caching."""
+        cache_key = hashlib.md5(text.encode()).hexdigest()
+
+        if cache_key in self._cache:
+            return self._cache[cache_key]
+
+        response = self.client.embeddings.create(
+            model=self.model,
+            input=text
+        )
+
+        embedding = response.data[0].embedding
+        self._cache[cache_key] = embedding
+
+        return embedding
+
+    def embed_batch(self, texts: List[str]) -> List[List[float]]:
+        """Embed multiple texts efficiently."""
+        response = self.client.embeddings.create(
+            model=self.model,
+            input=texts
+        )
+
+        return [item.embedding for item in response.data]
+```
+
+---
+
+## Retrieval Optimization
+
+### Hybrid Search
+
+Combine dense (vector) and sparse (keyword) retrieval:
+
+```python
+from rank_bm25 import BM25Okapi
+
+class HybridRetriever:
+    def __init__(
+        self,
+        vector_store: VectorStore,
+        documents: List[Document],
+        alpha: float = 0.5
+    ):
+        self.vector_store = vector_store
+        self.alpha = alpha  # Weight for vector search
+
+        # Build BM25 index
+        tokenized = [doc.content.lower().split() for doc in documents]
+        self.bm25 = BM25Okapi(tokenized)
+        self.documents = documents
+
+    def search(self, query: str, query_embedding: List[float], top_k: int = 5):
+        # Vector search
+        vector_results = self.vector_store.search(query_embedding, top_k=top_k * 2)
+
+        # BM25 search
+        tokenized_query = query.lower().split()
+        bm25_scores = self.bm25.get_scores(tokenized_query)
+
+        # Combine scores
+        combined = {}
+        for result in vector_results:
+            doc_id = result.document.metadata["id"]
+            
combined[doc_id] = self.alpha * result.score + + for i, score in enumerate(bm25_scores): + doc_id = self.documents[i].metadata["id"] + if doc_id in combined: + combined[doc_id] += (1 - self.alpha) * score + else: + combined[doc_id] = (1 - self.alpha) * score + + # Sort and return top_k + sorted_ids = sorted(combined.keys(), key=lambda x: combined[x], reverse=True) + return sorted_ids[:top_k] +``` + +### Reranking + +```python +from sentence_transformers import CrossEncoder + +class Reranker: + def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"): + self.model = CrossEncoder(model_name) + + def rerank( + self, + query: str, + results: List[RetrievalResult], + top_k: int = 5 + ) -> List[RetrievalResult]: + """Rerank results using cross-encoder.""" + pairs = [(query, r.document.content) for r in results] + scores = self.model.predict(pairs) + + # Update scores and sort + for i, score in enumerate(scores): + results[i].score = float(score) + + return sorted(results, key=lambda x: x.score, reverse=True)[:top_k] +``` + +### Query Expansion + +```python +def expand_query(query: str, llm: LLMProvider) -> List[str]: + """Generate query variations for better retrieval.""" + prompt = f"""Generate 3 alternative phrasings of this question for search. +Return only the questions, one per line. + +Original: {query} + +Alternatives:""" + + response = llm.complete(prompt, max_tokens=150) + alternatives = [q.strip() for q in response.strip().split("\n") if q.strip()] + + return [query] + alternatives[:3] +```