feat(engineering-team): add snowflake-development skill

Snowflake SQL, data pipelines (Dynamic Tables, Streams+Tasks), Cortex AI,
Snowpark Python, dbt integration. Includes 3 practical workflows, 9
anti-patterns, cross-references, and troubleshooting guide.

- SKILL.md: 294 lines (colon-prefix rule, MERGE, DTs, Cortex AI, Snowpark)
- Script: snowflake_query_helper.py (MERGE, DT, RBAC generators)
- References: 3 files (SQL patterns, Cortex AI/agents, troubleshooting)

Based on PR #416 by James Cha-Earley — enhanced with practical workflows,
anti-patterns section, cross-references, and normalized frontmatter.

Co-Authored-By: James Cha-Earley <jamescha-earley@users.noreply.github.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Reza Rezvani
Date: 2026-03-26 09:38:57 +01:00
Parent: c6206efc49
Commit: 0e97512a42
8 changed files with 1557 additions and 2 deletions

# Cortex AI and Agents Reference
Complete reference for Snowflake Cortex AI functions, Cortex Agents, Cortex Search, and Snowpark Python patterns.
## Table of Contents
1. [Cortex AI Functions](#cortex-ai-functions)
2. [Cortex Agents](#cortex-agents)
3. [Cortex Search](#cortex-search)
4. [Snowpark Python](#snowpark-python)
---
## Cortex AI Functions
### Complete Function Reference
| Function | Signature | Returns |
|----------|-----------|---------|
| `AI_COMPLETE` | `AI_COMPLETE(model, prompt)` or `AI_COMPLETE(model, conversation, options)` | STRING or OBJECT |
| `AI_CLASSIFY` | `AI_CLASSIFY(input, categories)` | OBJECT with `labels` array |
| `AI_EXTRACT` | `AI_EXTRACT(input, fields)` | OBJECT with extracted fields |
| `AI_FILTER` | `AI_FILTER(input, condition)` | BOOLEAN |
| `AI_SENTIMENT` | `AI_SENTIMENT(text)` | FLOAT (-1 to 1) |
| `AI_SUMMARIZE` | `AI_SUMMARIZE(text)` | STRING |
| `AI_TRANSLATE` | `AI_TRANSLATE(text, source_lang, target_lang)` | STRING |
| `AI_PARSE_DOCUMENT` | `AI_PARSE_DOCUMENT(file, options)` | OBJECT |
| `AI_REDACT` | `AI_REDACT(text)` | STRING |
| `AI_EMBED` | `AI_EMBED(model, text)` | ARRAY (vector) |
| `AI_AGG` | `AI_AGG(column, instruction)` | STRING |
### Deprecated Function Mapping
| Old Name (Do NOT Use) | New Name |
|-----------------------|----------|
| `COMPLETE` | `AI_COMPLETE` |
| `CLASSIFY_TEXT` | `AI_CLASSIFY` |
| `EXTRACT_ANSWER` | `AI_EXTRACT` |
| `SUMMARIZE` | `AI_SUMMARIZE` |
| `TRANSLATE` | `AI_TRANSLATE` |
| `SENTIMENT` | `AI_SENTIMENT` |
| `EMBED_TEXT_768` | `AI_EMBED` |
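When migrating old SQL, the mapping above can drive a mechanical rewrite. A naive sketch (a hypothetical helper, not part of this skill's scripts; it only handles fully-qualified `SNOWFLAKE.CORTEX.*` calls, so review the output by hand):

```python
import re

# Deprecated Cortex function names and their AI_* replacements
# (same mapping as the table above).
DEPRECATED_TO_NEW = {
    "COMPLETE": "AI_COMPLETE",
    "CLASSIFY_TEXT": "AI_CLASSIFY",
    "EXTRACT_ANSWER": "AI_EXTRACT",
    "SUMMARIZE": "AI_SUMMARIZE",
    "TRANSLATE": "AI_TRANSLATE",
    "SENTIMENT": "AI_SENTIMENT",
    "EMBED_TEXT_768": "AI_EMBED",
}

def rename_deprecated(sql: str) -> str:
    """Rewrite fully-qualified deprecated SNOWFLAKE.CORTEX.* calls to AI_* names."""
    for old, new in DEPRECATED_TO_NEW.items():
        # Match only the qualified form to avoid clobbering unrelated identifiers.
        sql = re.sub(rf"SNOWFLAKE\.CORTEX\.{old}\s*\(", f"{new}(", sql, flags=re.IGNORECASE)
    return sql
```

Note that argument shapes changed for some functions (e.g. `EXTRACT_ANSWER` vs `AI_EXTRACT`), so a rename alone is not always sufficient.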
### AI_COMPLETE Patterns
**Simple completion:**
```sql
SELECT AI_COMPLETE('claude-4-sonnet', 'Summarize this text: ' || article_text) AS summary
FROM articles;
```
**With system prompt (conversation format):**
```sql
SELECT AI_COMPLETE(
'claude-4-sonnet',
[
{'role': 'system', 'content': 'You are a data quality analyst. Be concise.'},
{'role': 'user', 'content': 'Analyze this record: ' || record::STRING}
]
) AS analysis
FROM flagged_records;
```
**With document input (TO_FILE):**
```sql
SELECT AI_COMPLETE(
'claude-4-sonnet',
'Extract the invoice total from this document',
TO_FILE('@docs_stage', 'invoice.pdf')
) AS invoice_total;
```
### AI_CLASSIFY Patterns
Use AI_CLASSIFY instead of AI_COMPLETE for classification tasks -- it is purpose-built, cheaper, and returns structured output.
```sql
SELECT
ticket_text,
AI_CLASSIFY(ticket_text, ['billing', 'technical', 'account', 'feature_request']):labels[0]::VARCHAR AS category
FROM support_tickets;
```
### AI_EXTRACT Patterns
```sql
SELECT
AI_EXTRACT(email_body, ['sender_name', 'action_requested', 'deadline'])::OBJECT AS extracted
FROM emails;
```
### Cost Awareness
Estimate token costs before running AI functions on large tables:
```sql
-- Count tokens first
SELECT
COUNT(*) AS row_count,
SUM(AI_COUNT_TOKENS('claude-4-sonnet', text_column)) AS total_tokens
FROM my_table;
-- Process a sample first
SELECT AI_COMPLETE('claude-4-sonnet', text_column) FROM my_table SAMPLE (100 ROWS);
```
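Token counts translate to credits linearly, so turning a token count into a budget figure is simple arithmetic. The rate below is a placeholder, not a real price -- look up the current credits-per-million-tokens for your model and region in Snowflake's Cortex pricing table:

```python
# Back-of-envelope credit estimate from a token count.
# credits_per_million is a PLACEHOLDER rate -- substitute the published
# rate for your model/region.
def estimate_credits(total_tokens: int, credits_per_million: float) -> float:
    return total_tokens / 1_000_000 * credits_per_million

# e.g. 250M tokens at a hypothetical 1.3 credits per million tokens:
budget = estimate_credits(250_000_000, 1.3)
```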
---
## Cortex Agents
### Agent Spec Structure
```sql
CREATE OR REPLACE AGENT my_db.my_schema.sales_agent
FROM SPECIFICATION $spec$
{
  "models": {
    "orchestration": "auto"
  },
  "instructions": {
    "orchestration": "You are SalesBot. Help users query sales data.",
    "response": "Be concise. Use tables for numeric data."
  },
  "tools": [
    {
      "tool_spec": {
        "type": "cortex_analyst_text_to_sql",
        "name": "SalesQuery",
        "description": "Query sales metrics including revenue, orders, and customer data. Use for questions about sales performance, trends, and comparisons."
      }
    },
    {
      "tool_spec": {
        "type": "cortex_search",
        "name": "PolicySearch",
        "description": "Search company sales policies and procedures."
      }
    }
  ],
  "tool_resources": {
    "SalesQuery": {
      "semantic_model_file": "@my_db.my_schema.models/sales_model.yaml"
    },
    "PolicySearch": {
      "cortex_search_service": "my_db.my_schema.policy_search_service"
    }
  }
}
$spec$;
```
### Agent Rules
- **Delimiter**: Use `$spec$` not `$$` to avoid conflicts with SQL dollar-quoting.
- **models**: Must be an object (`{"orchestration": "auto"}`), not an array.
- **tool_resources**: A separate top-level key, not nested inside individual tool entries.
- **Empty values in edit specs**: Do NOT include `null` or empty string values when editing -- they clear existing values.
- **Tool descriptions**: The single biggest quality factor. Be specific about what data each tool accesses and what questions it answers.
- **Testing**: Never modify production agents directly. Clone first, test, then swap.
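The structural rules above can be checked mechanically before a spec ever reaches `CREATE AGENT`. A minimal linter sketch (a hypothetical helper, not shipped with this skill), using only the Python standard library:

```python
import json

# Checks three of the rules above: 'models' must be an object,
# 'tool_resources' must be top-level (not nested inside tools),
# and every named tool should have a matching tool_resources entry.
def lint_agent_spec(spec_text: str) -> list[str]:
    try:
        spec = json.loads(spec_text)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc}"]
    problems = []
    if not isinstance(spec.get("models"), dict):
        problems.append("'models' must be an object like {\"orchestration\": \"auto\"}")
    for tool in spec.get("tools", []):
        if "tool_resources" in tool:
            problems.append("'tool_resources' belongs at the top level, not inside a tool")
    tool_names = {t["tool_spec"]["name"] for t in spec.get("tools", []) if "tool_spec" in t}
    missing = tool_names - set(spec.get("tool_resources", {}))
    problems.extend(f"tool '{name}' has no tool_resources entry" for name in sorted(missing))
    return problems
```

Some tool types legitimately need no resource entry, so treat the missing-resource check as a warning rather than a hard failure.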
### Calling an Agent
```sql
SELECT SNOWFLAKE.CORTEX.AGENT(
'my_db.my_schema.sales_agent',
'What was total revenue last quarter?'
);
```
---
## Cortex Search
### Creating a Search Service
```sql
CREATE OR REPLACE CORTEX SEARCH SERVICE my_db.my_schema.docs_search
ON text_column
ATTRIBUTES category, department
WAREHOUSE = search_wh
TARGET_LAG = '1 hour'
AS (
SELECT text_column, category, department, doc_id
FROM documents
);
```
### Querying a Search Service
```sql
SELECT PARSE_JSON(
SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
'my_db.my_schema.docs_search',
'{
"query": "return policy for electronics",
"columns": ["text_column", "category"],
"filter": {"@eq": {"department": "retail"}},
"limit": 5
}'
)
) AS results;
```
---
## Snowpark Python
### Session Setup
```python
from snowflake.snowpark import Session
import os

session = Session.builder.configs({
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "role": "my_role",
    "warehouse": "my_wh",
    "database": "my_db",
    "schema": "my_schema",
}).create()
```
### DataFrame Operations
```python
from snowflake.snowpark import functions as F

# Lazy operations -- nothing executes until collect()/show()
df = session.table("events")
result = (
    df.filter(df["event_type"] == "purchase")
      .group_by("user_id")
      .agg(F.sum("amount").alias("total_spent"))
      .sort(F.col("total_spent").desc())
)
result.show()  # Execution happens here
```
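As a loose analogy (plain Python, no Snowpark required), generator pipelines behave the same way: chaining builds a description of the work, and nothing runs until the result is consumed:

```python
# Lazy-evaluation analogue: like a Snowpark DataFrame chain, the generator
# expression below does no work until sum() consumes it.
events = [
    {"type": "purchase", "amount": 10},
    {"type": "view", "amount": 0},
    {"type": "purchase", "amount": 5},
]
pipeline = (e["amount"] for e in events if e["type"] == "purchase")  # no work yet
total = sum(pipeline)  # execution happens here
```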
### Vectorized UDFs (10-100x Faster)
```python
from snowflake.snowpark.functions import pandas_udf
from snowflake.snowpark.types import StringType, PandasSeriesType
import pandas as pd

@pandas_udf(
    name="normalize_email",
    is_permanent=True,
    stage_location="@udf_stage",
    replace=True,
    return_type=PandasSeriesType(StringType()),
    input_types=[PandasSeriesType(StringType())],
    packages=["pandas"],
)
def normalize_email(emails: pd.Series) -> pd.Series:
    return emails.str.lower().str.strip()
```
### Stored Procedures in Python
```python
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F

def process_batch(session: Session, batch_date: str) -> str:
    df = session.table("raw_events").filter(F.col("event_date") == batch_date)
    df.write.mode("overwrite").save_as_table("processed_events")
    return f"Processed {df.count()} rows for {batch_date}"

session.sproc.register(
    func=process_batch,
    name="process_batch",
    is_permanent=True,
    stage_location="@sproc_stage",
    replace=True,
    packages=["snowflake-snowpark-python"],
)
```
### Key Rules
- Never hardcode credentials. Use environment variables, key pair auth, or Snowflake's built-in connection config.
- DataFrames are lazy. Calling `.collect()` pulls all data to the client -- avoid on large datasets.
- Use vectorized UDFs over scalar UDFs for batch processing (10-100x performance improvement).
- Close sessions when done: `session.close()`.

# Snowflake SQL and Pipelines Reference
Detailed patterns and anti-patterns for Snowflake SQL development and data pipeline design.
## Table of Contents
1. [SQL Patterns](#sql-patterns)
2. [Dynamic Table Deep Dive](#dynamic-table-deep-dive)
3. [Streams and Tasks Patterns](#streams-and-tasks-patterns)
4. [Snowpipe](#snowpipe)
5. [Anti-Patterns](#anti-patterns)
---
## SQL Patterns
### CTE-Based Transformations
```sql
WITH raw AS (
SELECT * FROM raw_events WHERE event_date = CURRENT_DATE()
),
cleaned AS (
SELECT
event_id,
TRIM(LOWER(event_type)) AS event_type,
user_id,
event_timestamp,
src:metadata::VARIANT AS metadata
FROM raw
WHERE event_type IS NOT NULL
),
enriched AS (
SELECT
c.*,
u.name AS user_name,
u.segment
FROM cleaned c
JOIN dim_users u ON c.user_id = u.user_id
)
SELECT * FROM enriched;
```
### MERGE with Multiple Match Conditions
```sql
MERGE INTO dim_customers t
USING (
SELECT customer_id, name, email, updated_at,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) AS rn
FROM staging_customers
) s
ON t.customer_id = s.customer_id AND s.rn = 1
WHEN MATCHED AND s.updated_at > t.updated_at THEN
UPDATE SET t.name = s.name, t.email = s.email, t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
INSERT (customer_id, name, email, updated_at)
VALUES (s.customer_id, s.name, s.email, s.updated_at);
```
### Semi-Structured Data Patterns
**Flatten nested arrays:**
```sql
SELECT
o.order_id,
f.value:product_id::STRING AS product_id,
f.value:quantity::NUMBER AS quantity,
f.value:price::NUMBER(10,2) AS price
FROM orders o,
LATERAL FLATTEN(input => o.line_items) f;
```
**Nested flatten (array of arrays):**
```sql
SELECT
f1.value:category::STRING AS category,
f2.value:tag::STRING AS tag
FROM catalog,
LATERAL FLATTEN(input => data:categories) f1,
LATERAL FLATTEN(input => f1.value:tags) f2;
```
**OBJECT_CONSTRUCT for building JSON:**
```sql
SELECT OBJECT_CONSTRUCT(
    'id', c.customer_id,
    'name', c.name,
    'orders', ARRAY_AGG(OBJECT_CONSTRUCT('order_id', o.order_id, 'total', o.total))
) AS customer_json
FROM customers c JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name;
```
### Window Functions
```sql
-- Running total with partitions
SELECT
department,
employee,
salary,
SUM(salary) OVER (PARTITION BY department ORDER BY hire_date) AS dept_running_total
FROM employees;
-- Detect gaps in sequences (use QUALIFY to filter on window-function results)
SELECT id, seq_num,
    seq_num - LAG(seq_num) OVER (ORDER BY seq_num) AS gap
FROM records
QUALIFY gap > 1;
```
### Time Travel
```sql
-- Query data as of a specific timestamp
SELECT * FROM my_table AT(TIMESTAMP => '2026-03-20 10:00:00'::TIMESTAMP);
-- Query data before a specific statement
SELECT * FROM my_table BEFORE(STATEMENT => '<query_id>');
-- Restore a dropped table
UNDROP TABLE accidentally_dropped_table;
```
Default retention: 1 day (standard edition), up to 90 days (enterprise+). Set per table: `DATA_RETENTION_TIME_IN_DAYS = 7`.
---
## Dynamic Table Deep Dive
### TARGET_LAG Strategy
Design your DT DAG with progressive lag -- tighter upstream, looser downstream:
```
raw_events (base table)
|
v
cleaned_events (DT, TARGET_LAG = '1 minute')
|
v
enriched_events (DT, TARGET_LAG = '5 minutes')
|
v
daily_aggregates (DT, TARGET_LAG = '1 hour')
```
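The progressive-lag rule is easy to verify mechanically: every DT's TARGET_LAG should be at least as loose as its upstream's. A small sketch (table names and lags mirror the diagram above; in practice you would pull them from `INFORMATION_SCHEMA.DYNAMIC_TABLES()`):

```python
# TARGET_LAG per DT, in minutes, and each DT's upstream dependency.
LAG_MINUTES = {"cleaned_events": 1, "enriched_events": 5, "daily_aggregates": 60}
UPSTREAM = {"enriched_events": "cleaned_events", "daily_aggregates": "enriched_events"}

def lag_violations(lags: dict[str, int], upstream: dict[str, str]) -> list[str]:
    """Return DTs whose TARGET_LAG is tighter than their upstream's."""
    return [dt for dt, up in upstream.items() if lags[dt] < lags[up]]
```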
### Refresh Mode Rules
| Refresh Mode | Condition |
|-------------|-----------|
| Incremental | DTs with simple SELECT, JOIN, WHERE, GROUP BY, UNION ALL on change-tracked sources |
| Full | DTs using non-deterministic functions, LIMIT, or depending on full-refresh DTs |
**Check refresh mode:**
```sql
SELECT name, refresh_mode, refresh_mode_reason
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE name = 'MY_DT';
```
### DT Debugging Queries
```sql
-- Check DT health and lag
SELECT name, scheduling_state, last_completed_refresh_state,
data_timestamp, DATEDIFF('minute', data_timestamp, CURRENT_TIMESTAMP()) AS lag_minutes
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES());
-- Check refresh history for failures
SELECT name, state, state_message, refresh_trigger
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE state = 'FAILED'
ORDER BY refresh_end_time DESC
LIMIT 10;
-- Examine graph dependencies
SELECT name, qualified_name, refresh_mode
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_GRAPH_HISTORY());
```
### DT Constraints
- No views between two DTs in the DAG.
- `SELECT *` breaks on upstream schema changes.
- Non-deterministic functions (e.g., `CURRENT_TIMESTAMP()`) force full-refresh mode -- use a timestamp column from the source instead.
- Change tracking must be enabled on source tables: `ALTER TABLE src SET CHANGE_TRACKING = TRUE;`
---
## Streams and Tasks Patterns
### Task Trees (Parent-Child)
```sql
CREATE OR REPLACE TASK parent_task
WAREHOUSE = transform_wh
SCHEDULE = 'USING CRON 0 */1 * * * America/Los_Angeles'
AS CALL process_stage_1();
CREATE OR REPLACE TASK child_task
WAREHOUSE = transform_wh
AFTER parent_task
AS CALL process_stage_2();
-- Resume in reverse order: children first, then parent
ALTER TASK child_task RESUME;
ALTER TASK parent_task RESUME;
```
### Stream Types
| Stream Type | Use Case |
|------------|----------|
| Standard (default) | Track all DML changes (INSERT, UPDATE, DELETE) |
| Append-only | Only track INSERTs. More efficient for insert-heavy tables. |
| Insert-only (external tables) | Track new files loaded via external tables. |
```sql
-- Append-only stream for event log tables
CREATE STREAM event_stream ON TABLE events APPEND_ONLY = TRUE;
```
### Serverless Tasks
```sql
-- No warehouse needed. Snowflake manages compute automatically.
CREATE OR REPLACE TASK lightweight_task
USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
SCHEDULE = '5 MINUTE'
AS INSERT INTO audit_log SELECT CURRENT_TIMESTAMP(), 'heartbeat';
```
---
## Snowpipe
### Auto-Ingest Setup (S3)
```sql
CREATE OR REPLACE PIPE my_pipe
AUTO_INGEST = TRUE
AS COPY INTO raw_table
FROM @my_s3_stage
FILE_FORMAT = (TYPE = 'JSON', STRIP_NULL_VALUES = TRUE);
```
Configure the S3 event notification to point to the pipe's SQS queue:
```sql
SHOW PIPES LIKE 'my_pipe';
-- Use the notification_channel value for S3 event config
```
### Snowpipe Monitoring
```sql
-- Check pipe status
SELECT SYSTEM$PIPE_STATUS('my_pipe');
-- Recent load history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'raw_table',
START_TIME => DATEADD(HOUR, -24, CURRENT_TIMESTAMP())
));
```
---
## Anti-Patterns
| Anti-Pattern | Why It's Bad | Fix |
|-------------|-------------|-----|
| `SELECT *` in production | Scans all columns, breaks on schema changes | Explicit column list |
| Double-quoted identifiers | Creates case-sensitive names requiring constant quoting | Use `snake_case` without quotes |
| `ORDER BY` without `LIMIT` | Sorts entire result set for no reason | Add `LIMIT` or remove `ORDER BY` |
| Single warehouse for everything | Workloads compete for resources | Separate warehouses per workload |
| `FLOAT` for money | Rounding errors | `NUMBER(19,4)` or integer cents |
| Missing `RESUME` after task creation | Task never runs | Always `ALTER TASK ... RESUME` |
| `CURRENT_TIMESTAMP()` in DT query | Forces full refresh mode | Use a timestamp column from the source |
| Scanning VARIANT without casting | "Numeric value not recognized" errors | Always cast: `col:field::TYPE` |
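The FLOAT-for-money row is worth seeing concretely: binary floats cannot represent most decimal fractions exactly, while integer cents and fixed-point decimals can:

```python
from decimal import Decimal

# FLOAT: 0.1 and 0.2 are inexact in binary, so the sum is not exactly 0.3.
float_total = 0.1 + 0.2

# Integer cents: exact arithmetic, no surprises.
cents_total = 10 + 20  # 30 cents

# Fixed-point decimal (the analogue of NUMBER(19,4)): also exact.
decimal_total = Decimal("0.10") + Decimal("0.20")
```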

# Snowflake Troubleshooting Reference
Common errors, debugging queries, and resolution patterns for Snowflake development.
## Table of Contents
1. [Error Reference](#error-reference)
2. [Debugging Queries](#debugging-queries)
3. [Performance Diagnostics](#performance-diagnostics)
---
## Error Reference
### SQL Errors
| Error | Cause | Fix |
|-------|-------|-----|
| "Object 'X' does not exist or not authorized" | Wrong database/schema context, missing grants, or typo | Fully qualify: `db.schema.table`. Check `SHOW GRANTS ON TABLE`. |
| "Invalid identifier 'VAR'" in procedure | Missing colon prefix on variable in SQL procedure | Use `:var_name` inside SELECT/INSERT/UPDATE/DELETE/MERGE |
| "Numeric value 'X' is not recognized" | VARIANT field accessed without type cast | Always cast: `src:field::NUMBER(10,2)` |
| "SQL compilation error: ambiguous column name" | Same column name in multiple joined tables | Use table aliases: `t.id`, `s.id` |
| "Number of columns in insert does not match" | INSERT column count mismatch with VALUES | Verify column list matches value list exactly |
| "Division by zero" | Dividing by a column that contains 0 | Use `NULLIF(divisor, 0)` or `IFF(divisor = 0, NULL, ...)` |
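The `NULLIF(divisor, 0)` guard in the last row works because NULLIF turns a zero divisor into NULL, and NULL then propagates through the division instead of raising an error. A Python analogue (with `None` standing in for NULL):

```python
def nullif(a, b):
    """SQL NULLIF semantics: None when a equals b, otherwise a."""
    return None if a == b else a

def safe_divide(numerator, denominator):
    """Mirrors numerator / NULLIF(denominator, 0): None instead of an error."""
    d = nullif(denominator, 0)
    return None if d is None else numerator / d
```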
### Pipeline Errors
| Error | Cause | Fix |
|-------|-------|-----|
| Task not running | Created but not resumed | `ALTER TASK task_name RESUME;` |
| DT stuck in FAILED state | Query error or upstream dependency issue | Check `DYNAMIC_TABLE_REFRESH_HISTORY()` for error messages |
| DT shows full refresh instead of incremental | Non-deterministic function or unsupported pattern | Check `refresh_mode_reason` in `INFORMATION_SCHEMA.DYNAMIC_TABLES()` |
| Stream shows no data | Stream was consumed or table was recreated | Verify stream is on the correct table, check `STALE_AFTER` |
| Snowpipe not loading files | SQS notification misconfigured or file format mismatch | Check `SYSTEM$PIPE_STATUS()`, verify notification channel |
| "UPSTREAM_FAILED" on DT | A DT dependency upstream has a refresh failure | Fix the upstream DT first, then downstream will recover |
### Cortex AI Errors
| Error | Cause | Fix |
|-------|-------|-----|
| "Function X does not exist" | Using deprecated function name | Use new `AI_*` names (e.g., `AI_CLASSIFY` not `CLASSIFY_TEXT`) |
| TO_FILE error | Single argument instead of two | `TO_FILE('@stage', 'file.pdf')` -- two separate arguments |
| Agent returns empty or wrong results | Poor tool descriptions or wrong semantic model | Improve tool descriptions, verify semantic model covers the question |
| "Invalid specification" on agent | JSON structure error in spec | Check: `models` is object not array, `tool_resources` is top-level, no trailing commas |
---
## Debugging Queries
### Query History
```sql
-- Find slow queries in the last 24 hours
SELECT query_id, query_text, execution_status,
total_elapsed_time / 1000 AS elapsed_sec,
bytes_scanned / (1024*1024*1024) AS gb_scanned,
rows_produced, warehouse_name
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(
END_TIME_RANGE_START => DATEADD(HOUR, -24, CURRENT_TIMESTAMP()),
RESULT_LIMIT => 50
))
WHERE total_elapsed_time > 30000 -- > 30 seconds
ORDER BY total_elapsed_time DESC;
```
### Dynamic Table Health
```sql
-- Overall DT status
SELECT name, scheduling_state, last_completed_refresh_state,
data_timestamp,
DATEDIFF('minute', data_timestamp, CURRENT_TIMESTAMP()) AS lag_minutes
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
ORDER BY lag_minutes DESC;
-- Recent failures
SELECT name, state, state_message, refresh_trigger,
DATEDIFF('second', refresh_start_time, refresh_end_time) AS duration_sec
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE state = 'FAILED'
ORDER BY refresh_end_time DESC
LIMIT 20;
```
### Stream Status
```sql
-- Check stream freshness
SHOW STREAMS;
-- Check if stream has data
SELECT SYSTEM$STREAM_HAS_DATA('my_stream');
```
### Task Monitoring
```sql
-- Check task run history
SELECT name, state, error_message,
scheduled_time, completed_time,
DATEDIFF('second', scheduled_time, completed_time) AS duration_sec
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE name = 'MY_TASK'
ORDER BY scheduled_time DESC
LIMIT 20;
```
### Grants Debugging
```sql
-- What grants does a role have?
SHOW GRANTS TO ROLE my_role;
-- What grants exist on an object?
SHOW GRANTS ON TABLE my_db.my_schema.my_table;
-- Who has ACCOUNTADMIN?
SHOW GRANTS OF ROLE ACCOUNTADMIN;
```
---
## Performance Diagnostics
### Warehouse Utilization
```sql
-- Warehouse load over time
SELECT start_time, warehouse_name,
avg_running, avg_queued_load, avg_blocked
FROM TABLE(INFORMATION_SCHEMA.WAREHOUSE_LOAD_HISTORY(
DATE_RANGE_START => DATEADD(HOUR, -24, CURRENT_TIMESTAMP())
))
WHERE warehouse_name = 'MY_WH'
ORDER BY start_time DESC;
```
### Clustering Health
```sql
-- Check clustering depth (lower is better)
SELECT SYSTEM$CLUSTERING_INFORMATION('my_table', '(date_col, region)');
```
### Storage Costs
```sql
-- Table storage usage
SELECT table_name, active_bytes / (1024*1024*1024) AS active_gb,
time_travel_bytes / (1024*1024*1024) AS time_travel_gb,
failsafe_bytes / (1024*1024*1024) AS failsafe_gb
FROM INFORMATION_SCHEMA.TABLE_STORAGE_METRICS
WHERE table_schema = 'MY_SCHEMA'
ORDER BY active_bytes DESC;
```