fix(skill): rewrite senior-ml-engineer with real ML content (#74) (#141)

- Replace 3 boilerplate reference files with real technical content:
  - mlops_production_patterns.md: deployment, feature stores, A/B testing
  - llm_integration_guide.md: provider abstraction, cost management
  - rag_system_architecture.md: vector DBs, chunking, reranking
- Rewrite SKILL.md: add trigger phrases, TOC, numbered workflows
- Remove "world-class" marketing language (appeared 5+ times)
- Standardize terminology to "MLOps" (not "Mlops")
- Add validation checkpoints to all workflows

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Author: Alireza Rezvani, 2026-01-30 13:03:09 +01:00 (committed by GitHub)
parent 585dcb3e44
commit 757b0c9dd7
4 changed files with 1143 additions and 352 deletions

SKILL.md

@@ -1,226 +1,304 @@
---
name: senior-ml-engineer
description: ML engineering skill for productionizing models, building MLOps pipelines, and integrating LLMs. Covers model deployment, feature stores, drift monitoring, RAG systems, and cost optimization.
triggers:
- MLOps pipeline
- model deployment
- feature store
- model monitoring
- drift detection
- RAG system
- LLM integration
- model serving
- A/B testing ML
- automated retraining
---
# Senior ML Engineer
Production ML engineering patterns for model deployment, MLOps infrastructure, and LLM integration.
---
## Table of Contents
- [Model Deployment Workflow](#model-deployment-workflow)
- [MLOps Pipeline Setup](#mlops-pipeline-setup)
- [LLM Integration Workflow](#llm-integration-workflow)
- [RAG System Implementation](#rag-system-implementation)
- [Model Monitoring](#model-monitoring)
- [Reference Documentation](#reference-documentation)
- [Tools](#tools)
---
## Model Deployment Workflow
Deploy a trained model to production with monitoring:
1. Export model to standardized format (ONNX, TorchScript, SavedModel)
2. Package model with dependencies in Docker container
3. Deploy to staging environment
4. Run integration tests against staging
5. Deploy canary (5% traffic) to production
6. Monitor latency and error rates for 1 hour
7. Promote to full production if metrics pass
8. **Validation:** p95 latency < 100ms, error rate < 0.1%
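The canary gate in steps 6-8 can be sketched as a pure function over the monitored metrics (the function name and signature are illustrative; the thresholds come from the validation line):

```python
def should_promote(p95_latency_ms: float, error_rate: float) -> bool:
    """Canary gate: promote to full production only if both thresholds pass.

    Thresholds mirror the validation step: p95 < 100 ms, error rate < 0.1%.
    """
    return p95_latency_ms < 100 and error_rate < 0.001
```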
### Container Template
```dockerfile
FROM python:3.11-slim
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model/ /app/model/
COPY src/ /app/src/
HEALTHCHECK CMD curl -f http://localhost:8080/health || exit 1
EXPOSE 8080
CMD ["uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "8080"]
```
### Serving Options
| Option | Latency | Throughput | Use Case |
|--------|---------|------------|----------|
| FastAPI + Uvicorn | Low | Medium | REST APIs, small models |
| Triton Inference Server | Very Low | Very High | GPU inference, batching |
| TensorFlow Serving | Low | High | TensorFlow models |
| TorchServe | Low | High | PyTorch models |
| Ray Serve | Medium | High | Complex pipelines, multi-model |
---
## MLOps Pipeline Setup
Establish automated training and deployment:
1. Configure feature store (Feast, Tecton) for training data
2. Set up experiment tracking (MLflow, Weights & Biases)
3. Create training pipeline with hyperparameter logging
4. Register model in model registry with version metadata
5. Configure staging deployment triggered by registry events
6. Set up A/B testing infrastructure for model comparison
7. Enable drift monitoring with alerting
8. **Validation:** New models automatically evaluated against baseline
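Step 8's automatic evaluation can be sketched as a metric-by-metric comparison against the production baseline (a hypothetical helper; the metric names are examples):

```python
def beats_baseline(candidate: dict, baseline: dict,
                   higher_is_better=("accuracy", "auc")) -> bool:
    """Return True if the candidate matches or improves every tracked metric."""
    for metric, base_value in baseline.items():
        cand_value = candidate[metric]
        if metric in higher_is_better:
            if cand_value < base_value:
                return False
        else:  # e.g. latency, loss: lower is better
            if cand_value > base_value:
                return False
    return True
```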
### Feature Store Pattern
```python
from datetime import timedelta
from feast import Entity, Feature, FeatureView, FileSource, ValueType

user = Entity(name="user_id", value_type=ValueType.INT64)
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="purchase_count_30d", dtype=ValueType.INT64),
        Feature(name="avg_order_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    source=FileSource(path="data/user_features.parquet"),
)
```
### Retraining Triggers
| Trigger | Detection | Action |
|---------|-----------|--------|
| Scheduled | Cron (weekly/monthly) | Full retrain |
| Performance drop | Accuracy < threshold | Immediate retrain |
| Data drift | PSI > 0.2 | Evaluate, then retrain |
| New data volume | X new samples | Incremental update |
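The PSI threshold in the drift row can be computed as follows (a common formulation, sketched with NumPy; binning conventions vary, and the usual rule of thumb reads PSI > 0.2 as significant drift):

```python
import numpy as np

def population_stability_index(expected, actual, bins: int = 10) -> float:
    """PSI between a reference sample and a current sample."""
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)
    # Bin edges from reference quantiles, so each reference bin holds ~1/bins of the data
    edges = np.quantile(expected, np.linspace(0, 1, bins + 1))
    e_pct = np.histogram(expected, edges)[0] / len(expected)
    # Clip current values into the reference range so outliers land in edge bins
    a_pct = np.histogram(np.clip(actual, edges[0], edges[-1]), edges)[0] / len(actual)
    e_pct = np.clip(e_pct, 1e-6, None)
    a_pct = np.clip(a_pct, 1e-6, None)
    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))
```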
---
## LLM Integration Workflow
Integrate LLM APIs into production applications:
1. Create provider abstraction layer for vendor flexibility
2. Implement retry logic with exponential backoff
3. Configure fallback to secondary provider
4. Set up token counting and context truncation
5. Add response caching for repeated queries
6. Implement cost tracking per request
7. Add structured output validation with Pydantic
8. **Validation:** Response parses correctly, cost within budget
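Step 5's response caching reduces to a thin wrapper around any provider (class and attribute names here are illustrative, not part of the skill's scripts):

```python
import hashlib

class CachingLLM:
    """Illustrative response cache keyed by a hash of the prompt."""

    def __init__(self, provider):
        self.provider = provider
        self.cache = {}
        self.hits = 0

    def complete(self, prompt: str) -> str:
        key = hashlib.sha256(prompt.encode()).hexdigest()
        if key in self.cache:
            self.hits += 1  # repeated query served from cache at zero cost
            return self.cache[key]
        result = self.provider.complete(prompt)
        self.cache[key] = result
        return result
```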
### Provider Abstraction
```python
from abc import ABC, abstractmethod
from tenacity import retry, stop_after_attempt, wait_exponential

class LLMProvider(ABC):
    @abstractmethod
    def complete(self, prompt: str, **kwargs) -> str:
        pass

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def call_llm_with_retry(provider: LLMProvider, prompt: str) -> str:
    return provider.complete(prompt)
```
### Cost Management
| Provider | Input Cost | Output Cost |
|----------|------------|-------------|
| GPT-4 | $0.03/1K | $0.06/1K |
| GPT-3.5 | $0.0005/1K | $0.0015/1K |
| Claude 3 Opus | $0.015/1K | $0.075/1K |
| Claude 3 Haiku | $0.00025/1K | $0.00125/1K |
---
## RAG System Implementation
Build retrieval-augmented generation pipeline:
1. Choose vector database (Pinecone, Qdrant, Weaviate)
2. Select embedding model based on quality/cost tradeoff
3. Implement document chunking strategy
4. Create ingestion pipeline with metadata extraction
5. Build retrieval with query embedding
6. Add reranking for relevance improvement
7. Format context and send to LLM
8. **Validation:** Response references retrieved context, no hallucinations
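Steps 5-6 reduce to a nearest-neighbor search over embeddings. A brute-force NumPy sketch (real systems delegate this to the vector database; the function name is illustrative):

```python
import numpy as np

def retrieve_top_k(query_emb: np.ndarray, doc_embs: np.ndarray, k: int = 3):
    """Return indices of the k most similar documents by cosine similarity."""
    q = query_emb / np.linalg.norm(query_emb)
    d = doc_embs / np.linalg.norm(doc_embs, axis=1, keepdims=True)
    scores = d @ q  # cosine similarity of each document to the query
    return np.argsort(scores)[::-1][:k]
```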
### Vector Database Selection
| Database | Hosting | Scale | Latency | Best For |
|----------|---------|-------|---------|----------|
| Pinecone | Managed | High | Low | Production, managed |
| Qdrant | Both | High | Very Low | Performance-critical |
| Weaviate | Both | High | Low | Hybrid search |
| Chroma | Self-hosted | Medium | Low | Prototyping |
| pgvector | Self-hosted | Medium | Medium | Existing Postgres |
### Chunking Strategies
| Strategy | Chunk Size | Overlap | Best For |
|----------|------------|---------|----------|
| Fixed | 500-1000 tokens | 50-100 | General text |
| Sentence | 3-5 sentences | 1 sentence | Structured text |
| Semantic | Variable | Based on meaning | Research papers |
| Recursive | Hierarchical | Parent-child | Long documents |
---
## Model Monitoring
Monitor production models for drift and degradation:
1. Set up latency tracking (p50, p95, p99)
2. Configure error rate alerting
3. Implement input data drift detection
4. Track prediction distribution shifts
5. Log ground truth when available
6. Compare model versions with A/B metrics
7. Set up automated retraining triggers
8. **Validation:** Alerts fire before user-visible degradation
### Drift Detection
```python
from scipy.stats import ks_2samp

def detect_drift(reference, current, threshold=0.05):
    statistic, p_value = ks_2samp(reference, current)
    return {
        "drift_detected": p_value < threshold,
        "ks_statistic": statistic,
        "p_value": p_value,
    }
```
### Alert Thresholds
| Metric | Warning | Critical |
|--------|---------|----------|
| p95 latency | > 100ms | > 200ms |
| Error rate | > 0.1% | > 1% |
| PSI (drift) | > 0.1 | > 0.2 |
| Accuracy drop | > 2% | > 5% |
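The thresholds above map to a simple two-level alerting rule (a sketch; the threshold values are copied from the table, the metric keys are illustrative):

```python
THRESHOLDS = {
    "p95_latency_ms": (100, 200),   # (warning, critical)
    "error_rate": (0.001, 0.01),
    "psi": (0.1, 0.2),
    "accuracy_drop": (0.02, 0.05),
}

def alert_level(metric: str, value: float) -> str:
    warning, critical = THRESHOLDS[metric]
    if value > critical:
        return "critical"
    if value > warning:
        return "warning"
    return "ok"
```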
---
## Reference Documentation
### MLOps Production Patterns
`references/mlops_production_patterns.md` contains:
- Model deployment pipeline with Kubernetes manifests
- Feature store architecture with Feast examples
- Model monitoring with drift detection code
- A/B testing infrastructure with traffic splitting
- Automated retraining pipeline with MLflow
### LLM Integration Guide
`references/llm_integration_guide.md` contains:
- Provider abstraction layer pattern
- Retry and fallback strategies with tenacity
- Prompt engineering templates (few-shot, CoT)
- Token optimization with tiktoken
- Cost calculation and tracking
### RAG System Architecture
`references/rag_system_architecture.md` contains:
- RAG pipeline implementation with code
- Vector database comparison and integration
- Chunking strategies (fixed, semantic, recursive)
- Embedding model selection guide
- Hybrid search and reranking patterns
---
## Tools
### Model Deployment Pipeline
```bash
python scripts/model_deployment_pipeline.py --model model.pkl --target staging
```
Generates deployment artifacts: Dockerfile, Kubernetes manifests, health checks.
### RAG System Builder
```bash
python scripts/rag_system_builder.py --config rag_config.yaml --analyze
```
Scaffolds RAG pipeline with vector store integration and retrieval logic.
### ML Monitoring Suite
```bash
python scripts/ml_monitoring_suite.py --config monitoring.yaml --deploy
```
Sets up drift detection, alerting, and performance dashboards.
---
## Tech Stack
| Category | Tools |
|----------|-------|
| ML Frameworks | PyTorch, TensorFlow, Scikit-learn, XGBoost |
| LLM Frameworks | LangChain, LlamaIndex, DSPy |
| MLOps | MLflow, Weights & Biases, Kubeflow |
| Data | Spark, Airflow, dbt, Kafka |
| Deployment | Docker, Kubernetes, Triton |
| Databases | PostgreSQL, BigQuery, Pinecone, Redis |

references/llm_integration_guide.md

@@ -1,80 +1,317 @@
# LLM Integration Guide
## Overview
Production patterns for integrating Large Language Models into applications.
---
## Table of Contents
- [API Integration Patterns](#api-integration-patterns)
- [Prompt Engineering](#prompt-engineering)
- [Token Optimization](#token-optimization)
- [Cost Management](#cost-management)
- [Error Handling](#error-handling)
---
## API Integration Patterns
### Provider Abstraction Layer
```python
from abc import ABC, abstractmethod
from typing import List, Dict

from openai import OpenAI
from anthropic import Anthropic

class LLMProvider(ABC):
    """Abstract base class for LLM providers."""

    @abstractmethod
    def complete(self, prompt: str, **kwargs) -> str:
        pass

    @abstractmethod
    def chat(self, messages: List[Dict], **kwargs) -> str:
        pass

class OpenAIProvider(LLMProvider):
    def __init__(self, api_key: str, model: str = "gpt-4"):
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def complete(self, prompt: str, **kwargs) -> str:
        response = self.client.completions.create(
            model=self.model,
            prompt=prompt,
            **kwargs,
        )
        return response.choices[0].text

    def chat(self, messages: List[Dict], **kwargs) -> str:
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            **kwargs,
        )
        return response.choices[0].message.content

class AnthropicProvider(LLMProvider):
    def __init__(self, api_key: str, model: str = "claude-3-opus"):
        self.client = Anthropic(api_key=api_key)
        self.model = model

    def complete(self, prompt: str, **kwargs) -> str:
        # Anthropic's API is chat-based; wrap the prompt as a single user message
        return self.chat([{"role": "user", "content": prompt}], **kwargs)

    def chat(self, messages: List[Dict], **kwargs) -> str:
        response = self.client.messages.create(
            model=self.model,
            messages=messages,
            **kwargs,
        )
        return response.content[0].text
```
Production ML with monitoring and automation.
### Retry and Fallback Strategy
```python
import logging
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
)
def call_llm_with_retry(provider: LLMProvider, prompt: str) -> str:
    """Call LLM with exponential backoff retry."""
    return provider.complete(prompt)

def call_with_fallback(
    primary: LLMProvider,
    fallback: LLMProvider,
    prompt: str,
) -> str:
    """Try primary provider, fall back on failure."""
    try:
        return call_llm_with_retry(primary, prompt)
    except Exception as e:
        logger.warning(f"Primary provider failed: {e}, using fallback")
        return call_llm_with_retry(fallback, prompt)
```
---
## Prompt Engineering
### Prompt Templates
| Pattern | Use Case | Structure |
|---------|----------|-----------|
| Zero-shot | Simple tasks | Task description + input |
| Few-shot | Complex tasks | Examples + task + input |
| Chain-of-thought | Reasoning | "Think step by step" + task |
| Role-based | Specialized output | System role + task |
### Few-Shot Template
```python
FEW_SHOT_TEMPLATE = """
You are a sentiment classifier. Classify the sentiment as positive, negative, or neutral.
Examples:
Input: "This product is amazing, I love it!"
Output: positive
Input: "Terrible experience, waste of money."
Output: negative
Input: "The product arrived on time."
Output: neutral
Now classify:
Input: "{user_input}"
Output:"""

def classify_sentiment(text: str, provider: LLMProvider) -> str:
    prompt = FEW_SHOT_TEMPLATE.format(user_input=text)
    response = provider.complete(prompt, max_tokens=10, temperature=0)
    return response.strip().lower()
```
### System Prompts for Consistency
```python
from typing import Dict, List

SYSTEM_PROMPT = """You are a helpful assistant that answers questions about our product.
Guidelines:
- Be concise and direct
- Use bullet points for lists
- If unsure, say "I don't have that information"
- Never make up information
- Keep responses under 200 words
Product context:
{product_context}
"""

def create_chat_messages(user_query: str, context: str) -> List[Dict]:
    return [
        {"role": "system", "content": SYSTEM_PROMPT.format(product_context=context)},
        {"role": "user", "content": user_query},
    ]
```
---
## Token Optimization
### Token Counting
```python
import tiktoken

def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count tokens for a given text and model."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def truncate_to_token_limit(text: str, max_tokens: int, model: str = "gpt-4") -> str:
    """Truncate text to fit within token limit."""
    encoding = tiktoken.encoding_for_model(model)
    tokens = encoding.encode(text)
    if len(tokens) <= max_tokens:
        return text
    return encoding.decode(tokens[:max_tokens])
```
### Context Window Management
| Model | Context Window | Effective Limit |
|-------|----------------|-----------------|
| GPT-4 | 8,192 | ~6,000 (leave room for response) |
| GPT-4-32k | 32,768 | ~28,000 |
| Claude 3 | 200,000 | ~180,000 |
| Llama 3 | 8,192 | ~6,000 |
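The "effective limit" column is just the context window minus a budget reserved for the model's response; a trivial helper makes the arithmetic explicit (the function name and the 2,000-token default are illustrative):

```python
def effective_input_limit(context_window: int, reserved_for_response: int = 2000) -> int:
    """Tokens available for the prompt after reserving room for the response."""
    return max(context_window - reserved_for_response, 0)
```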
### Chunking Strategy
```python
from typing import List

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap
    return chunks
```
---
## Cost Management
### Cost Calculation
| Provider | Input Cost | Output Cost | Example (1K tokens) |
|----------|------------|-------------|---------------------|
| GPT-4 | $0.03/1K | $0.06/1K | $0.09 |
| GPT-3.5 | $0.0005/1K | $0.0015/1K | $0.002 |
| Claude 3 Opus | $0.015/1K | $0.075/1K | $0.09 |
| Claude 3 Haiku | $0.00025/1K | $0.00125/1K | $0.0015 |
### Cost Tracking
```python
from dataclasses import dataclass

@dataclass
class LLMUsage:
    input_tokens: int
    output_tokens: int
    model: str
    cost: float

def calculate_cost(
    input_tokens: int,
    output_tokens: int,
    model: str,
) -> float:
    """Calculate cost based on token usage."""
    PRICING = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
    }
    prices = PRICING.get(model, {"input": 0.01, "output": 0.03})
    input_cost = (input_tokens / 1000) * prices["input"]
    output_cost = (output_tokens / 1000) * prices["output"]
    return input_cost + output_cost
```
### Cost Optimization Strategies
1. **Use smaller models for simple tasks** - GPT-3.5 for classification, GPT-4 for reasoning
2. **Cache common responses** - Store results for repeated queries
3. **Batch requests** - Combine multiple items in single prompt
4. **Truncate context** - Only include relevant information
5. **Set max_tokens limit** - Prevent runaway responses
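Strategy 3 (batching) amortizes the fixed instruction overhead across many inputs; a sketch of a numbered-list batch prompt (the template wording is illustrative):

```python
def batch_prompt(task: str, items: list) -> str:
    """Combine many inputs into one prompt; answers come back as a numbered list."""
    lines = [task, "Answer each numbered item on its own line."]
    lines += [f"{i + 1}. {item}" for i, item in enumerate(items)]
    return "\n".join(lines)
```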
---
## Error Handling
### Common Error Types
| Error | Cause | Handling |
|-------|-------|----------|
| RateLimitError | Too many requests | Exponential backoff |
| InvalidRequestError | Bad input | Validate before sending |
| AuthenticationError | Invalid API key | Check credentials |
| ServiceUnavailable | Provider down | Fallback to alternative |
| ContextLengthExceeded | Input too long | Truncate or chunk |
### Error Handling Pattern
```python
import time
import logging
from openai import RateLimitError, APIError

logger = logging.getLogger(__name__)

def safe_llm_call(provider: LLMProvider, prompt: str, max_retries: int = 3) -> str:
    """Safely call LLM with comprehensive error handling."""
    for attempt in range(max_retries):
        try:
            return provider.complete(prompt)
        except RateLimitError:
            wait_time = 2 ** attempt
            logger.warning(f"Rate limited, waiting {wait_time}s")
            time.sleep(wait_time)
        except APIError as e:
            if e.status_code >= 500:
                logger.warning(f"Server error: {e}, retrying...")
                time.sleep(1)
            else:
                raise
    raise Exception(f"Failed after {max_retries} attempts")
```
### Response Validation
```python
import json
from typing import List
from pydantic import BaseModel, ValidationError

class StructuredResponse(BaseModel):
    answer: str
    confidence: float
    sources: List[str]

def parse_structured_response(response: str) -> StructuredResponse:
    """Parse and validate LLM JSON response."""
    try:
        data = json.loads(response)
        return StructuredResponse(**data)
    except json.JSONDecodeError:
        raise ValueError("Response is not valid JSON")
    except ValidationError as e:
        raise ValueError(f"Response validation failed: {e}")
```

references/mlops_production_patterns.md

@@ -1,80 +1,265 @@
# MLOps Production Patterns
## Overview
Production ML infrastructure patterns for model deployment, monitoring, and lifecycle management.
---
## Table of Contents
- [Model Deployment Pipeline](#model-deployment-pipeline)
- [Feature Store Architecture](#feature-store-architecture)
- [Model Monitoring](#model-monitoring)
- [A/B Testing Infrastructure](#ab-testing-infrastructure)
- [Automated Retraining](#automated-retraining)
---
## Model Deployment Pipeline
### Deployment Workflow
1. Export trained model to standardized format (ONNX, TorchScript, SavedModel)
2. Package model with dependencies in Docker container
3. Deploy to staging environment
4. Run integration tests against staging
5. Deploy canary (5% traffic) to production
6. Monitor latency and error rates for 1 hour
7. Promote to full production if metrics pass
8. **Validation:** p95 latency < 100ms, error rate < 0.1%
### Container Structure
```dockerfile
FROM python:3.11-slim

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model artifacts
COPY model/ /app/model/
COPY src/ /app/src/

# Health check endpoint
HEALTHCHECK CMD curl -f http://localhost:8080/health || exit 1

EXPOSE 8080
CMD ["uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "8080"]
```
### Model Serving Options
| Option | Latency | Throughput | Use Case |
|--------|---------|------------|----------|
| FastAPI + Uvicorn | Low | Medium | REST APIs, small models |
| Triton Inference Server | Very Low | Very High | GPU inference, batching |
| TensorFlow Serving | Low | High | TensorFlow models |
| TorchServe | Low | High | PyTorch models |
| Ray Serve | Medium | High | Complex pipelines, multi-model |
### Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-serving
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-serving
  template:
    metadata:
      labels:
        app: model-serving
    spec:
      containers:
        - name: model
          image: model:v1.0.0
          resources:
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2"
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5
```
---
## Feature Store Architecture
### Feature Store Components
| Component | Purpose | Tools |
|-----------|---------|-------|
| Offline Store | Training data, batch features | BigQuery, Snowflake, S3 |
| Online Store | Low-latency serving | Redis, DynamoDB, Feast |
| Feature Registry | Metadata, lineage | Feast, Tecton, Hopsworks |
| Transformation | Feature engineering | Spark, Flink, dbt |
### Feature Pipeline Workflow
1. Define feature schema in registry
2. Implement transformation logic (SQL or Python)
3. Backfill historical features to offline store
4. Schedule incremental updates
5. Materialize to online store for serving
6. Monitor feature freshness and quality
7. **Validation:** Feature values within expected ranges, no nulls in required fields
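Step 7's validation can be sketched as a range-and-null check over materialized rows (hypothetical helper; the column names are examples from the feature view below):

```python
def validate_features(rows, required, ranges):
    """Return a list of violations: nulls in required fields or out-of-range values."""
    violations = []
    for i, row in enumerate(rows):
        for col in required:
            if row.get(col) is None:
                violations.append((i, col, "null"))
        for col, (lo, hi) in ranges.items():
            value = row.get(col)
            if value is not None and not (lo <= value <= hi):
                violations.append((i, col, "out_of_range"))
    return violations
```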
### Feature Definition Example
```python
from datetime import timedelta
from feast import Entity, Feature, FeatureView, FileSource, ValueType

user = Entity(name="user_id", value_type=ValueType.INT64)

user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="purchase_count_30d", dtype=ValueType.INT64),
        Feature(name="avg_order_value", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_purchase", dtype=ValueType.INT64),
    ],
    online=True,
    source=FileSource(path="data/user_features.parquet"),
)
```
---
## Model Monitoring
### Monitoring Dimensions
| Dimension | Metrics | Alert Threshold |
|-----------|---------|-----------------|
| Latency | p50, p95, p99 | p95 > 100ms |
| Throughput | requests/sec | < 80% baseline |
| Errors | error rate, 5xx count | > 0.1% |
| Data Drift | PSI, KS statistic | PSI > 0.2 |
| Model Drift | accuracy, AUC decay | > 5% drop |
### Data Drift Detection
```python
import numpy as np
from scipy.stats import ks_2samp

def detect_drift(reference: np.ndarray, current: np.ndarray, threshold: float = 0.05):
    """Detect distribution drift using the Kolmogorov-Smirnov test."""
    statistic, p_value = ks_2samp(reference, current)
    drift_detected = p_value < threshold
    return {
        "drift_detected": drift_detected,
        "ks_statistic": statistic,
        "p_value": p_value,
        "threshold": threshold,
    }
```
### Monitoring Dashboard Metrics
**Infrastructure:**
- Request latency (p50, p95, p99)
- Requests per second
- Error rate by type
- CPU/memory utilization
- GPU utilization (if applicable)
**Model Performance:**
- Prediction distribution
- Feature value distributions
- Model output confidence
- Ground truth vs predictions (when available)
---
## A/B Testing Infrastructure
### Experiment Workflow
1. Define experiment hypothesis and success metrics
2. Calculate required sample size for statistical power
3. Configure traffic split (control vs treatment)
4. Deploy treatment model alongside control
5. Route traffic based on user/session hash
6. Collect metrics for both variants
7. Run statistical significance test
8. **Validation:** p-value < 0.05, minimum sample size reached
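Step 2's sample-size calculation for a two-proportion test can be sketched with SciPy (the standard normal-approximation formula; alpha=0.05 and power=0.8 defaults are conventional, and the function name is illustrative):

```python
from scipy.stats import norm

def required_sample_size(p1: float, p2: float,
                         alpha: float = 0.05, power: float = 0.8) -> int:
    """Per-arm sample size for detecting a shift from p1 to p2 (two-sided z-test)."""
    z_alpha = norm.ppf(1 - alpha / 2)
    z_beta = norm.ppf(power)
    p_bar = (p1 + p2) / 2
    numerator = (
        z_alpha * (2 * p_bar * (1 - p_bar)) ** 0.5
        + z_beta * (p1 * (1 - p1) + p2 * (1 - p2)) ** 0.5
    ) ** 2
    return int(numerator / (p1 - p2) ** 2) + 1
```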
### Traffic Splitting
```python
import hashlib

def get_variant(user_id: str, experiment: str, control_pct: float = 0.5) -> str:
    """Deterministic traffic splitting based on user ID."""
    hash_input = f"{user_id}:{experiment}"
    hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
    bucket = (hash_value % 100) / 100.0
    return "control" if bucket < control_pct else "treatment"
```
### Metrics Collection
| Metric Type | Examples | Collection Method |
|-------------|----------|-------------------|
| Primary | Conversion rate, revenue | Event logging |
| Secondary | Latency, engagement | Request logs |
| Guardrail | Error rate, crashes | Monitoring system |
---
## Automated Retraining
### Retraining Triggers
| Trigger | Detection Method | Action |
|---------|------------------|--------|
| Scheduled | Cron (weekly/monthly) | Full retrain |
| Performance drop | Accuracy < threshold | Immediate retrain |
| Data drift | PSI > 0.2 | Evaluate, then retrain |
| New data volume | X new samples | Incremental update |
### Retraining Pipeline
1. Trigger detection (schedule, drift, performance)
2. Fetch latest training data from feature store
3. Run training job with hyperparameter config
4. Evaluate model on holdout set
5. Compare against production model
6. If improved: register new model version
7. Deploy to staging for validation
8. Promote to production via canary
9. **Validation:** New model outperforms baseline on key metrics
### MLflow Model Registry Integration
```python
import mlflow

def register_model(model, metrics: dict, model_name: str):
    """Register trained model with MLflow."""
    with mlflow.start_run():
        # Log metrics
        for name, value in metrics.items():
            mlflow.log_metric(name, value)
        # Log model
        mlflow.sklearn.log_model(model, "model")
        # Register in model registry
        model_uri = f"runs:/{mlflow.active_run().info.run_id}/model"
        mlflow.register_model(model_uri, model_name)
```

# RAG System Architecture
## Overview
Retrieval-Augmented Generation patterns for production applications.
---
## Table of Contents
- [RAG Pipeline Architecture](#rag-pipeline-architecture)
- [Vector Database Selection](#vector-database-selection)
- [Chunking Strategies](#chunking-strategies)
- [Embedding Models](#embedding-models)
- [Retrieval Optimization](#retrieval-optimization)
---
## RAG Pipeline Architecture
### Basic RAG Flow
1. Receive user query
2. Generate query embedding
3. Search vector database for relevant chunks
4. Rerank retrieved chunks by relevance
5. Format context with retrieved chunks
6. Send prompt to LLM with context
7. Return generated response
8. **Validation:** Response references retrieved context, no hallucinations
### Pipeline Components
```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Document:
    content: str
    metadata: dict
    embedding: Optional[List[float]] = None

@dataclass
class RetrievalResult:
    document: Document
    score: float

class RAGPipeline:
    def __init__(
        self,
        embedder: Embedder,
        vector_store: VectorStore,
        llm: LLMProvider,
        reranker: Optional[Reranker] = None,
    ):
        self.embedder = embedder
        self.vector_store = vector_store
        self.llm = llm
        self.reranker = reranker

    def query(self, question: str, top_k: int = 5) -> str:
        # 1. Embed query
        query_embedding = self.embedder.embed(question)

        # 2. Retrieve candidates (over-fetch to give the reranker headroom)
        results = self.vector_store.search(query_embedding, top_k=top_k * 2)

        # 3. Rerank if available
        if self.reranker:
            results = self.reranker.rerank(question, results)[:top_k]
        else:
            results = results[:top_k]

        # 4. Build context
        context = self._build_context(results)

        # 5. Generate response
        prompt = self._build_prompt(question, context)
        return self.llm.complete(prompt)

    def _build_context(self, results: List[RetrievalResult]) -> str:
        return "\n\n".join(
            f"[Source {i+1}]: {r.document.content}"
            for i, r in enumerate(results)
        )

    def _build_prompt(self, question: str, context: str) -> str:
        return f"""Answer the question based on the context provided.

Context:
{context}

Question: {question}

Answer:"""
```
---
## Vector Database Selection
### Comparison Matrix
| Database | Hosting | Scale | Latency | Cost | Best For |
|----------|---------|-------|---------|------|----------|
| Pinecone | Managed | High | Low | $$ | Production, managed |
| Weaviate | Both | High | Low | $ | Hybrid search |
| Qdrant | Both | High | Very Low | $ | Performance-critical |
| Chroma | Self-hosted | Medium | Low | Free | Prototyping |
| pgvector | Self-hosted | Medium | Medium | Free | Existing Postgres |
| Milvus | Both | Very High | Low | $ | Large-scale |
### Pinecone Integration
```python
import pinecone
class PineconeVectorStore:
def __init__(self, api_key: str, environment: str, index_name: str):
pinecone.init(api_key=api_key, environment=environment)
self.index = pinecone.Index(index_name)
def upsert(self, documents: List[Document], batch_size: int = 100):
"""Upsert documents in batches."""
vectors = [
(doc.metadata["id"], doc.embedding, doc.metadata)
for doc in documents
]
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i + batch_size]
self.index.upsert(vectors=batch)
def search(self, embedding: List[float], top_k: int = 5) -> List[RetrievalResult]:
"""Search for similar vectors."""
results = self.index.query(
vector=embedding,
top_k=top_k,
include_metadata=True
)
return [
RetrievalResult(
document=Document(
content=match.metadata.get("content", ""),
metadata=match.metadata
),
score=match.score
)
for match in results.matches
]
```
---
## Chunking Strategies
### Strategy Comparison
| Strategy | Chunk Size | Overlap | Best For |
|----------|------------|---------|----------|
| Fixed | 500-1000 tokens | 50-100 | General text |
| Sentence | 3-5 sentences | 1 sentence | Structured text |
| Paragraph | Natural breaks | None | Documents with clear structure |
| Semantic | Variable | Based on meaning | Research papers |
| Recursive | Hierarchical | Parent-child | Long documents |
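The "Fixed" row above needs no library at all; a sketch using character counts for simplicity (a token-based variant would swap in a tokenizer):

```python
from typing import List

def fixed_chunks(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Fixed-size chunks with overlap; the overlap preserves context
    across chunk boundaries."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]
```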
### Recursive Character Splitter
```python
from langchain.text_splitter import RecursiveCharacterTextSplitter
def create_chunks(
text: str,
chunk_size: int = 1000,
chunk_overlap: int = 100
) -> List[str]:
"""Split text using recursive character splitting."""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", " ", ""]
)
return splitter.split_text(text)
```
### Semantic Chunking
```python
from sentence_transformers import SentenceTransformer
import numpy as np
def semantic_chunk(
sentences: List[str],
embedder: SentenceTransformer,
threshold: float = 0.7
) -> List[List[str]]:
"""Group sentences by semantic similarity."""
embeddings = embedder.encode(sentences)
chunks = []
current_chunk = [sentences[0]]
current_embedding = embeddings[0]
for i in range(1, len(sentences)):
similarity = np.dot(current_embedding, embeddings[i]) / (
np.linalg.norm(current_embedding) * np.linalg.norm(embeddings[i])
)
if similarity >= threshold:
current_chunk.append(sentences[i])
current_embedding = np.mean(
[current_embedding, embeddings[i]], axis=0
)
else:
chunks.append(current_chunk)
current_chunk = [sentences[i]]
current_embedding = embeddings[i]
chunks.append(current_chunk)
return chunks
```
---
## Embedding Models
### Model Comparison
| Model | Dimensions | Quality | Speed | Cost |
|-------|------------|---------|-------|------|
| text-embedding-3-large | 3072 | Excellent | Medium | $0.13/1M |
| text-embedding-3-small | 1536 | Good | Fast | $0.02/1M |
| BGE-large | 1024 | Excellent | Medium | Free |
| all-MiniLM-L6-v2 | 384 | Good | Very Fast | Free |
| Cohere embed-v3 | 1024 | Excellent | Medium | $0.10/1M |
### Embedding with Caching
```python
import hashlib
from openai import OpenAI

class CachedEmbedder:
    def __init__(self, model_name: str = "text-embedding-3-small"):
        self.client = OpenAI()
        self.model = model_name
        self._cache = {}
def embed(self, text: str) -> List[float]:
"""Embed text with caching."""
cache_key = hashlib.md5(text.encode()).hexdigest()
if cache_key in self._cache:
return self._cache[cache_key]
response = self.client.embeddings.create(
model=self.model,
input=text
)
embedding = response.data[0].embedding
self._cache[cache_key] = embedding
return embedding
def embed_batch(self, texts: List[str]) -> List[List[float]]:
"""Embed multiple texts efficiently."""
response = self.client.embeddings.create(
model=self.model,
input=texts
)
return [item.embedding for item in response.data]
```
---
## Retrieval Optimization
### Hybrid Search
Combine dense (vector) and sparse (keyword) retrieval:
```python
from rank_bm25 import BM25Okapi
class HybridRetriever:
def __init__(
self,
vector_store: VectorStore,
documents: List[Document],
alpha: float = 0.5
):
self.vector_store = vector_store
self.alpha = alpha # Weight for vector search
# Build BM25 index
tokenized = [doc.content.lower().split() for doc in documents]
self.bm25 = BM25Okapi(tokenized)
self.documents = documents
def search(self, query: str, query_embedding: List[float], top_k: int = 5):
# Vector search
vector_results = self.vector_store.search(query_embedding, top_k=top_k * 2)
# BM25 search
tokenized_query = query.lower().split()
bm25_scores = self.bm25.get_scores(tokenized_query)
# Combine scores
combined = {}
for result in vector_results:
doc_id = result.document.metadata["id"]
combined[doc_id] = self.alpha * result.score
for i, score in enumerate(bm25_scores):
doc_id = self.documents[i].metadata["id"]
if doc_id in combined:
combined[doc_id] += (1 - self.alpha) * score
else:
combined[doc_id] = (1 - self.alpha) * score
# Sort and return top_k
sorted_ids = sorted(combined.keys(), key=lambda x: combined[x], reverse=True)
return sorted_ids[:top_k]
```
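Because raw BM25 scores and cosine similarities live on different scales, hybrid setups commonly min-max normalize each score list before taking the weighted sum; a minimal sketch of that normalization step:

```python
from typing import List

def min_max_normalize(scores: List[float]) -> List[float]:
    """Scale scores to [0, 1] so vector and BM25 scores are comparable."""
    lo, hi = min(scores), max(scores)
    if hi == lo:  # all scores equal: treat as uniform
        return [1.0] * len(scores)
    return [(s - lo) / (hi - lo) for s in scores]
```

Apply it to the vector scores and the BM25 scores separately before combining with `alpha`.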
### Reranking
```python
from sentence_transformers import CrossEncoder
class Reranker:
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
self.model = CrossEncoder(model_name)
def rerank(
self,
query: str,
results: List[RetrievalResult],
top_k: int = 5
) -> List[RetrievalResult]:
"""Rerank results using cross-encoder."""
pairs = [(query, r.document.content) for r in results]
scores = self.model.predict(pairs)
# Update scores and sort
for i, score in enumerate(scores):
results[i].score = float(score)
return sorted(results, key=lambda x: x.score, reverse=True)[:top_k]
```
### Query Expansion
```python
def expand_query(query: str, llm: LLMProvider) -> List[str]:
"""Generate query variations for better retrieval."""
prompt = f"""Generate 3 alternative phrasings of this question for search.
Return only the questions, one per line.
Original: {query}
Alternatives:"""
response = llm.complete(prompt, max_tokens=150)
alternatives = [q.strip() for q in response.strip().split("\n") if q.strip()]
return [query] + alternatives[:3]
```
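Each expanded query produces its own ranked result list; a common way to merge them, not shown above, is reciprocal rank fusion (k = 60 is the usual constant from the original RRF formulation):

```python
from collections import defaultdict
from typing import Dict, List

def reciprocal_rank_fusion(
    ranked_lists: List[List[str]], k: int = 60
) -> List[str]:
    """Merge ranked doc-id lists: each doc scores sum(1 / (k + rank))."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)
```

RRF sidesteps score normalization entirely, since it only uses ranks.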