Files
claude-skills-reference/engineering-team/snowflake-development/references/snowflake_sql_and_pipelines.md
Reza Rezvani 0e97512a42 feat(engineering-team): add snowflake-development skill
Snowflake SQL, data pipelines (Dynamic Tables, Streams+Tasks), Cortex AI,
Snowpark Python, dbt integration. Includes 3 practical workflows, 9
anti-patterns, cross-references, and troubleshooting guide.

- SKILL.md: 294 lines (colon-prefix rule, MERGE, DTs, Cortex AI, Snowpark)
- Script: snowflake_query_helper.py (MERGE, DT, RBAC generators)
- References: 3 files (SQL patterns, Cortex AI/agents, troubleshooting)

Based on PR #416 by James Cha-Earley — enhanced with practical workflows,
anti-patterns section, cross-references, and normalized frontmatter.

Co-Authored-By: James Cha-Earley <jamescha-earley@users.noreply.github.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 09:38:57 +01:00

7.4 KiB

Snowflake SQL and Pipelines Reference

Detailed patterns and anti-patterns for Snowflake SQL development and data pipeline design.

Table of Contents

  1. SQL Patterns
  2. Dynamic Table Deep Dive
  3. Streams and Tasks Patterns
  4. Snowpipe
  5. Anti-Patterns

SQL Patterns

CTE-Based Transformations

WITH raw AS (
    SELECT * FROM raw_events WHERE event_date = CURRENT_DATE()
),
cleaned AS (
    SELECT
        event_id,
        TRIM(LOWER(event_type)) AS event_type,
        user_id,
        event_timestamp,
        src:metadata::VARIANT AS metadata
    FROM raw
    WHERE event_type IS NOT NULL
),
enriched AS (
    SELECT
        c.*,
        u.name AS user_name,
        u.segment
    FROM cleaned c
    JOIN dim_users u ON c.user_id = u.user_id
)
SELECT * FROM enriched;

MERGE with Multiple Match Conditions

MERGE INTO dim_customers t
USING (
    SELECT customer_id, name, email, updated_at,
           ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) AS rn
    FROM staging_customers
) s
ON t.customer_id = s.customer_id AND s.rn = 1
WHEN MATCHED AND s.updated_at > t.updated_at THEN
    UPDATE SET t.name = s.name, t.email = s.email, t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
    INSERT (customer_id, name, email, updated_at)
    VALUES (s.customer_id, s.name, s.email, s.updated_at);

Semi-Structured Data Patterns

Flatten nested arrays:

SELECT
    o.order_id,
    f.value:product_id::STRING AS product_id,
    f.value:quantity::NUMBER AS quantity,
    f.value:price::NUMBER(10,2) AS price
FROM orders o,
LATERAL FLATTEN(input => o.line_items) f;

Nested flatten (array of arrays):

SELECT
    f1.value:category::STRING AS category,
    f2.value:tag::STRING AS tag
FROM catalog,
LATERAL FLATTEN(input => data:categories) f1,
LATERAL FLATTEN(input => f1.value:tags) f2;

OBJECT_CONSTRUCT for building JSON:

SELECT OBJECT_CONSTRUCT(
    'id', customer_id,
    'name', name,
    'orders', ARRAY_AGG(OBJECT_CONSTRUCT('order_id', order_id, 'total', total))
) AS customer_json
FROM customers c JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name;

Window Functions

-- Running total with partitions
SELECT
    department,
    employee,
    salary,
    SUM(salary) OVER (PARTITION BY department ORDER BY hire_date) AS dept_running_total
FROM employees;

-- Detect gaps in sequences
SELECT id, seq_num,
    seq_num - LAG(seq_num) OVER (ORDER BY seq_num) AS gap
FROM records
HAVING gap > 1;

Time Travel

-- Query data as of a specific timestamp
SELECT * FROM my_table AT(TIMESTAMP => '2026-03-20 10:00:00'::TIMESTAMP);

-- Query data before a specific statement
SELECT * FROM my_table BEFORE(STATEMENT => '<query_id>');

-- Restore a dropped table
UNDROP TABLE accidentally_dropped_table;

Default retention: 1 day (standard edition), up to 90 days (enterprise+). Set per table: DATA_RETENTION_TIME_IN_DAYS = 7.


Dynamic Table Deep Dive

TARGET_LAG Strategy

