Snowflake SQL, data pipelines (Dynamic Tables, Streams+Tasks), Cortex AI, Snowpark Python, dbt integration. Includes 3 practical workflows, 9 anti-patterns, cross-references, and troubleshooting guide. - SKILL.md: 294 lines (colon-prefix rule, MERGE, DTs, Cortex AI, Snowpark) - Script: snowflake_query_helper.py (MERGE, DT, RBAC generators) - References: 3 files (SQL patterns, Cortex AI/agents, troubleshooting) Based on PR #416 by James Cha-Earley — enhanced with practical workflows, anti-patterns section, cross-references, and normalized frontmatter. Co-Authored-By: James Cha-Earley <jamescha-earley@users.noreply.github.com> Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
282 lines
7.4 KiB
Markdown
# Snowflake SQL and Pipelines Reference

Detailed patterns and anti-patterns for Snowflake SQL development and data pipeline design.

## Table of Contents

1. [SQL Patterns](#sql-patterns)
2. [Dynamic Table Deep Dive](#dynamic-table-deep-dive)
3. [Streams and Tasks Patterns](#streams-and-tasks-patterns)
4. [Snowpipe](#snowpipe)
5. [Anti-Patterns](#anti-patterns)

---

## SQL Patterns

### CTE-Based Transformations

```sql
WITH raw AS (
    SELECT * FROM raw_events WHERE event_date = CURRENT_DATE()
),
cleaned AS (
    SELECT
        event_id,
        TRIM(LOWER(event_type)) AS event_type,
        user_id,
        event_timestamp,
        src:metadata::VARIANT AS metadata
    FROM raw
    WHERE event_type IS NOT NULL
),
enriched AS (
    SELECT
        c.*,
        u.name AS user_name,
        u.segment
    FROM cleaned c
    JOIN dim_users u ON c.user_id = u.user_id
)
SELECT * FROM enriched;
```

### MERGE with Multiple Match Conditions

```sql
MERGE INTO dim_customers t
USING (
    -- Deduplicate the source first: keep only the latest staging row per customer.
    -- Filtering rn = 1 in the ON clause instead would send the older duplicates
    -- to WHEN NOT MATCHED, where they would be inserted.
    SELECT customer_id, name, email, updated_at
    FROM staging_customers
    QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) = 1
) s
ON t.customer_id = s.customer_id
WHEN MATCHED AND s.updated_at > t.updated_at THEN
    UPDATE SET t.name = s.name, t.email = s.email, t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
    INSERT (customer_id, name, email, updated_at)
    VALUES (s.customer_id, s.name, s.email, s.updated_at);
```

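A MERGE can carry several `WHEN MATCHED` clauses, each with its own `AND` condition; a matched row takes the first clause whose condition is true. A sketch of a three-way variant, assuming a hypothetical non-null boolean `is_deleted` column in `staging_customers` that flags source-side deletions:

```sql
MERGE INTO dim_customers t
USING (
    SELECT customer_id, name, email, updated_at, is_deleted  -- is_deleted is hypothetical
    FROM staging_customers
    QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) = 1
) s
ON t.customer_id = s.customer_id
-- Source says the customer is gone: remove it from the dimension
WHEN MATCHED AND s.is_deleted THEN
    DELETE
-- Source row is newer: overwrite
WHEN MATCHED AND s.updated_at > t.updated_at THEN
    UPDATE SET t.name = s.name, t.email = s.email, t.updated_at = s.updated_at
-- New customer that has not already been deleted upstream
WHEN NOT MATCHED AND NOT s.is_deleted THEN
    INSERT (customer_id, name, email, updated_at)
    VALUES (s.customer_id, s.name, s.email, s.updated_at);
```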
### Semi-Structured Data Patterns

**Flatten nested arrays:**

```sql
SELECT
    o.order_id,
    f.value:product_id::STRING AS product_id,
    f.value:quantity::NUMBER AS quantity,
    f.value:price::NUMBER(10,2) AS price
FROM orders o,
    LATERAL FLATTEN(input => o.line_items) f;
```

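By default `FLATTEN` produces no rows when its input is NULL or an empty array, so the join above silently drops orders without line items. A sketch that keeps them, using `FLATTEN`'s `OUTER => TRUE` argument and its `INDEX` output column; the `orders.line_items` layout is the same one assumed above:

```sql
SELECT
    o.order_id,
    f.index AS line_number,                   -- 0-based array position; NULL for the padded row
    f.value:product_id::STRING AS product_id  -- NULL when the order has no line items
FROM orders o,
    LATERAL FLATTEN(input => o.line_items, OUTER => TRUE) f;
```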
**Nested flatten (array of arrays):**

```sql
SELECT
    f1.value:category::STRING AS category,
    f2.value:tag::STRING AS tag
FROM catalog,
    LATERAL FLATTEN(input => data:categories) f1,
    LATERAL FLATTEN(input => f1.value:tags) f2;
```

**OBJECT_CONSTRUCT for building JSON:**

```sql
-- Qualify every column: customer_id exists in both tables, so an unqualified
-- reference is ambiguous and the query fails.
SELECT OBJECT_CONSTRUCT(
    'id', c.customer_id,
    'name', c.name,
    'orders', ARRAY_AGG(OBJECT_CONSTRUCT('order_id', o.order_id, 'total', o.total))
) AS customer_json
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name;
```

### Window Functions

```sql
-- Running total with partitions
SELECT
    department,
    employee,
    salary,
    SUM(salary) OVER (PARTITION BY department ORDER BY hire_date) AS dept_running_total
FROM employees;

-- Detect gaps in sequences.
-- Window results can't be filtered in WHERE or HAVING; QUALIFY filters after they are computed.
SELECT id, seq_num,
       seq_num - LAG(seq_num) OVER (ORDER BY seq_num) AS gap
FROM records
QUALIFY gap > 1;
```

### Time Travel

```sql
-- Query data as of a specific timestamp
SELECT * FROM my_table AT(TIMESTAMP => '2026-03-20 10:00:00'::TIMESTAMP);

-- Query data as it was immediately before a specific statement ran
SELECT * FROM my_table BEFORE(STATEMENT => '<query_id>');

-- Restore a dropped table (only possible within its retention period)
UNDROP TABLE accidentally_dropped_table;
```

Default retention is 1 day on every edition. Standard Edition allows at most 1 day; Enterprise Edition and higher allow up to 90 days for permanent tables. Set it per table with the `DATA_RETENTION_TIME_IN_DAYS` parameter, e.g. `DATA_RETENTION_TIME_IN_DAYS = 7`.

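A minimal sketch of changing and inspecting retention, and of recovering past data without touching the live table. `my_table` is the placeholder used above; `my_table_restored` is a hypothetical name:

```sql
-- Raise retention to 7 days (values above 1 require Enterprise Edition or higher)
ALTER TABLE my_table SET DATA_RETENTION_TIME_IN_DAYS = 7;

-- Check the effective value and where it was set
SHOW PARAMETERS LIKE 'DATA_RETENTION_TIME_IN_DAYS' IN TABLE my_table;

-- Zero-copy clone of the table as it was one hour (3600 seconds) ago
CREATE TABLE my_table_restored CLONE my_table AT(OFFSET => -3600);
```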
---

## Dynamic Table Deep Dive

### TARGET_LAG Strategy

Design your DT DAG with progressive lag: tighter upstream, looser downstream. A DT's target lag cannot be shorter than the target lag of the DTs it reads from.

```
raw_events (base table)
       |
       v
cleaned_events (DT, TARGET_LAG = '1 minute')
       |
       v
enriched_events (DT, TARGET_LAG = '5 minutes')
       |
       v
daily_aggregates (DT, TARGET_LAG = '1 hour')
```

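The DAG above could be declared roughly as follows. This is a sketch: the column names, the `dim_users` join (borrowed from the CTE example), and the `transform_wh` warehouse are assumptions to adapt. Each DT selects from the one above it, and Snowflake infers the dependency graph from those queries.

```sql
CREATE OR REPLACE DYNAMIC TABLE cleaned_events
    TARGET_LAG = '1 minute'
    WAREHOUSE = transform_wh
AS
    SELECT event_id, TRIM(LOWER(event_type)) AS event_type, user_id, event_timestamp
    FROM raw_events
    WHERE event_type IS NOT NULL;

CREATE OR REPLACE DYNAMIC TABLE enriched_events
    TARGET_LAG = '5 minutes'
    WAREHOUSE = transform_wh
AS
    SELECT e.event_id, e.event_type, e.user_id, e.event_timestamp, u.segment
    FROM cleaned_events e
    JOIN dim_users u ON e.user_id = u.user_id;

-- Loosest lag at the bottom of the DAG
CREATE OR REPLACE DYNAMIC TABLE daily_aggregates
    TARGET_LAG = '1 hour'
    WAREHOUSE = transform_wh
AS
    SELECT TO_DATE(event_timestamp) AS event_date, segment, event_type,
           COUNT(*) AS event_count
    FROM enriched_events
    GROUP BY TO_DATE(event_timestamp), segment, event_type;
```

An alternative for intermediate tables such as `enriched_events` is `TARGET_LAG = DOWNSTREAM`, which refreshes them only when a dependent DT needs fresh data.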
### Refresh Mode Rules

| Refresh Mode | Typically Chosen When |
|--------------|-----------------------|
| Incremental | The query uses simple SELECT, JOIN, WHERE, GROUP BY, or UNION ALL over change-tracked sources |
| Full | The query uses non-deterministic functions or LIMIT, or reads from a full-refresh DT |

**Check refresh mode:**

```sql
SELECT name, refresh_mode, refresh_mode_reason
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE name = 'MY_DT';
```

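Instead of discovering the mode after the fact, it can be pinned at creation with the `REFRESH_MODE` property (`AUTO`, `FULL`, or `INCREMENTAL`). A sketch with hypothetical table and column names; with `INCREMENTAL`, creation should fail when the query cannot be refreshed incrementally, which surfaces the problem at deploy time rather than as a silent fallback to full refresh:

```sql
CREATE OR REPLACE DYNAMIC TABLE cleaned_events
    TARGET_LAG = '1 minute'
    WAREHOUSE = transform_wh
    REFRESH_MODE = INCREMENTAL  -- fail fast instead of falling back to FULL
AS
    SELECT event_id, event_type, user_id, event_timestamp
    FROM raw_events
    WHERE event_type IS NOT NULL;
```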
### DT Debugging Queries

```sql
-- Check DT health and lag
SELECT name, scheduling_state, last_completed_refresh_state,
       data_timestamp, DATEDIFF('minute', data_timestamp, CURRENT_TIMESTAMP()) AS lag_minutes
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES());

-- Check refresh history for failures
SELECT name, state, state_message, refresh_trigger
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE state = 'FAILED'
ORDER BY refresh_end_time DESC
LIMIT 10;

-- Examine graph dependencies
SELECT name, qualified_name, refresh_mode
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_GRAPH_HISTORY());
```

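When these queries point to a stuck or failing DT, common next steps are a manual refresh to reproduce the problem on demand, or suspending the DT while the upstream issue is fixed. A sketch, with `my_dt` as a placeholder name:

```sql
-- Trigger a refresh now instead of waiting for the next scheduled one
ALTER DYNAMIC TABLE my_dt REFRESH;

-- Stop scheduled refreshes (and the compute they use) while investigating
ALTER DYNAMIC TABLE my_dt SUSPEND;

-- Re-enable scheduled refreshes once the fix is in
ALTER DYNAMIC TABLE my_dt RESUME;
```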
### DT Constraints

- No views between two DTs in the DAG.
- `SELECT *` in a DT definition breaks on upstream schema changes; list columns explicitly.
- Non-deterministic functions (e.g., `CURRENT_TIMESTAMP()`) prevent incremental refresh and force full refresh mode; use a column from the source instead.
- Change tracking must be enabled on source tables: `ALTER TABLE src SET CHANGE_TRACKING = TRUE;`

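A sketch applying the last two constraints, with hypothetical table and column names. The rolling-window filter that would otherwise need `CURRENT_TIMESTAMP()` inside the DT moves to the query that reads the DT:

```sql
-- Prerequisite: change tracking on the base table
ALTER TABLE raw_events SET CHANGE_TRACKING = TRUE;

-- Derive dates from the source column, not from the refresh time
CREATE OR REPLACE DYNAMIC TABLE events_by_day
    TARGET_LAG = '5 minutes'
    WAREHOUSE = transform_wh
AS
    SELECT event_id, user_id, event_type, event_timestamp,
           TO_DATE(event_timestamp) AS event_date
    FROM raw_events;

-- Apply "last 7 days" at read time, where CURRENT_DATE() does not affect refresh mode
SELECT event_type, COUNT(*) AS events_last_7_days
FROM events_by_day
WHERE event_date >= DATEADD('day', -7, CURRENT_DATE())
GROUP BY event_type;
```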
---

## Streams and Tasks Patterns

### Task Trees (Parent-Child)

```sql
CREATE OR REPLACE TASK parent_task
    WAREHOUSE = transform_wh
    SCHEDULE = 'USING CRON 0 */1 * * * America/Los_Angeles'
    AS CALL process_stage_1();

CREATE OR REPLACE TASK child_task
    WAREHOUSE = transform_wh
    AFTER parent_task
    AS CALL process_stage_2();

-- Resume in reverse order: children first, then parent
ALTER TASK child_task RESUME;
ALTER TASK parent_task RESUME;
```

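For larger trees, resuming each child by hand is error-prone. `SYSTEM$TASK_DEPENDENTS_ENABLE` resumes the dependents of a root task recursively, and `INFORMATION_SCHEMA.TASK_HISTORY` lists recent runs. A sketch using the task names from the example above:

```sql
-- Recursively resume every task that depends on the root task
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('parent_task');

-- Confirm each task's state (started or suspended)
SHOW TASKS LIKE '%task';

-- Failed runs in the last 24 hours
SELECT name, state, error_message, scheduled_time, completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP())))
WHERE state = 'FAILED'
ORDER BY scheduled_time DESC;
```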
### Stream Types

| Stream Type | Use Case |
|-------------|----------|
| Standard (default) | Track all DML changes (INSERT, UPDATE, DELETE) |
| Append-only | Track INSERTs only; more efficient for insert-heavy tables |
| Insert-only (external tables) | Track rows from new files added to external tables; deletes are not recorded |

```sql
-- Append-only stream for event log tables
CREATE STREAM event_stream ON TABLE events APPEND_ONLY = TRUE;
```

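Streams are usually paired with a task that polls them: with a `WHEN SYSTEM$STREAM_HAS_DATA(...)` condition, a run is skipped (and uses no warehouse) when there is nothing new. Reading the stream inside a committed DML statement advances its offset, so each change is processed once. A sketch assuming a hypothetical `events_processed` target table and hypothetical column names on `events`, consuming the `event_stream` defined above:

```sql
CREATE OR REPLACE TASK process_events
    WAREHOUSE = transform_wh
    SCHEDULE = '5 MINUTE'
    -- Skip the run entirely when the stream has no new rows
    WHEN SYSTEM$STREAM_HAS_DATA('event_stream')
AS
    INSERT INTO events_processed (event_id, event_type, user_id, event_timestamp)
    SELECT event_id, event_type, user_id, event_timestamp
    FROM event_stream
    WHERE METADATA$ACTION = 'INSERT';

-- Tasks are created suspended
ALTER TASK process_events RESUME;
```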
### Serverless Tasks

```sql
-- No WAREHOUSE clause, so this is a serverless task: Snowflake manages compute.
CREATE OR REPLACE TASK lightweight_task
    USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
    SCHEDULE = '5 MINUTE'
    AS INSERT INTO audit_log SELECT CURRENT_TIMESTAMP(), 'heartbeat';

-- Like any task, it stays suspended until resumed
ALTER TASK lightweight_task RESUME;
```

---

## Snowpipe

### Auto-Ingest Setup (S3)

```sql
CREATE OR REPLACE PIPE my_pipe
    AUTO_INGEST = TRUE
    AS COPY INTO raw_table
    FROM @my_s3_stage
    FILE_FORMAT = (TYPE = 'JSON', STRIP_NULL_VALUES = TRUE);
```

Configure the S3 bucket's event notification to deliver to the pipe's SQS queue:

```sql
SHOW PIPES LIKE 'my_pipe';
-- The notification_channel column holds the SQS queue ARN to use as the S3 event destination
```

### Snowpipe Monitoring

```sql
-- Check pipe status
SELECT SYSTEM$PIPE_STATUS('my_pipe');

-- Recent load history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'raw_table',
    START_TIME => DATEADD(HOUR, -24, CURRENT_TIMESTAMP())
));
```

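Two operations that often follow a status check: loading files that were staged before the event notification was wired up, and pausing ingestion during maintenance. A sketch using the `my_pipe` name from above:

```sql
-- Queue files already in the stage that have not been loaded yet
-- (only files staged within the previous 7 days are considered)
ALTER PIPE my_pipe REFRESH;

-- Pause ingestion, then resume it
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
```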
---

## Anti-Patterns

| Anti-Pattern | Why It's Bad | Fix |
|--------------|--------------|-----|
| `SELECT *` in production | Scans all columns, breaks on schema changes | Explicit column list |
| Double-quoted identifiers | Creates case-sensitive names requiring constant quoting | Use `snake_case` without quotes |
| `ORDER BY` without `LIMIT` | Sorts the entire result set even when nothing downstream depends on row order | Add `LIMIT` or remove `ORDER BY` |
| Single warehouse for everything | Workloads compete for resources | Separate warehouses per workload |
| `FLOAT` for money | Binary floating-point rounding errors | `NUMBER(19,4)` or integer cents |
| Missing `RESUME` after task creation | Tasks are created suspended, so the task never runs | Always `ALTER TASK ... RESUME` |
| `CURRENT_TIMESTAMP()` in DT query | Forces full refresh mode | Use a timestamp column from the source |
| Using VARIANT fields without casting | "Numeric value not recognized" errors | Always cast: `col:field::TYPE` |

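Two of these fixes are easier to see in SQL. A sketch with hypothetical object names: the first part shows why quoted identifiers become a burden (unquoted names resolve to uppercase, assuming the default `QUOTED_IDENTIFIERS_IGNORE_CASE = FALSE`), and the second gives each workload its own auto-suspending warehouse:

```sql
-- Quoted identifiers are stored exactly as written, including case
CREATE TABLE "OrderEvents" ("EventId" NUMBER);
SELECT EventId FROM OrderEvents;      -- fails: unquoted names resolve to ORDEREVENTS / EVENTID
SELECT "EventId" FROM "OrderEvents";  -- works, but every query must quote

-- Unquoted snake_case avoids the problem
CREATE TABLE order_events (event_id NUMBER);
SELECT event_id FROM order_events;

-- One warehouse per workload so ELT jobs and dashboards don't queue behind each other
CREATE WAREHOUSE IF NOT EXISTS transform_wh
    WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE;
CREATE WAREHOUSE IF NOT EXISTS bi_wh
    WAREHOUSE_SIZE = 'SMALL' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE;
```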