feat(bundles): add editorial bundle plugins

Author: sickn33
Date: 2026-03-27 08:48:03 +01:00
parent 8eff08b706
commit dffac91d3b
1052 changed files with 212282 additions and 68 deletions


@@ -0,0 +1,33 @@
{
  "name": "antigravity-bundle-data-engineering",
  "version": "8.10.0",
  "description": "Install the \"Data Engineering\" editorial skill bundle from Antigravity Awesome Skills.",
  "author": {
    "name": "sickn33 and contributors",
    "url": "https://github.com/sickn33/antigravity-awesome-skills"
  },
  "homepage": "https://github.com/sickn33/antigravity-awesome-skills",
  "repository": "https://github.com/sickn33/antigravity-awesome-skills",
  "license": "MIT",
  "keywords": [
    "codex",
    "skills",
    "bundle",
    "data-engineering",
    "productivity"
  ],
  "skills": "./skills/",
  "interface": {
    "displayName": "Data Engineering",
    "shortDescription": "Data & Analytics · 5 curated skills",
    "longDescription": "For building data pipelines. Covers Data Engineer, Airflow DAG Patterns, and 3 more skills.",
    "developerName": "sickn33 and contributors",
    "category": "Data & Analytics",
    "capabilities": [
      "Interactive",
      "Write"
    ],
    "websiteURL": "https://github.com/sickn33/antigravity-awesome-skills",
    "brandColor": "#111827"
  }
}


@@ -0,0 +1,44 @@
---
name: airflow-dag-patterns
description: "Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs."
risk: unknown
source: community
date_added: "2026-02-27"
---
# Apache Airflow DAG Patterns
Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.
## Use this skill when
- Creating data pipeline orchestration with Airflow
- Designing DAG structures and dependencies
- Implementing custom operators and sensors
- Testing Airflow DAGs locally
- Setting up Airflow in production
- Debugging failed DAG runs
## Do not use this skill when
- You only need a simple cron job or shell script
- Airflow is not part of the tooling stack
- The task is unrelated to workflow orchestration
## Instructions
1. Identify data sources, schedules, and dependencies.
2. Design idempotent tasks with clear ownership and retries.
3. Implement DAGs with observability and alerting hooks.
4. Validate in staging and document operational runbooks.
Refer to `resources/implementation-playbook.md` for detailed patterns, checklists, and templates.
## Safety
- Avoid changing production DAG schedules without approval.
- Test backfills and retries carefully to prevent data duplication.
## Resources
- `resources/implementation-playbook.md` for detailed patterns, checklists, and templates.


@@ -0,0 +1,509 @@
# Apache Airflow DAG Patterns Implementation Playbook
This file contains detailed patterns, checklists, and code samples referenced by the skill.
## Core Concepts
### 1. DAG Design Principles
| Principle | Description |
|-----------|-------------|
| **Idempotent** | Running twice produces same result |
| **Atomic** | Tasks succeed or fail completely |
| **Incremental** | Process only new/changed data |
| **Observable** | Logs, metrics, alerts at every step |
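The idempotency principle can be sketched in plain Python (a toy in-memory store and a hypothetical `load_partition` helper, not Airflow API): a load keyed by the run's logical date overwrites its partition, so a retry cannot duplicate rows.

```python
# Toy sketch of an idempotent load: `load_partition` is a hypothetical
# helper that replaces the partition keyed by the logical date `ds`,
# so re-running the same date produces the same result.
def load_partition(store: dict, ds: str, rows: list) -> dict:
    store[ds] = list(rows)  # overwrite, never append
    return store

store: dict = {}
load_partition(store, "2024-01-01", [1, 2, 3])
load_partition(store, "2024-01-01", [1, 2, 3])  # retried run: same result
```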
### 2. Task Dependencies
```python
# Linear
task1 >> task2 >> task3

# Fan-out
task1 >> [task2, task3, task4]

# Fan-in
[task1, task2, task3] >> task4

# Complex
task1 >> task2 >> task4
task1 >> task3 >> task4
```
## Quick Start
```python
# dags/example_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:
    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end
```
## Patterns
### Pattern 1: TaskFlow API (Airflow 2.0+)
```python
# dags/taskflow_example.py
from datetime import datetime

from airflow.decorators import dag, task

@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using TaskFlow API"""

    @task()
    def extract(source: str, ds: str = None) -> dict:
        """Extract data for the run's logical date"""
        import pandas as pd
        # Note: f-strings are not Jinja-templated, so `{{ ds }}` would be
        # passed literally; declaring `ds` as a parameter makes Airflow
        # inject it from the task context instead.
        df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data"""
        import pandas as pd
        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now()
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str, ds: str = None) -> int:
        """Load data to target"""
        import pandas as pd
        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{ds}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)

# Instantiate the DAG
taskflow_etl()
```
### Pattern 2: Dynamic DAG Generation
```python
# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]

def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config"""
    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )
        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )
        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag

# Generate DAGs
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
```
### Pattern 3: Branching and Conditional Logic
```python
# dags/branching_example.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():
    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics"""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Determine which branch to execute"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')
        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join

branching_pipeline()
```
### Pattern 4: Sensors and External Dependencies
```python
# dags/sensor_patterns.py
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Wait for file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,   # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',     # Free up worker slot while waiting
    )

    # Wait for another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same execution date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using @task.sensor decorator
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability"""
        import requests
        response = requests.get('https://api.example.com/health')
        is_done = response.status_code == 200
        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process
```
### Pattern 5: Error Handling and Alerts
```python
# dags/error_handling.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def task_failure_callback(context):
    """Callback on task failure"""
    task_instance = context['task_instance']
    exception = context.get('exception')
    # Send to Slack/PagerDuty/etc
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)

def dag_failure_callback(context):
    """Callback on DAG failure"""
    # Aggregate failures, send summary
    pass

with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:
    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream succeeded"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]
```
### Pattern 6: Testing DAGs
```python
# tests/test_dags.py
import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle

@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)

def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"

def test_dag_structure(dagbag):
    """Test specific DAG structure"""
    dag = dagbag.get_dag('example_etl')
    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'

def test_task_dependencies(dagbag):
    """Test task dependencies are correct"""
    dag = dagbag.get_dag('example_etl')
    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]

def test_dag_integrity(dagbag):
    """Test DAGs have no cycles (check_cycle raises on a cycle)"""
    for dag_id, dag in dagbag.dags.items():
        check_cycle(dag)

# Test individual task logic
def test_extract_function():
    """Unit test for extract function"""
    from dags.example_dag import extract_data
    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)
```
## Project Structure
```
airflow/
├── dags/
│ ├── __init__.py
│ ├── common/
│ │ ├── __init__.py
│ │ ├── operators.py # Custom operators
│ │ ├── sensors.py # Custom sensors
│ │ └── callbacks.py # Alert callbacks
│ ├── etl/
│ │ ├── customers.py
│ │ └── orders.py
│ └── ml/
│ └── training.py
├── plugins/
│ └── custom_plugin.py
├── tests/
│ ├── __init__.py
│ ├── test_dags.py
│ └── test_operators.py
├── docker-compose.yml
└── requirements.txt
```
## Best Practices
### Do's
- **Use TaskFlow API** - Cleaner code, automatic XCom
- **Set timeouts** - Prevent zombie tasks
- **Use `mode='reschedule'`** - For sensors, free up workers
- **Test DAGs** - Unit tests and integration tests
- **Idempotent tasks** - Safe to retry
### Don'ts
- **Don't use `depends_on_past=True`** - Creates bottlenecks
- **Don't hardcode dates** - Use `{{ ds }}` macros
- **Don't use global state** - Tasks should be stateless
- **Don't skip catchup blindly** - Understand implications
- **Don't put heavy logic in DAG file** - Import from modules
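The "don't hardcode dates" rule can be shown in plain Python (the bucket path is illustrative): a parse-time `date.today()` makes backfills re-read today's data, while deriving the path from the run's logical date — what Airflow exposes as the `ds` macro — keeps every run reproducible.

```python
from datetime import date

# Anti-pattern: resolved at call time to "today", so a backfill for
# last week still reads today's file.
def input_path_hardcoded() -> str:
    return f"s3://bucket/raw/{date.today().isoformat()}/data.csv"

# Pattern: the path is a pure function of the run's logical date,
# so reruns and backfills are deterministic.
def input_path(ds: str) -> str:
    return f"s3://bucket/raw/{ds}/data.csv"
```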
## Resources
- [Airflow Documentation](https://airflow.apache.org/docs/)
- [Astronomer Guides](https://docs.astronomer.io/learn)
- [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html)


@@ -0,0 +1,222 @@
---
name: data-engineer
description: Build scalable data pipelines, modern data warehouses, and real-time streaming architectures. Implements Apache Spark, dbt, Airflow, and cloud-native data platforms.
risk: unknown
source: community
date_added: '2026-02-27'
---
You are a data engineer specializing in scalable data pipelines, modern data architecture, and analytics infrastructure.
## Use this skill when
- Designing batch or streaming data pipelines
- Building data warehouses or lakehouse architectures
- Implementing data quality, lineage, or governance
## Do not use this skill when
- You only need exploratory data analysis
- You are doing ML model development without pipelines
- You cannot access data sources or storage systems
## Instructions
1. Define sources, SLAs, and data contracts.
2. Choose architecture, storage, and orchestration tools.
3. Implement ingestion, transformation, and validation.
4. Monitor quality, costs, and operational reliability.
## Safety
- Protect PII and enforce least-privilege access.
- Validate data before writing to production sinks.
## Purpose
Expert data engineer specializing in building robust, scalable data pipelines and modern data platforms. Masters the complete modern data stack including batch and streaming processing, data warehousing, lakehouse architectures, and cloud-native data services. Focuses on reliable, performant, and cost-effective data solutions.
## Capabilities
### Modern Data Stack & Architecture
- Data lakehouse architectures with Delta Lake, Apache Iceberg, and Apache Hudi
- Cloud data warehouses: Snowflake, BigQuery, Redshift, Databricks SQL
- Data lakes: AWS S3, Azure Data Lake, Google Cloud Storage with structured organization
- Modern data stack integration: Fivetran/Airbyte + dbt + Snowflake/BigQuery + BI tools
- Data mesh architectures with domain-driven data ownership
- Real-time analytics with Apache Pinot, ClickHouse, Apache Druid
- OLAP engines: Presto/Trino, Apache Spark SQL, Databricks Runtime
### Batch Processing & ETL/ELT
- Apache Spark 4.0 with optimized Catalyst engine and columnar processing
- dbt Core/Cloud for data transformations with version control and testing
- Apache Airflow for complex workflow orchestration and dependency management
- Databricks for unified analytics platform with collaborative notebooks
- AWS Glue, Azure Synapse Analytics, Google Dataflow for cloud ETL
- Custom Python/Scala data processing with pandas, Polars, Ray
- Data validation and quality monitoring with Great Expectations
- Data profiling and discovery with Apache Atlas, DataHub, Amundsen
### Real-Time Streaming & Event Processing
- Apache Kafka and Confluent Platform for event streaming
- Apache Pulsar for geo-replicated messaging and multi-tenancy
- Apache Flink and Kafka Streams for complex event processing
- AWS Kinesis, Azure Event Hubs, Google Pub/Sub for cloud streaming
- Real-time data pipelines with change data capture (CDC)
- Stream processing with windowing, aggregations, and joins
- Event-driven architectures with schema evolution and compatibility
- Real-time feature engineering for ML applications
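Windowed aggregation, mentioned above, reduces to bucketing events by window start. A minimal tumbling-window sketch in plain Python (standing in for a stream processor; the `ts` field name is assumed):

```python
from collections import defaultdict

# Toy tumbling-window count: each event falls into exactly one window,
# keyed by the window's start timestamp.
def tumbling_window_counts(events: list, window_seconds: int) -> dict:
    counts: dict = defaultdict(int)
    for event in events:
        window_start = event["ts"] - (event["ts"] % window_seconds)
        counts[window_start] += 1
    return dict(counts)
```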
### Workflow Orchestration & Pipeline Management
- Apache Airflow with custom operators and dynamic DAG generation
- Prefect for modern workflow orchestration with dynamic execution
- Dagster for asset-based data pipeline orchestration
- Azure Data Factory and AWS Step Functions for cloud workflows
- GitHub Actions and GitLab CI/CD for data pipeline automation
- Kubernetes CronJobs and Argo Workflows for container-native scheduling
- Pipeline monitoring, alerting, and failure recovery mechanisms
- Data lineage tracking and impact analysis
### Data Modeling & Warehousing
- Dimensional modeling: star schema, snowflake schema design
- Data vault modeling for enterprise data warehousing
- One Big Table (OBT) and wide table approaches for analytics
- Slowly changing dimensions (SCD) implementation strategies
- Data partitioning and clustering strategies for performance
- Incremental data loading and change data capture patterns
- Data archiving and retention policy implementation
- Performance tuning: indexing, materialized views, query optimization
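Incremental loading typically hinges on a high-water mark. An engine-agnostic sketch (field names are illustrative): pull only rows updated after the stored watermark, then advance it.

```python
# Watermark-based incremental extraction: filter to rows newer than the
# stored high-water mark, and return the new mark for the next run.
def incremental_extract(rows: list, watermark: str) -> tuple:
    new_rows = [r for r in rows if r["updated_at"] > watermark]
    new_mark = max((r["updated_at"] for r in new_rows), default=watermark)
    return new_rows, new_mark
```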
### Cloud Data Platforms & Services
#### AWS Data Engineering Stack
- Amazon S3 for data lake with intelligent tiering and lifecycle policies
- AWS Glue for serverless ETL with automatic schema discovery
- Amazon Redshift and Redshift Spectrum for data warehousing
- Amazon EMR and EMR Serverless for big data processing
- Amazon Kinesis for real-time streaming and analytics
- AWS Lake Formation for data lake governance and security
- Amazon Athena for serverless SQL queries on S3 data
- AWS DataBrew for visual data preparation
#### Azure Data Engineering Stack
- Azure Data Lake Storage Gen2 for hierarchical data lake
- Azure Synapse Analytics for unified analytics platform
- Azure Data Factory for cloud-native data integration
- Azure Databricks for collaborative analytics and ML
- Azure Stream Analytics for real-time stream processing
- Azure Purview for unified data governance and catalog
- Azure SQL Database and Cosmos DB for operational data stores
- Power BI integration for self-service analytics
#### GCP Data Engineering Stack
- Google Cloud Storage for object storage and data lake
- BigQuery for serverless data warehouse with ML capabilities
- Cloud Dataflow for stream and batch data processing
- Cloud Composer (managed Airflow) for workflow orchestration
- Cloud Pub/Sub for messaging and event ingestion
- Cloud Data Fusion for visual data integration
- Cloud Dataproc for managed Hadoop and Spark clusters
- Looker integration for business intelligence
### Data Quality & Governance
- Data quality frameworks with Great Expectations and custom validators
- Data lineage tracking with DataHub, Apache Atlas, Collibra
- Data catalog implementation with metadata management
- Data privacy and compliance: GDPR, CCPA, HIPAA considerations
- Data masking and anonymization techniques
- Access control and row-level security implementation
- Data monitoring and alerting for quality issues
- Schema evolution and backward compatibility management
### Performance Optimization & Scaling
- Query optimization techniques across different engines
- Partitioning and clustering strategies for large datasets
- Caching and materialized view optimization
- Resource allocation and cost optimization for cloud workloads
- Auto-scaling and spot instance utilization for batch jobs
- Performance monitoring and bottleneck identification
- Data compression and columnar storage optimization
- Distributed processing optimization with appropriate parallelism
### Database Technologies & Integration
- Relational databases: PostgreSQL, MySQL, SQL Server integration
- NoSQL databases: MongoDB, Cassandra, DynamoDB for diverse data types
- Time-series databases: InfluxDB, TimescaleDB for IoT and monitoring data
- Graph databases: Neo4j, Amazon Neptune for relationship analysis
- Search engines: Elasticsearch, OpenSearch for full-text search
- Vector databases: Pinecone, Qdrant for AI/ML applications
- Database replication, CDC, and synchronization patterns
- Multi-database query federation and virtualization
### Infrastructure & DevOps for Data
- Infrastructure as Code with Terraform, CloudFormation, Bicep
- Containerization with Docker and Kubernetes for data applications
- CI/CD pipelines for data infrastructure and code deployment
- Version control strategies for data code, schemas, and configurations
- Environment management: dev, staging, production data environments
- Secrets management and secure credential handling
- Monitoring and logging with Prometheus, Grafana, ELK stack
- Disaster recovery and backup strategies for data systems
### Data Security & Compliance
- Encryption at rest and in transit for all data movement
- Identity and access management (IAM) for data resources
- Network security and VPC configuration for data platforms
- Audit logging and compliance reporting automation
- Data classification and sensitivity labeling
- Privacy-preserving techniques: differential privacy, k-anonymity
- Secure data sharing and collaboration patterns
- Compliance automation and policy enforcement
### Integration & API Development
- RESTful APIs for data access and metadata management
- GraphQL APIs for flexible data querying and federation
- Real-time APIs with WebSockets and Server-Sent Events
- Data API gateways and rate limiting implementation
- Event-driven integration patterns with message queues
- Third-party data source integration: APIs, databases, SaaS platforms
- Data synchronization and conflict resolution strategies
- API documentation and developer experience optimization
## Behavioral Traits
- Prioritizes data reliability and consistency over quick fixes
- Implements comprehensive monitoring and alerting from the start
- Focuses on scalable and maintainable data architecture decisions
- Emphasizes cost optimization while maintaining performance requirements
- Plans for data governance and compliance from the design phase
- Uses infrastructure as code for reproducible deployments
- Implements thorough testing for data pipelines and transformations
- Documents data schemas, lineage, and business logic clearly
- Stays current with evolving data technologies and best practices
- Balances performance optimization with operational simplicity
## Knowledge Base
- Modern data stack architectures and integration patterns
- Cloud-native data services and their optimization techniques
- Streaming and batch processing design patterns
- Data modeling techniques for different analytical use cases
- Performance tuning across various data processing engines
- Data governance and quality management best practices
- Cost optimization strategies for cloud data workloads
- Security and compliance requirements for data systems
- DevOps practices adapted for data engineering workflows
- Emerging trends in data architecture and tooling
## Response Approach
1. **Analyze data requirements** for scale, latency, and consistency needs
2. **Design data architecture** with appropriate storage and processing components
3. **Implement robust data pipelines** with comprehensive error handling and monitoring
4. **Include data quality checks** and validation throughout the pipeline
5. **Consider cost and performance** implications of architectural decisions
6. **Plan for data governance** and compliance requirements early
7. **Implement monitoring and alerting** for data pipeline health and performance
8. **Document data flows** and provide operational runbooks for maintenance
## Example Interactions
- "Design a real-time streaming pipeline that processes 1M events per second from Kafka to BigQuery"
- "Build a modern data stack with dbt, Snowflake, and Fivetran for dimensional modeling"
- "Implement a cost-optimized data lakehouse architecture using Delta Lake on AWS"
- "Create a data quality framework that monitors and alerts on data anomalies"
- "Design a multi-tenant data platform with proper isolation and governance"
- "Build a change data capture pipeline for real-time synchronization between databases"
- "Implement a data mesh architecture with domain-specific data products"
- "Create a scalable ETL pipeline that handles late-arriving and out-of-order data"


@@ -0,0 +1,37 @@
---
name: dbt-transformation-patterns
description: "Production-ready patterns for dbt (data build tool) including model organization, testing strategies, documentation, and incremental processing."
risk: unknown
source: community
date_added: "2026-02-27"
---
# dbt Transformation Patterns
Production-ready patterns for dbt (data build tool) including model organization, testing strategies, documentation, and incremental processing.
## Use this skill when
- Building data transformation pipelines with dbt
- Organizing models into staging, intermediate, and marts layers
- Implementing data quality tests and documentation
- Creating incremental models for large datasets
- Setting up dbt project structure and conventions
## Do not use this skill when
- The project is not using dbt or a warehouse-backed workflow
- You only need ad-hoc SQL queries
- There is no access to source data or schemas
## Instructions
- Define model layers, naming, and ownership.
- Implement tests, documentation, and freshness checks.
- Choose materializations and incremental strategies.
- Optimize runs with selectors and CI workflows.
- If detailed patterns are required, open `resources/implementation-playbook.md`.
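The incremental-strategy choice above often comes down to merge semantics. A toy Python sketch (plain dicts, not dbt API) of what an incremental `merge` materialization does with a `unique_key`:

```python
# Toy model of dbt's incremental 'merge' strategy: rows from the new
# batch are upserted into the target by unique_key, so reprocessing
# the same batch is safe.
def merge_incremental(target: dict, batch: list, unique_key: str) -> dict:
    for row in batch:
        target[row[unique_key]] = row  # insert or update by key
    return target
```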
## Resources
- `resources/implementation-playbook.md` for detailed dbt patterns and examples.


@@ -0,0 +1,547 @@
# dbt Transformation Patterns Implementation Playbook
This file contains detailed patterns, checklists, and code samples referenced by the skill.
## Core Concepts
### 1. Model Layers (Medallion Architecture)
```
sources/ Raw data definitions
staging/ 1:1 with source, light cleaning
intermediate/ Business logic, joins, aggregations
marts/ Final analytics tables
```
### 2. Naming Conventions
| Layer | Prefix | Example |
|-------|--------|---------|
| Staging | `stg_` | `stg_stripe__payments` |
| Intermediate | `int_` | `int_payments_pivoted` |
| Marts | `dim_`, `fct_` | `dim_customers`, `fct_orders` |
## Quick Start
```yaml
# dbt_project.yml
name: 'analytics'
version: '1.0.0'
profile: 'analytics'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]

vars:
  start_date: '2020-01-01'

models:
  analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics
```
```
# Project structure
models/
├── staging/
│ ├── stripe/
│ │ ├── _stripe__sources.yml
│ │ ├── _stripe__models.yml
│ │ ├── stg_stripe__customers.sql
│ │ └── stg_stripe__payments.sql
│ └── shopify/
│ ├── _shopify__sources.yml
│ └── stg_shopify__orders.sql
├── intermediate/
│ └── finance/
│ └── int_payments_pivoted.sql
└── marts/
├── core/
│ ├── _core__models.yml
│ ├── dim_customers.sql
│ └── fct_orders.sql
└── finance/
└── fct_revenue.sql
```
## Patterns
### Pattern 1: Source Definitions
```yaml
# models/staging/stripe/_stripe__sources.yml
version: 2

sources:
  - name: stripe
    description: Raw Stripe data loaded via Fivetran
    database: raw
    schema: stripe
    loader: fivetran
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: customers
        description: Stripe customer records
        columns:
          - name: id
            description: Primary key
            tests:
              - unique
              - not_null
          - name: email
            description: Customer email
          - name: created
            description: Account creation timestamp
      - name: payments
        description: Stripe payment transactions
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: source('stripe', 'customers')
                  field: id
```
### Pattern 2: Staging Models
```sql
-- models/staging/stripe/stg_stripe__customers.sql
with source as (
    select * from {{ source('stripe', 'customers') }}
),

renamed as (
    select
        -- ids
        id as customer_id,

        -- strings
        lower(email) as email,
        name as customer_name,

        -- timestamps
        created as created_at,

        -- metadata
        _fivetran_synced as _loaded_at
    from source
)

select * from renamed
```
```sql
-- models/staging/stripe/stg_stripe__payments.sql
{{
    config(
        materialized='incremental',
        unique_key='payment_id',
        on_schema_change='append_new_columns'
    )
}}

with source as (
    select * from {{ source('stripe', 'payments') }}
    {% if is_incremental() %}
    where _fivetran_synced > (select max(_loaded_at) from {{ this }})
    {% endif %}
),

renamed as (
    select
        -- ids
        id as payment_id,
        customer_id,
        invoice_id,

        -- amounts (convert cents to dollars)
        amount / 100.0 as amount,
        amount_refunded / 100.0 as amount_refunded,

        -- status
        status as payment_status,

        -- timestamps
        created as created_at,

        -- metadata
        _fivetran_synced as _loaded_at
    from source
)

select * from renamed
```
### Pattern 3: Intermediate Models
```sql
-- models/intermediate/finance/int_payments_pivoted_to_customer.sql
with payments as (
    select * from {{ ref('stg_stripe__payments') }}
),

customers as (
    select * from {{ ref('stg_stripe__customers') }}
),

payment_summary as (
    select
        customer_id,
        count(*) as total_payments,
        count(case when payment_status = 'succeeded' then 1 end) as successful_payments,
        sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid,
        min(created_at) as first_payment_at,
        max(created_at) as last_payment_at
    from payments
    group by customer_id
)

select
    customers.customer_id,
    customers.email,
    customers.created_at as customer_created_at,
    coalesce(payment_summary.total_payments, 0) as total_payments,
    coalesce(payment_summary.successful_payments, 0) as successful_payments,
    coalesce(payment_summary.total_amount_paid, 0) as lifetime_value,
    payment_summary.first_payment_at,
    payment_summary.last_payment_at
from customers
left join payment_summary using (customer_id)
```
### Pattern 4: Mart Models (Dimensions and Facts)
```sql
-- models/marts/core/dim_customers.sql
{{
config(
        materialized='table'
)
}}
with customers as (
select * from {{ ref('int_payments_pivoted_to_customer') }}
),
orders as (
select * from {{ ref('stg_shopify__orders') }}
),
order_summary as (
select
customer_id,
count(*) as total_orders,
sum(total_price) as total_order_value,
min(created_at) as first_order_at,
max(created_at) as last_order_at
from orders
group by customer_id
),
final as (
select
-- surrogate key
{{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,
-- natural key
customers.customer_id,
-- attributes
customers.email,
customers.customer_created_at,
-- payment metrics
customers.total_payments,
customers.successful_payments,
customers.lifetime_value,
customers.first_payment_at,
customers.last_payment_at,
-- order metrics
coalesce(order_summary.total_orders, 0) as total_orders,
coalesce(order_summary.total_order_value, 0) as total_order_value,
order_summary.first_order_at,
order_summary.last_order_at,
-- calculated fields
case
when customers.lifetime_value >= 1000 then 'high'
when customers.lifetime_value >= 100 then 'medium'
else 'low'
end as customer_tier,
-- timestamps
current_timestamp as _loaded_at
from customers
left join order_summary using (customer_id)
)
select * from final
```
```sql
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
)
}}
with orders as (
select * from {{ ref('stg_shopify__orders') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
),
customers as (
select * from {{ ref('dim_customers') }}
),
final as (
select
-- keys
orders.order_id,
customers.customer_key,
orders.customer_id,
-- dimensions
orders.order_status,
orders.fulfillment_status,
orders.payment_status,
-- measures
orders.subtotal,
orders.tax,
orders.shipping,
orders.total_price,
orders.total_discount,
orders.item_count,
-- timestamps
orders.created_at,
orders.updated_at,
orders.fulfilled_at,
-- metadata
current_timestamp as _loaded_at
from orders
left join customers on orders.customer_id = customers.customer_id
)
select * from final
```
### Pattern 5: Testing and Documentation
```yaml
# models/marts/core/_core__models.yml
version: 2
models:
- name: dim_customers
description: Customer dimension with payment and order metrics
columns:
- name: customer_key
description: Surrogate key for the customer dimension
tests:
- unique
- not_null
- name: customer_id
description: Natural key from source system
tests:
- unique
- not_null
- name: email
description: Customer email address
tests:
- not_null
- name: customer_tier
description: Customer value tier based on lifetime value
tests:
- accepted_values:
values: ['high', 'medium', 'low']
- name: lifetime_value
description: Total amount paid by customer
tests:
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: fct_orders
description: Order fact table with all order transactions
tests:
- dbt_utils.recency:
datepart: day
field: created_at
interval: 1
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_key
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_key
```
### Pattern 6: Macros and DRY Code
```sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name }}
{%- endif -%}
{% endmacro %}
-- macros/limit_data_in_dev.sql
{% macro limit_data_in_dev(column_name, days=3) %}
{% if target.name == 'dev' %}
where {{ column_name }} >= dateadd(day, -{{ days }}, current_date)
{% endif %}
{% endmacro %}
-- Usage in model
select * from {{ ref('stg_orders') }}
{{ limit_data_in_dev('created_at') }}
```
### Pattern 7: Incremental Strategies
```sql
-- Delete+Insert (default for most warehouses)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='delete+insert'
)
}}
-- Merge (best for late-arriving data)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='merge',
merge_update_columns=['status', 'amount', 'updated_at']
)
}}
-- Insert Overwrite (partition-based)
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "created_date",
"data_type": "date",
"granularity": "day"
}
)
}}
select
*,
date(created_at) as created_date
from {{ ref('stg_events') }}
{% if is_incremental() %}
-- filter on the expression, not the select alias (aliases are not visible in WHERE)
where date(created_at) >= dateadd(day, -3, current_date)
{% endif %}
```
## dbt Commands
```bash
# Development
dbt run # Run all models
dbt run --select staging # Run staging models only
dbt run --select +fct_orders # Run fct_orders and its upstream
dbt run --select fct_orders+ # Run fct_orders and its downstream
dbt run --full-refresh # Rebuild incremental models
# Testing
dbt test # Run all tests
dbt test --select stg_stripe # Test specific models
dbt build # Run + test in DAG order
# Documentation
dbt docs generate # Generate docs
dbt docs serve # Serve docs locally
# Debugging
dbt compile # Compile SQL without running
dbt debug # Test connection
dbt ls --select tag:critical # List models by tag
```
## Best Practices
### Do's
- **Use staging layer** - Clean data once, use everywhere
- **Test aggressively** - Not null, unique, relationships
- **Document everything** - Column descriptions, model descriptions
- **Use incremental** - For tables > 1M rows
- **Version control** - dbt project in Git
### Don'ts
- **Don't skip staging** - Raw → mart is tech debt
- **Don't hardcode dates** - Use `{{ var('start_date') }}`
- **Don't repeat logic** - Extract to macros
- **Don't test in prod** - Use dev target
- **Don't ignore freshness** - Monitor source data
## Resources
- [dbt Documentation](https://docs.getdbt.com/)
- [dbt Best Practices](https://docs.getdbt.com/guides/best-practices)
- [dbt-utils Package](https://hub.getdbt.com/dbt-labs/dbt_utils/latest/)
- [dbt Discourse](https://discourse.getdbt.com/)

---
name: embedding-strategies
description: "Guide to selecting and optimizing embedding models for vector search applications."
risk: unknown
source: community
date_added: "2026-02-27"
---
# Embedding Strategies
Guide to selecting and optimizing embedding models for vector search applications.
## Do not use this skill when
- The task is unrelated to embedding strategies
- You need a different domain or tool outside this scope
## Instructions
- Clarify goals, constraints, and required inputs.
- Apply relevant best practices and validate outcomes.
- Provide actionable steps and verification.
- If detailed examples are required, open `resources/implementation-playbook.md`.
## Use this skill when
- Choosing embedding models for RAG
- Optimizing chunking strategies
- Fine-tuning embeddings for domains
- Comparing embedding model performance
- Reducing embedding dimensions
- Handling multilingual content
## Core Concepts
### 1. Embedding Model Comparison
| Model | Dimensions | Max Tokens | Best For |
|-------|------------|------------|----------|
| **text-embedding-3-large** | 3072 | 8191 | High accuracy |
| **text-embedding-3-small** | 1536 | 8191 | Cost-effective |
| **voyage-2** | 1024 | 4000 | Code, legal |
| **bge-large-en-v1.5** | 1024 | 512 | Open source |
| **all-MiniLM-L6-v2** | 384 | 256 | Fast, lightweight |
| **multilingual-e5-large** | 1024 | 512 | Multi-language |
### 2. Embedding Pipeline
```
Document → Chunking → Preprocessing → Embedding Model → Vector
[Overlap, Size] [Clean, Normalize] [API/Local]
```
## Templates
### Template 1: OpenAI Embeddings
```python
from openai import OpenAI
from typing import List, Optional
import numpy as np
client = OpenAI()
def get_embeddings(
    texts: List[str],
    model: str = "text-embedding-3-small",
    dimensions: Optional[int] = None
) -> List[List[float]]:
"""Get embeddings from OpenAI."""
# Handle batching for large lists
batch_size = 100
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
kwargs = {"input": batch, "model": model}
if dimensions:
kwargs["dimensions"] = dimensions
response = client.embeddings.create(**kwargs)
embeddings = [item.embedding for item in response.data]
all_embeddings.extend(embeddings)
return all_embeddings
def get_embedding(text: str, **kwargs) -> List[float]:
"""Get single embedding."""
return get_embeddings([text], **kwargs)[0]
# Dimension reduction with OpenAI
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
"""Get embedding with reduced dimensions (Matryoshka)."""
return get_embedding(
text,
model="text-embedding-3-small",
dimensions=dimensions
)
```
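The `dimensions` argument above works because the text-embedding-3 family is trained Matryoshka-style: the API keeps the first k dimensions and renormalizes. A rough local equivalent, sketched with numpy and a random stand-in vector rather than a real embedding:

```python
import numpy as np

def truncate_embedding(vec, dims):
    """Keep the first `dims` dimensions and L2-renormalize,
    mirroring what the API does when `dimensions` is set."""
    truncated = np.asarray(vec, dtype=float)[:dims]
    norm = np.linalg.norm(truncated)
    return truncated / norm if norm > 0 else truncated

# stand-in for a real 1536-dim embedding
full = np.random.default_rng(0).normal(size=1536)
small = truncate_embedding(full, 512)
print(small.shape, round(float(np.linalg.norm(small)), 6))  # (512,) 1.0
```

The renormalization step matters: truncation alone changes vector length, which would skew dot-product scores against full-size embeddings.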
### Template 2: Local Embeddings with Sentence Transformers
```python
from sentence_transformers import SentenceTransformer
from typing import List, Optional
import numpy as np
class LocalEmbedder:
"""Local embedding with sentence-transformers."""
    def __init__(
        self,
        model_name: str = "BAAI/bge-large-en-v1.5",
        device: str = "cuda"
    ):
        self.model_name = model_name
        self.model = SentenceTransformer(model_name, device=device)
    def embed(
        self,
        texts: List[str],
        normalize: bool = True,
        show_progress: bool = False
    ) -> np.ndarray:
        """Embed texts with optional normalization."""
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=normalize,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )
        return embeddings
    def embed_query(self, query: str) -> np.ndarray:
        """Embed a query with BGE-style prefix."""
        # BGE models benefit from a retrieval instruction prefixed to queries
        if "bge" in self.model_name.lower():
            query = f"Represent this sentence for searching relevant passages: {query}"
        return self.embed([query])[0]
def embed_documents(self, documents: List[str]) -> np.ndarray:
"""Embed documents for indexing."""
return self.embed(documents)
# E5 model with instructions
class E5Embedder:
def __init__(self, model_name: str = "intfloat/multilingual-e5-large"):
self.model = SentenceTransformer(model_name)
def embed_query(self, query: str) -> np.ndarray:
return self.model.encode(f"query: {query}")
def embed_document(self, document: str) -> np.ndarray:
return self.model.encode(f"passage: {document}")
```
### Template 3: Chunking Strategies
```python
from typing import List, Tuple
import re
def chunk_by_tokens(
text: str,
chunk_size: int = 512,
chunk_overlap: int = 50,
tokenizer=None
) -> List[str]:
"""Chunk text by token count."""
import tiktoken
tokenizer = tokenizer or tiktoken.get_encoding("cl100k_base")
tokens = tokenizer.encode(text)
chunks = []
    start = 0
    # advance by chunk_size - chunk_overlap; guard against a non-positive step
    step = max(chunk_size - chunk_overlap, 1)
    while start < len(tokens):
        end = start + chunk_size
        chunk_tokens = tokens[start:end]
        chunk_text = tokenizer.decode(chunk_tokens)
        chunks.append(chunk_text)
        start += step
return chunks
def chunk_by_sentences(
text: str,
max_chunk_size: int = 1000,
min_chunk_size: int = 100
) -> List[str]:
"""Chunk text by sentences, respecting size limits."""
    import nltk  # requires the punkt tokenizer: nltk.download("punkt")
    sentences = nltk.sent_tokenize(text)
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence)
if current_size + sentence_size > max_chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = []
current_size = 0
current_chunk.append(sentence)
current_size += sentence_size
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def chunk_by_semantic_sections(
text: str,
headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
"""Chunk markdown by headers, preserving hierarchy."""
lines = text.split('\n')
chunks = []
current_header = ""
current_content = []
for line in lines:
if re.match(headers_pattern, line, re.MULTILINE):
if current_content:
chunks.append((current_header, '\n'.join(current_content)))
current_header = line
current_content = []
else:
current_content.append(line)
if current_content:
chunks.append((current_header, '\n'.join(current_content)))
return chunks
def recursive_character_splitter(
text: str,
chunk_size: int = 1000,
chunk_overlap: int = 200,
separators: List[str] = None
) -> List[str]:
"""LangChain-style recursive splitter."""
separators = separators or ["\n\n", "\n", ". ", " ", ""]
def split_text(text: str, separators: List[str]) -> List[str]:
if not text:
return []
separator = separators[0]
remaining_separators = separators[1:]
if separator == "":
# Character-level split
return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - chunk_overlap)]
splits = text.split(separator)
chunks = []
current_chunk = []
current_length = 0
for split in splits:
split_length = len(split) + len(separator)
if current_length + split_length > chunk_size and current_chunk:
chunk_text = separator.join(current_chunk)
# Recursively split if still too large
if len(chunk_text) > chunk_size and remaining_separators:
chunks.extend(split_text(chunk_text, remaining_separators))
else:
chunks.append(chunk_text)
# Start new chunk with overlap
overlap_splits = []
overlap_length = 0
for s in reversed(current_chunk):
if overlap_length + len(s) <= chunk_overlap:
overlap_splits.insert(0, s)
overlap_length += len(s)
else:
break
current_chunk = overlap_splits
current_length = overlap_length
current_chunk.append(split)
current_length += split_length
if current_chunk:
chunks.append(separator.join(current_chunk))
return chunks
return split_text(text, separators)
```
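The stepping logic shared by the chunkers above is easiest to see in a tiny word-level analogue (illustration only, not one of the functions above): each chunk starts `chunk_size - overlap` units after the previous one, so consecutive chunks share exactly `overlap` units.

```python
def chunk_words(words, chunk_size=5, overlap=2):
    """Word-level sliding window using the same stepping logic as
    token chunking: advance by chunk_size - overlap each iteration."""
    step = max(chunk_size - overlap, 1)
    return [words[i:i + chunk_size] for i in range(0, len(words), step)]

words = [f"w{i}" for i in range(12)]
chunks = chunk_words(words)
print(len(chunks))                       # 4
print(chunks[0][-2:] == chunks[1][:2])   # True: consecutive chunks share `overlap` words
```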
### Template 4: Domain-Specific Embedding Pipeline
```python
class DomainEmbeddingPipeline:
"""Pipeline for domain-specific embeddings."""
def __init__(
self,
embedding_model: str = "text-embedding-3-small",
chunk_size: int = 512,
chunk_overlap: int = 50,
preprocessing_fn=None
):
self.embedding_model = embedding_model
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.preprocess = preprocessing_fn or self._default_preprocess
def _default_preprocess(self, text: str) -> str:
"""Default preprocessing."""
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text)
# Remove special characters
text = re.sub(r'[^\w\s.,!?-]', '', text)
return text.strip()
async def process_documents(
self,
documents: List[dict],
id_field: str = "id",
content_field: str = "content",
metadata_fields: List[str] = None
) -> List[dict]:
"""Process documents for vector storage."""
processed = []
for doc in documents:
content = doc[content_field]
doc_id = doc[id_field]
# Preprocess
cleaned = self.preprocess(content)
# Chunk
chunks = chunk_by_tokens(
cleaned,
self.chunk_size,
self.chunk_overlap
)
# Create embeddings
embeddings = get_embeddings(chunks, self.embedding_model)
# Create records
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
record = {
"id": f"{doc_id}_chunk_{i}",
"document_id": doc_id,
"chunk_index": i,
"text": chunk,
"embedding": embedding
}
# Add metadata
if metadata_fields:
for field in metadata_fields:
if field in doc:
record[field] = doc[field]
processed.append(record)
return processed
# Code-specific pipeline
class CodeEmbeddingPipeline:
"""Specialized pipeline for code embeddings."""
def __init__(self, model: str = "voyage-code-2"):
self.model = model
    def chunk_code(self, code: str, language: str) -> List[dict]:
        """Sketch only: a full implementation would parse with tree-sitter,
        extract functions/classes/methods, and return chunks with context."""
        raise NotImplementedError("parse with tree-sitter and chunk by definition")
def embed_with_context(self, chunk: str, context: str) -> List[float]:
"""Embed code with surrounding context."""
combined = f"Context: {context}\n\nCode:\n{chunk}"
return get_embedding(combined, model=self.model)
```
### Template 5: Embedding Quality Evaluation
```python
import numpy as np
from typing import List, Tuple
def evaluate_retrieval_quality(
queries: List[str],
relevant_docs: List[List[str]], # List of relevant doc IDs per query
retrieved_docs: List[List[str]], # List of retrieved doc IDs per query
k: int = 10
) -> dict:
"""Evaluate embedding quality for retrieval."""
def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
retrieved_k = retrieved[:k]
relevant_retrieved = len(set(retrieved_k) & relevant)
return relevant_retrieved / k
def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
retrieved_k = retrieved[:k]
relevant_retrieved = len(set(retrieved_k) & relevant)
return relevant_retrieved / len(relevant) if relevant else 0
def mrr(relevant: set, retrieved: List[str]) -> float:
for i, doc in enumerate(retrieved):
if doc in relevant:
return 1 / (i + 1)
return 0
def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
dcg = sum(
1 / np.log2(i + 2) if doc in relevant else 0
for i, doc in enumerate(retrieved[:k])
)
ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
return dcg / ideal_dcg if ideal_dcg > 0 else 0
metrics = {
f"precision@{k}": [],
f"recall@{k}": [],
"mrr": [],
f"ndcg@{k}": []
}
for relevant, retrieved in zip(relevant_docs, retrieved_docs):
relevant_set = set(relevant)
metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
metrics["mrr"].append(mrr(relevant_set, retrieved))
metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))
return {name: np.mean(values) for name, values in metrics.items()}
def compute_embedding_similarity(
embeddings1: np.ndarray,
embeddings2: np.ndarray,
metric: str = "cosine"
) -> np.ndarray:
"""Compute similarity matrix between embedding sets."""
if metric == "cosine":
# Normalize
norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)
norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)
return norm1 @ norm2.T
    elif metric == "euclidean":
        from scipy.spatial.distance import cdist
        # negate the distance so larger values mean more similar
        return -cdist(embeddings1, embeddings2, metric='euclidean')
elif metric == "dot":
return embeddings1 @ embeddings2.T
```
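Because `compute_embedding_similarity` normalizes before the matrix product, its `cosine` branch reduces to a dot product of unit vectors, which is why normalized embeddings make the `cosine` and `dot` metrics interchangeable. A tiny self-contained check with toy 2-d vectors:

```python
import numpy as np

a = np.array([3.0, 4.0])   # toy 2-d stand-ins for embeddings
b = np.array([4.0, 3.0])

# cosine similarity computed directly
cosine = float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

# normalize first, then a plain dot product
an, bn = a / np.linalg.norm(a), b / np.linalg.norm(b)
dot_normalized = float(an @ bn)

print(round(cosine, 2), round(dot_normalized, 2))  # 0.96 0.96
```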
## Best Practices
### Do's
- **Match model to use case** - Code vs prose vs multilingual
- **Chunk thoughtfully** - Preserve semantic boundaries
- **Normalize embeddings** - For cosine similarity
- **Batch requests** - More efficient than one-by-one
- **Cache embeddings** - Avoid recomputing
### Don'ts
- **Don't ignore token limits** - Truncation loses info
- **Don't mix embedding models** - Incompatible spaces
- **Don't skip preprocessing** - Garbage in, garbage out
- **Don't over-chunk** - Lose context
## Resources
- [OpenAI Embeddings](https://platform.openai.com/docs/guides/embeddings)
- [Sentence Transformers](https://www.sbert.net/)
- [MTEB Benchmark](https://huggingface.co/spaces/mteb/leaderboard)

---
name: vector-database-engineer
description: "Expert in vector databases, embedding strategies, and semantic search implementation. Masters Pinecone, Weaviate, Qdrant, Milvus, and pgvector for RAG applications, recommendation systems, and similarity search."
risk: unknown
source: community
date_added: "2026-02-27"
---
# Vector Database Engineer
Expert in vector databases, embedding strategies, and semantic search implementation. Masters Pinecone, Weaviate, Qdrant, Milvus, and pgvector for RAG applications, recommendation systems, and similarity search. Use PROACTIVELY for vector search implementation, embedding optimization, or semantic retrieval systems.
## Do not use this skill when
- The task is unrelated to vector databases or semantic search
- You need a different domain or tool outside this scope
## Instructions
- Clarify goals, constraints, and required inputs.
- Apply relevant best practices and validate outcomes.
- Provide actionable steps and verification.
- If detailed examples are required, open `resources/implementation-playbook.md`.
## Capabilities
- Vector database selection and architecture
- Embedding model selection and optimization
- Index configuration (HNSW, IVF, PQ)
- Hybrid search (vector + keyword) implementation
- Chunking strategies for documents
- Metadata filtering and pre/post-filtering
- Performance tuning and scaling
## Use this skill when
- Building RAG (Retrieval Augmented Generation) systems
- Implementing semantic search over documents
- Creating recommendation engines
- Building image/audio similarity search
- Optimizing vector search latency and recall
- Scaling vector operations to millions of vectors
## Workflow
1. Analyze data characteristics and query patterns
2. Select appropriate embedding model
3. Design chunking and preprocessing pipeline
4. Choose vector database and index type
5. Configure metadata schema for filtering
6. Implement hybrid search if needed
7. Optimize for latency/recall tradeoffs
8. Set up monitoring and reindexing strategies
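Step 6 (hybrid search) is commonly implemented by running vector and keyword retrieval separately and fusing the two ranked lists with reciprocal rank fusion; a minimal sketch, where the doc IDs are made up and k=60 is the conventional RRF constant:

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse ranked ID lists: score(d) = sum over lists of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ["d3", "d1", "d7"]    # hypothetical IDs from the vector index
keyword_hits = ["d1", "d9", "d3"]   # hypothetical IDs from BM25 / keyword search
fused = reciprocal_rank_fusion([vector_hits, keyword_hits])
print(fused)  # ['d1', 'd3', 'd9', 'd7']
```

RRF needs no score calibration between the two retrievers, which is why it is a common default before reaching for learned rerankers.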
## Best Practices
- Choose embedding dimensions based on use case (384-1536)
- Implement proper chunking with overlap
- Use metadata filtering to reduce search space
- Monitor embedding drift over time
- Plan for index rebuilding
- Cache frequent queries
- Test recall vs latency tradeoffs
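The last point (recall vs latency tradeoffs) is usually measured by comparing an ANN index's top-k against exact brute-force top-k over a query set; a self-contained numpy sketch, with random unit vectors standing in for real embeddings and a deliberately truncated search standing in for an ANN index:

```python
import numpy as np

rng = np.random.default_rng(42)
docs = rng.normal(size=(1000, 64)).astype(np.float32)
docs /= np.linalg.norm(docs, axis=1, keepdims=True)
queries = rng.normal(size=(20, 64)).astype(np.float32)
queries /= np.linalg.norm(queries, axis=1, keepdims=True)

def exact_topk(q, k=10):
    """Brute-force ground truth: full dot-product scan."""
    return set(np.argsort(docs @ q)[::-1][:k])

def approx_topk(q, k=10, probe=200):
    """Stand-in for an ANN index: only searches the first `probe` docs."""
    sims = docs[:probe] @ q
    return set(np.argsort(sims)[::-1][:k])

recalls = [len(exact_topk(q) & approx_topk(q)) / 10 for q in queries]
print(f"mean recall@10: {np.mean(recalls):.2f}")
```

With a real index the `probe` knob would be HNSW `ef_search` or IVF `nprobe`; sweeping it while timing queries gives the recall/latency curve.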