Design your DT DAG with progressive lag -- tighter upstream, looser downstream:

raw_events (base table)
    |
    v
cleaned_events (DT, TARGET_LAG = '1 minute')
    |
    v
enriched_events (DT, TARGET_LAG = '5 minutes')
    |
    v
daily_aggregates (DT, TARGET_LAG = '1 hour')

Refresh Mode Rules

Refresh Mode Condition
Incremental DTs with simple SELECT, JOIN, WHERE, GROUP BY, UNION ALL on change-tracked sources
Full DTs using non-deterministic functions, LIMIT, or depending on full-refresh DTs

Check refresh mode:

SELECT name, refresh_mode, refresh_mode_reason
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE name = 'MY_DT';

DT Debugging Queries

-- Check DT health and lag
SELECT name, scheduling_state, last_completed_refresh_state,
       data_timestamp, DATEDIFF('minute', data_timestamp, CURRENT_TIMESTAMP()) AS lag_minutes
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES());

-- Check refresh history for failures
SELECT name, state, state_message, refresh_trigger
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE state = 'FAILED'
ORDER BY refresh_end_time DESC
LIMIT 10;

-- Examine graph dependencies
SELECT name, qualified_name, refresh_mode
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_GRAPH_HISTORY());

DT Constraints

  • No views between two DTs in the DAG.
  • SELECT * breaks on upstream schema changes.
  • Cannot use non-deterministic functions (e.g., CURRENT_TIMESTAMP()) -- use a column from the source instead.
  • Change tracking must be enabled on source tables: ALTER TABLE src SET CHANGE_TRACKING = TRUE;

Streams and Tasks Patterns

Task Trees (Parent-Child)

CREATE OR REPLACE TASK parent_task
    WAREHOUSE = transform_wh
    SCHEDULE = 'USING CRON 0 */1 * * * America/Los_Angeles'
    AS CALL process_stage_1();

CREATE OR REPLACE TASK child_task
    WAREHOUSE = transform_wh
    AFTER parent_task
    AS CALL process_stage_2();

-- Resume in reverse order: children first, then parent
ALTER TASK child_task RESUME;
ALTER TASK parent_task RESUME;

Stream Types

Stream Type Use Case
Standard (default) Track all DML changes (INSERT, UPDATE, DELETE)
Append-only Only track INSERTs. More efficient for insert-heavy tables.
Insert-only (external tables) Track new files loaded via external tables.
-- Append-only stream for event log tables
CREATE STREAM event_stream ON TABLE events APPEND_ONLY = TRUE;

Serverless Tasks

-- No warehouse needed. Snowflake manages compute automatically.
CREATE OR REPLACE TASK lightweight_task
    USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
    SCHEDULE = '5 MINUTE'
    AS INSERT INTO audit_log SELECT CURRENT_TIMESTAMP(), 'heartbeat';

Snowpipe

Auto-Ingest Setup (S3)

CREATE OR REPLACE PIPE my_pipe
    AUTO_INGEST = TRUE
    AS COPY INTO raw_table
    FROM @my_s3_stage
    FILE_FORMAT = (TYPE = 'JSON', STRIP_NULL_VALUES = TRUE);

Configure the S3 event notification to point to the pipe's SQS queue:

SHOW PIPES LIKE 'my_pipe';
-- Use the notification_channel value for S3 event config

Snowpipe Monitoring

-- Check pipe status
SELECT SYSTEM$PIPE_STATUS('my_pipe');

-- Recent load history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'raw_table',
    START_TIME => DATEADD(HOUR, -24, CURRENT_TIMESTAMP())
));

Anti-Patterns

Anti-Pattern Why It's Bad Fix
SELECT * in production Scans all columns, breaks on schema changes Explicit column list
Double-quoted identifiers Creates case-sensitive names requiring constant quoting Use snake_case without quotes
ORDER BY without LIMIT Sorts entire result set for no reason Add LIMIT or remove ORDER BY
Single warehouse for everything Workloads compete for resources Separate warehouses per workload
FLOAT for money Rounding errors NUMBER(19,4) or integer cents
Missing RESUME after task creation Task never runs Always ALTER TASK ... RESUME
CURRENT_TIMESTAMP() in DT query Forces full refresh mode Use a timestamp column from the source
Scanning VARIANT without casting "Numeric value not recognized" errors Always cast: col:field::TYPE