From 0e97512a429bfd968db433ea5306323c187dfffe Mon Sep 17 00:00:00 2001 From: Reza Rezvani Date: Thu, 26 Mar 2026 09:38:57 +0100 Subject: [PATCH] feat(engineering-team): add snowflake-development skill MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Snowflake SQL, data pipelines (Dynamic Tables, Streams+Tasks), Cortex AI, Snowpark Python, dbt integration. Includes 3 practical workflows, 9 anti-patterns, cross-references, and troubleshooting guide. - SKILL.md: 294 lines (colon-prefix rule, MERGE, DTs, Cortex AI, Snowpark) - Script: snowflake_query_helper.py (MERGE, DT, RBAC generators) - References: 3 files (SQL patterns, Cortex AI/agents, troubleshooting) Based on PR #416 by James Cha-Earley — enhanced with practical workflows, anti-patterns section, cross-references, and normalized frontmatter. Co-Authored-By: James Cha-Earley Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/skills/engineering-team/index.md | 10 +- .../engineering-team/snowflake-development.md | 305 ++++++++++++++++++ .../snowflake-development/SKILL.md | 294 +++++++++++++++++ .../references/cortex_ai_and_agents.md | 280 ++++++++++++++++ .../references/snowflake_sql_and_pipelines.md | 281 ++++++++++++++++ .../references/troubleshooting.md | 155 +++++++++ .../scripts/snowflake_query_helper.py | 233 +++++++++++++ mkdocs.yml | 1 + 8 files changed, 1557 insertions(+), 2 deletions(-) create mode 100644 docs/skills/engineering-team/snowflake-development.md create mode 100644 engineering-team/snowflake-development/SKILL.md create mode 100644 engineering-team/snowflake-development/references/cortex_ai_and_agents.md create mode 100644 engineering-team/snowflake-development/references/snowflake_sql_and_pipelines.md create mode 100644 engineering-team/snowflake-development/references/troubleshooting.md create mode 100644 engineering-team/snowflake-development/scripts/snowflake_query_helper.py diff --git a/docs/skills/engineering-team/index.md 
b/docs/skills/engineering-team/index.md index de06022..17a7fcc 100644 --- a/docs/skills/engineering-team/index.md +++ b/docs/skills/engineering-team/index.md @@ -1,13 +1,13 @@ --- title: "Engineering - Core Skills — Agent Skills & Codex Plugins" -description: "44 engineering - core skills — engineering agent skill and Claude Code plugin for code generation, DevOps, architecture, and testing. Works with Claude Code, Codex CLI, Gemini CLI, and OpenClaw." +description: "45 engineering - core skills — engineering agent skill and Claude Code plugin for code generation, DevOps, architecture, and testing. Works with Claude Code, Codex CLI, Gemini CLI, and OpenClaw." ---
# :material-code-braces: Engineering - Core -

44 skills in this domain

+

45 skills in this domain

@@ -179,6 +179,12 @@ description: "44 engineering - core skills — engineering agent skill and Claud Security engineering tools for threat modeling, vulnerability analysis, secure architecture design, and penetration t... +- **[Snowflake Development](snowflake-development.md)** + + --- + + Snowflake SQL, data pipelines, Cortex AI, and Snowpark Python development. Covers the colon-prefix rule, semi-structu... + - **[Stripe Integration Expert](stripe-integration-expert.md)** --- diff --git a/docs/skills/engineering-team/snowflake-development.md b/docs/skills/engineering-team/snowflake-development.md new file mode 100644 index 0000000..390e865 --- /dev/null +++ b/docs/skills/engineering-team/snowflake-development.md @@ -0,0 +1,305 @@ +--- +title: "Snowflake Development — Agent Skill & Codex Plugin" +description: "Use when writing Snowflake SQL, building data pipelines with Dynamic Tables or Streams/Tasks, using Cortex AI functions, creating Cortex Agents. Agent skill for Claude Code, Codex CLI, Gemini CLI, OpenClaw." +--- + +# Snowflake Development + +
+:material-code-braces: Engineering - Core +:material-identifier: `snowflake-development` +:material-github: Source +
+ +
+Install: claude /plugin install engineering-skills +
+ + +Snowflake SQL, data pipelines, Cortex AI, and Snowpark Python development. Covers the colon-prefix rule, semi-structured data, MERGE upserts, Dynamic Tables, Streams+Tasks, Cortex AI functions, agent specs, performance tuning, and security hardening. + +> Originally contributed by [James Cha-Earley](https://github.com/jamescha-earley) — enhanced and integrated by the claude-skills team. + +## Quick Start + +```bash +# Generate a MERGE upsert template +python scripts/snowflake_query_helper.py merge --target customers --source staging_customers --key customer_id --columns name,email,updated_at + +# Generate a Dynamic Table template +python scripts/snowflake_query_helper.py dynamic-table --name cleaned_events --warehouse transform_wh --lag "5 minutes" + +# Generate RBAC grant statements +python scripts/snowflake_query_helper.py grant --role analyst_role --database analytics --schemas public,staging --privileges SELECT,USAGE +``` + +--- + +## SQL Best Practices + +### Naming and Style + +- Use `snake_case` for all identifiers. Avoid double-quoted identifiers -- they force case-sensitive names that require constant quoting. +- Use CTEs (`WITH` clauses) over nested subqueries. +- Use `CREATE OR REPLACE` for idempotent DDL. +- Use explicit column lists -- never `SELECT *` in production. Snowflake's columnar storage scans only referenced columns, so explicit lists reduce I/O. + +### Stored Procedures -- Colon Prefix Rule + +In SQL stored procedures (BEGIN...END blocks), variables and parameters **must** use the colon `:` prefix inside SQL statements. Without it, Snowflake treats them as column identifiers and raises "invalid identifier" errors. 
+ +```sql +-- WRONG: missing colon prefix +SELECT name INTO result FROM users WHERE id = p_id; + +-- CORRECT: colon prefix on both variable and parameter +SELECT name INTO :result FROM users WHERE id = :p_id; +``` + +This applies to DECLARE variables, LET variables, and procedure parameters when used inside SELECT, INSERT, UPDATE, DELETE, or MERGE. + +### Semi-Structured Data + +- VARIANT, OBJECT, ARRAY for JSON/Avro/Parquet/ORC. +- Access nested fields: `src:customer.name::STRING`. Always cast with `::TYPE`. +- VARIANT null vs SQL NULL: JSON `null` is stored as the string `"null"`. Use `STRIP_NULL_VALUE = TRUE` on load. +- Flatten arrays: `SELECT f.value:name::STRING FROM my_table, LATERAL FLATTEN(input => src:items) f;` + +### MERGE for Upserts + +```sql +MERGE INTO target t USING source s ON t.id = s.id +WHEN MATCHED THEN UPDATE SET t.name = s.name, t.updated_at = CURRENT_TIMESTAMP() +WHEN NOT MATCHED THEN INSERT (id, name, updated_at) VALUES (s.id, s.name, CURRENT_TIMESTAMP()); +``` + +> See `references/snowflake_sql_and_pipelines.md` for deeper SQL patterns and anti-patterns. + +--- + +## Data Pipelines + +### Choosing Your Approach + +| Approach | When to Use | +|----------|-------------| +| Dynamic Tables | Declarative transformations. **Default choice.** Define the query, Snowflake handles refresh. | +| Streams + Tasks | Imperative CDC. Use for procedural logic, stored procedure calls, complex branching. | +| Snowpipe | Continuous file loading from cloud storage (S3, GCS, Azure). | + +### Dynamic Tables + +```sql +CREATE OR REPLACE DYNAMIC TABLE cleaned_events + TARGET_LAG = '5 minutes' + WAREHOUSE = transform_wh + AS + SELECT event_id, event_type, user_id, event_timestamp + FROM raw_events + WHERE event_type IS NOT NULL; +``` + +Key rules: +- Set `TARGET_LAG` progressively: tighter at the top of the DAG, looser downstream. +- Incremental DTs cannot depend on Full-refresh DTs. +- `SELECT *` breaks on upstream schema changes -- use explicit column lists. 
+- Views cannot sit between two Dynamic Tables in the DAG. + +### Streams and Tasks + +```sql +CREATE OR REPLACE STREAM raw_stream ON TABLE raw_events; + +CREATE OR REPLACE TASK process_events + WAREHOUSE = transform_wh + SCHEDULE = 'USING CRON 0 */1 * * * America/Los_Angeles' + WHEN SYSTEM$STREAM_HAS_DATA('raw_stream') + AS INSERT INTO cleaned_events SELECT ... FROM raw_stream; + +-- Tasks start SUSPENDED. You MUST resume them. +ALTER TASK process_events RESUME; +``` + +> See `references/snowflake_sql_and_pipelines.md` for DT debugging queries and Snowpipe patterns. + +--- + +## Cortex AI + +### Function Reference + +| Function | Purpose | +|----------|---------| +| `AI_COMPLETE` | LLM completion (text, images, documents) | +| `AI_CLASSIFY` | Classify text into categories (up to 500 labels) | +| `AI_FILTER` | Boolean filter on text or images | +| `AI_EXTRACT` | Structured extraction from text/images/documents | +| `AI_SENTIMENT` | Sentiment score (-1 to 1) | +| `AI_PARSE_DOCUMENT` | OCR or layout extraction from documents | +| `AI_REDACT` | PII removal from text | + +**Deprecated names (do NOT use):** `COMPLETE`, `CLASSIFY_TEXT`, `EXTRACT_ANSWER`, `PARSE_DOCUMENT`, `SUMMARIZE`, `TRANSLATE`, `SENTIMENT`, `EMBED_TEXT_768`. + +### TO_FILE -- Common Pitfall + +Stage path and filename are **separate** arguments: + +```sql +-- WRONG: single combined argument +TO_FILE('@stage/file.pdf') + +-- CORRECT: two arguments +TO_FILE('@db.schema.mystage', 'invoice.pdf') +``` + +### Cortex Agents + +Agent specs use a JSON structure with top-level keys: `models`, `instructions`, `tools`, `tool_resources`. + +- Use `$spec$` delimiter (not `$$`). +- `models` must be an object, not an array. +- `tool_resources` is a separate top-level key, not nested inside `tools`. +- Tool descriptions are the single biggest factor in agent quality. + +> See `references/cortex_ai_and_agents.md` for full agent spec examples and Cortex Search patterns. 
+ +--- + +## Snowpark Python + +```python +from snowflake.snowpark import Session +import os + +session = Session.builder.configs({ + "account": os.environ["SNOWFLAKE_ACCOUNT"], + "user": os.environ["SNOWFLAKE_USER"], + "password": os.environ["SNOWFLAKE_PASSWORD"], + "role": "my_role", "warehouse": "my_wh", + "database": "my_db", "schema": "my_schema" +}).create() +``` + +- Never hardcode credentials. Use environment variables or key pair auth. +- DataFrames are lazy -- executed on `collect()` / `show()`. +- Do NOT call `collect()` on large DataFrames. Process server-side with DataFrame operations. +- Use **vectorized UDFs** (10-100x faster) for batch and ML workloads. + +## dbt on Snowflake + +```sql +-- Dynamic table materialization (streaming/near-real-time marts): +{{ config(materialized='dynamic_table', snowflake_warehouse='transforming', target_lag='1 hour') }} + +-- Incremental materialization (large fact tables): +{{ config(materialized='incremental', unique_key='event_id') }} + +-- Snowflake-specific configs (combine with any materialization): +{{ config(transient=true, copy_grants=true, query_tag='team_daily') }} +``` + +- Do NOT use `{{ this }}` without `{% if is_incremental() %}` guard. +- Use `dynamic_table` materialization for streaming or near-real-time marts. + +## Performance + +- **Cluster keys**: Only for multi-TB tables. Apply on WHERE / JOIN / GROUP BY columns. +- **Search Optimization**: `ALTER TABLE t ADD SEARCH OPTIMIZATION ON EQUALITY(col);` +- **Warehouse sizing**: Start X-Small, scale up. Set `AUTO_SUSPEND = 60`, `AUTO_RESUME = TRUE`. +- **Separate warehouses** per workload (load, transform, query). + +## Security + +- Follow least-privilege RBAC. Use database roles for object-level grants. +- Audit ACCOUNTADMIN regularly: `SHOW GRANTS OF ROLE ACCOUNTADMIN;` +- Use network policies for IP allowlisting. +- Use masking policies for PII columns and row access policies for multi-tenant isolation. 
+ +--- + +## Proactive Triggers + +Surface these issues without being asked when you notice them in context: + +- **Missing colon prefix** in SQL stored procedures -- flag immediately, this causes "invalid identifier" at runtime. +- **`SELECT *` in Dynamic Tables** -- flag as a schema-change time bomb. +- **Deprecated Cortex function names** (`CLASSIFY_TEXT`, `SUMMARIZE`, etc.) -- suggest the current `AI_*` equivalents. +- **Task not resumed** after creation -- remind that tasks start SUSPENDED. +- **Hardcoded credentials** in Snowpark code -- flag as a security risk. + +--- + +## Common Errors + +| Error | Cause | Fix | +|-------|-------|-----| +| "Object does not exist" | Wrong database/schema context or missing grants | Fully qualify names (`db.schema.table`), check grants | +| "Invalid identifier" in procedure | Missing colon prefix on variable | Use `:variable_name` inside SQL statements | +| "Numeric value not recognized" | VARIANT field not cast | Cast explicitly: `src:field::NUMBER(10,2)` | +| Task not running | Forgot to resume after creation | `ALTER TASK task_name RESUME;` | +| DT refresh failing | Schema change upstream or tracking disabled | Use explicit columns, verify change tracking | +| TO_FILE error | Combined path as single argument | Split into two args: `TO_FILE('@stage', 'file.pdf')` | + +--- + +## Practical Workflows + +### Workflow 1: Build a Reporting Pipeline (30 min) + +1. **Stage raw data**: Create external stage pointing to S3/GCS/Azure, set up Snowpipe for auto-ingest +2. **Clean with Dynamic Table**: Create DT with `TARGET_LAG = '5 minutes'` that filters nulls, casts types, deduplicates +3. **Aggregate with downstream DT**: Second DT that joins cleaned data with dimension tables, computes metrics +4. **Expose via Secure View**: Create `SECURE VIEW` for the BI tool / API layer +5. **Grant access**: Use `snowflake_query_helper.py grant` to generate RBAC statements + +### Workflow 2: Add AI Classification to Existing Data + +1. 
**Identify the column**: Find the text column to classify (e.g., support tickets, reviews) +2. **Test with AI_CLASSIFY**: `SELECT AI_CLASSIFY(text_col, ['bug', 'feature', 'question']) FROM table LIMIT 10;` +3. **Create enrichment DT**: Dynamic Table that runs `AI_CLASSIFY` on new rows automatically +4. **Monitor costs**: Cortex AI is billed per token — sample before running on full tables + +### Workflow 3: Debug a Failing Pipeline + +1. **Check task history**: `SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY()) WHERE STATE = 'FAILED' ORDER BY SCHEDULED_TIME DESC;` +2. **Check DT refresh**: `SELECT * FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY('my_dt')) ORDER BY REFRESH_END_TIME DESC;` +3. **Check stream staleness**: `SHOW STREAMS; -- check stale_after column` +4. **Consult troubleshooting reference**: See `references/troubleshooting.md` for error-specific fixes + +--- + +## Anti-Patterns + +| Anti-Pattern | Why It Fails | Better Approach | +|---|---|---| +| `SELECT *` in Dynamic Tables | Schema changes upstream break the DT silently | Use explicit column lists | +| Missing colon prefix in procedures | "Invalid identifier" runtime error | Always use `:variable_name` in SQL blocks | +| Single warehouse for all workloads | Contention between load, transform, and query | Separate warehouses per workload type | +| Hardcoded credentials in Snowpark | Security risk, breaks in CI/CD | Use `os.environ[]` or key pair auth | +| `collect()` on large DataFrames | Pulls entire result set to client memory | Process server-side with DataFrame operations | +| Nested subqueries instead of CTEs | Unreadable, hard to debug, Snowflake optimizes CTEs better | Use `WITH` clauses | +| Using deprecated Cortex functions | `CLASSIFY_TEXT`, `SUMMARIZE` etc. will be removed | Use `AI_CLASSIFY`, `AI_COMPLETE` etc. 
| +| Tasks without `WHEN SYSTEM$STREAM_HAS_DATA` | Task runs on schedule even with no new data, wasting credits | Add the WHEN clause for stream-driven tasks | +| Double-quoted identifiers | Forces case-sensitive names across all queries | Use `snake_case` unquoted identifiers | + +--- + +## Cross-References + +| Skill | Relationship | +|-------|-------------| +| `engineering/sql-database-assistant` | General SQL patterns — use for non-Snowflake databases | +| `engineering/database-designer` | Schema design — use for data modeling before Snowflake implementation | +| `engineering-team/senior-data-engineer` | Broader data engineering — pipelines, Spark, Airflow, data quality | +| `engineering-team/senior-data-scientist` | Analytics and ML — use alongside Snowpark for feature engineering | +| `engineering-team/senior-devops` | CI/CD for Snowflake deployments (Terraform, GitHub Actions) | + +--- + +## Reference Documentation + +| Document | Contents | +|----------|----------| +| `references/snowflake_sql_and_pipelines.md` | SQL patterns, MERGE templates, Dynamic Table debugging, Snowpipe, anti-patterns | +| `references/cortex_ai_and_agents.md` | Cortex AI functions, agent spec structure, Cortex Search, Snowpark | +| `references/troubleshooting.md` | Error reference, debugging queries, common fixes | diff --git a/engineering-team/snowflake-development/SKILL.md b/engineering-team/snowflake-development/SKILL.md new file mode 100644 index 0000000..9ff1694 --- /dev/null +++ b/engineering-team/snowflake-development/SKILL.md @@ -0,0 +1,294 @@ +--- +name: "snowflake-development" +description: "Use when writing Snowflake SQL, building data pipelines with Dynamic Tables or Streams/Tasks, using Cortex AI functions, creating Cortex Agents, writing Snowpark Python, configuring dbt for Snowflake, or troubleshooting Snowflake errors." +--- + +# Snowflake Development + +Snowflake SQL, data pipelines, Cortex AI, and Snowpark Python development. 
Covers the colon-prefix rule, semi-structured data, MERGE upserts, Dynamic Tables, Streams+Tasks, Cortex AI functions, agent specs, performance tuning, and security hardening. + +> Originally contributed by [James Cha-Earley](https://github.com/jamescha-earley) — enhanced and integrated by the claude-skills team. + +## Quick Start + +```bash +# Generate a MERGE upsert template +python scripts/snowflake_query_helper.py merge --target customers --source staging_customers --key customer_id --columns name,email,updated_at + +# Generate a Dynamic Table template +python scripts/snowflake_query_helper.py dynamic-table --name cleaned_events --warehouse transform_wh --lag "5 minutes" + +# Generate RBAC grant statements +python scripts/snowflake_query_helper.py grant --role analyst_role --database analytics --schemas public,staging --privileges SELECT,USAGE +``` + +--- + +## SQL Best Practices + +### Naming and Style + +- Use `snake_case` for all identifiers. Avoid double-quoted identifiers -- they force case-sensitive names that require constant quoting. +- Use CTEs (`WITH` clauses) over nested subqueries. +- Use `CREATE OR REPLACE` for idempotent DDL. +- Use explicit column lists -- never `SELECT *` in production. Snowflake's columnar storage scans only referenced columns, so explicit lists reduce I/O. + +### Stored Procedures -- Colon Prefix Rule + +In SQL stored procedures (BEGIN...END blocks), variables and parameters **must** use the colon `:` prefix inside SQL statements. Without it, Snowflake treats them as column identifiers and raises "invalid identifier" errors. + +```sql +-- WRONG: missing colon prefix +SELECT name INTO result FROM users WHERE id = p_id; + +-- CORRECT: colon prefix on both variable and parameter +SELECT name INTO :result FROM users WHERE id = :p_id; +``` + +This applies to DECLARE variables, LET variables, and procedure parameters when used inside SELECT, INSERT, UPDATE, DELETE, or MERGE. 
+ +### Semi-Structured Data + +- VARIANT, OBJECT, ARRAY for JSON/Avro/Parquet/ORC. +- Access nested fields: `src:customer.name::STRING`. Always cast with `::TYPE`. +- VARIANT null vs SQL NULL: JSON `null` is stored as the string `"null"`. Use `STRIP_NULL_VALUE = TRUE` on load. +- Flatten arrays: `SELECT f.value:name::STRING FROM my_table, LATERAL FLATTEN(input => src:items) f;` + +### MERGE for Upserts + +```sql +MERGE INTO target t USING source s ON t.id = s.id +WHEN MATCHED THEN UPDATE SET t.name = s.name, t.updated_at = CURRENT_TIMESTAMP() +WHEN NOT MATCHED THEN INSERT (id, name, updated_at) VALUES (s.id, s.name, CURRENT_TIMESTAMP()); +``` + +> See `references/snowflake_sql_and_pipelines.md` for deeper SQL patterns and anti-patterns. + +--- + +## Data Pipelines + +### Choosing Your Approach + +| Approach | When to Use | +|----------|-------------| +| Dynamic Tables | Declarative transformations. **Default choice.** Define the query, Snowflake handles refresh. | +| Streams + Tasks | Imperative CDC. Use for procedural logic, stored procedure calls, complex branching. | +| Snowpipe | Continuous file loading from cloud storage (S3, GCS, Azure). | + +### Dynamic Tables + +```sql +CREATE OR REPLACE DYNAMIC TABLE cleaned_events + TARGET_LAG = '5 minutes' + WAREHOUSE = transform_wh + AS + SELECT event_id, event_type, user_id, event_timestamp + FROM raw_events + WHERE event_type IS NOT NULL; +``` + +Key rules: +- Set `TARGET_LAG` progressively: tighter at the top of the DAG, looser downstream. +- Incremental DTs cannot depend on Full-refresh DTs. +- `SELECT *` breaks on upstream schema changes -- use explicit column lists. +- Views cannot sit between two Dynamic Tables in the DAG. 
+ +### Streams and Tasks + +```sql +CREATE OR REPLACE STREAM raw_stream ON TABLE raw_events; + +CREATE OR REPLACE TASK process_events + WAREHOUSE = transform_wh + SCHEDULE = 'USING CRON 0 */1 * * * America/Los_Angeles' + WHEN SYSTEM$STREAM_HAS_DATA('raw_stream') + AS INSERT INTO cleaned_events SELECT ... FROM raw_stream; + +-- Tasks start SUSPENDED. You MUST resume them. +ALTER TASK process_events RESUME; +``` + +> See `references/snowflake_sql_and_pipelines.md` for DT debugging queries and Snowpipe patterns. + +--- + +## Cortex AI + +### Function Reference + +| Function | Purpose | +|----------|---------| +| `AI_COMPLETE` | LLM completion (text, images, documents) | +| `AI_CLASSIFY` | Classify text into categories (up to 500 labels) | +| `AI_FILTER` | Boolean filter on text or images | +| `AI_EXTRACT` | Structured extraction from text/images/documents | +| `AI_SENTIMENT` | Sentiment score (-1 to 1) | +| `AI_PARSE_DOCUMENT` | OCR or layout extraction from documents | +| `AI_REDACT` | PII removal from text | + +**Deprecated names (do NOT use):** `COMPLETE`, `CLASSIFY_TEXT`, `EXTRACT_ANSWER`, `PARSE_DOCUMENT`, `SUMMARIZE`, `TRANSLATE`, `SENTIMENT`, `EMBED_TEXT_768`. + +### TO_FILE -- Common Pitfall + +Stage path and filename are **separate** arguments: + +```sql +-- WRONG: single combined argument +TO_FILE('@stage/file.pdf') + +-- CORRECT: two arguments +TO_FILE('@db.schema.mystage', 'invoice.pdf') +``` + +### Cortex Agents + +Agent specs use a JSON structure with top-level keys: `models`, `instructions`, `tools`, `tool_resources`. + +- Use `$spec$` delimiter (not `$$`). +- `models` must be an object, not an array. +- `tool_resources` is a separate top-level key, not nested inside `tools`. +- Tool descriptions are the single biggest factor in agent quality. + +> See `references/cortex_ai_and_agents.md` for full agent spec examples and Cortex Search patterns. 
+ +--- + +## Snowpark Python + +```python +from snowflake.snowpark import Session +import os + +session = Session.builder.configs({ + "account": os.environ["SNOWFLAKE_ACCOUNT"], + "user": os.environ["SNOWFLAKE_USER"], + "password": os.environ["SNOWFLAKE_PASSWORD"], + "role": "my_role", "warehouse": "my_wh", + "database": "my_db", "schema": "my_schema" +}).create() +``` + +- Never hardcode credentials. Use environment variables or key pair auth. +- DataFrames are lazy -- executed on `collect()` / `show()`. +- Do NOT call `collect()` on large DataFrames. Process server-side with DataFrame operations. +- Use **vectorized UDFs** (10-100x faster) for batch and ML workloads. + +## dbt on Snowflake + +```sql +-- Dynamic table materialization (streaming/near-real-time marts): +{{ config(materialized='dynamic_table', snowflake_warehouse='transforming', target_lag='1 hour') }} + +-- Incremental materialization (large fact tables): +{{ config(materialized='incremental', unique_key='event_id') }} + +-- Snowflake-specific configs (combine with any materialization): +{{ config(transient=true, copy_grants=true, query_tag='team_daily') }} +``` + +- Do NOT use `{{ this }}` without `{% if is_incremental() %}` guard. +- Use `dynamic_table` materialization for streaming or near-real-time marts. + +## Performance + +- **Cluster keys**: Only for multi-TB tables. Apply on WHERE / JOIN / GROUP BY columns. +- **Search Optimization**: `ALTER TABLE t ADD SEARCH OPTIMIZATION ON EQUALITY(col);` +- **Warehouse sizing**: Start X-Small, scale up. Set `AUTO_SUSPEND = 60`, `AUTO_RESUME = TRUE`. +- **Separate warehouses** per workload (load, transform, query). + +## Security + +- Follow least-privilege RBAC. Use database roles for object-level grants. +- Audit ACCOUNTADMIN regularly: `SHOW GRANTS OF ROLE ACCOUNTADMIN;` +- Use network policies for IP allowlisting. +- Use masking policies for PII columns and row access policies for multi-tenant isolation. 
+ +--- + +## Proactive Triggers + +Surface these issues without being asked when you notice them in context: + +- **Missing colon prefix** in SQL stored procedures -- flag immediately, this causes "invalid identifier" at runtime. +- **`SELECT *` in Dynamic Tables** -- flag as a schema-change time bomb. +- **Deprecated Cortex function names** (`CLASSIFY_TEXT`, `SUMMARIZE`, etc.) -- suggest the current `AI_*` equivalents. +- **Task not resumed** after creation -- remind that tasks start SUSPENDED. +- **Hardcoded credentials** in Snowpark code -- flag as a security risk. + +--- + +## Common Errors + +| Error | Cause | Fix | +|-------|-------|-----| +| "Object does not exist" | Wrong database/schema context or missing grants | Fully qualify names (`db.schema.table`), check grants | +| "Invalid identifier" in procedure | Missing colon prefix on variable | Use `:variable_name` inside SQL statements | +| "Numeric value not recognized" | VARIANT field not cast | Cast explicitly: `src:field::NUMBER(10,2)` | +| Task not running | Forgot to resume after creation | `ALTER TASK task_name RESUME;` | +| DT refresh failing | Schema change upstream or tracking disabled | Use explicit columns, verify change tracking | +| TO_FILE error | Combined path as single argument | Split into two args: `TO_FILE('@stage', 'file.pdf')` | + +--- + +## Practical Workflows + +### Workflow 1: Build a Reporting Pipeline (30 min) + +1. **Stage raw data**: Create external stage pointing to S3/GCS/Azure, set up Snowpipe for auto-ingest +2. **Clean with Dynamic Table**: Create DT with `TARGET_LAG = '5 minutes'` that filters nulls, casts types, deduplicates +3. **Aggregate with downstream DT**: Second DT that joins cleaned data with dimension tables, computes metrics +4. **Expose via Secure View**: Create `SECURE VIEW` for the BI tool / API layer +5. **Grant access**: Use `snowflake_query_helper.py grant` to generate RBAC statements + +### Workflow 2: Add AI Classification to Existing Data + +1. 
**Identify the column**: Find the text column to classify (e.g., support tickets, reviews) +2. **Test with AI_CLASSIFY**: `SELECT AI_CLASSIFY(text_col, ['bug', 'feature', 'question']) FROM table LIMIT 10;` +3. **Create enrichment DT**: Dynamic Table that runs `AI_CLASSIFY` on new rows automatically +4. **Monitor costs**: Cortex AI is billed per token — sample before running on full tables + +### Workflow 3: Debug a Failing Pipeline + +1. **Check task history**: `SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY()) WHERE STATE = 'FAILED' ORDER BY SCHEDULED_TIME DESC;` +2. **Check DT refresh**: `SELECT * FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY('my_dt')) ORDER BY REFRESH_END_TIME DESC;` +3. **Check stream staleness**: `SHOW STREAMS; -- check stale_after column` +4. **Consult troubleshooting reference**: See `references/troubleshooting.md` for error-specific fixes + +--- + +## Anti-Patterns + +| Anti-Pattern | Why It Fails | Better Approach | +|---|---|---| +| `SELECT *` in Dynamic Tables | Schema changes upstream break the DT silently | Use explicit column lists | +| Missing colon prefix in procedures | "Invalid identifier" runtime error | Always use `:variable_name` in SQL blocks | +| Single warehouse for all workloads | Contention between load, transform, and query | Separate warehouses per workload type | +| Hardcoded credentials in Snowpark | Security risk, breaks in CI/CD | Use `os.environ[]` or key pair auth | +| `collect()` on large DataFrames | Pulls entire result set to client memory | Process server-side with DataFrame operations | +| Nested subqueries instead of CTEs | Unreadable, hard to debug, Snowflake optimizes CTEs better | Use `WITH` clauses | +| Using deprecated Cortex functions | `CLASSIFY_TEXT`, `SUMMARIZE` etc. will be removed | Use `AI_CLASSIFY`, `AI_COMPLETE` etc. 
| +| Tasks without `WHEN SYSTEM$STREAM_HAS_DATA` | Task runs on schedule even with no new data, wasting credits | Add the WHEN clause for stream-driven tasks | +| Double-quoted identifiers | Forces case-sensitive names across all queries | Use `snake_case` unquoted identifiers | + +--- + +## Cross-References + +| Skill | Relationship | +|-------|-------------| +| `engineering/sql-database-assistant` | General SQL patterns — use for non-Snowflake databases | +| `engineering/database-designer` | Schema design — use for data modeling before Snowflake implementation | +| `engineering-team/senior-data-engineer` | Broader data engineering — pipelines, Spark, Airflow, data quality | +| `engineering-team/senior-data-scientist` | Analytics and ML — use alongside Snowpark for feature engineering | +| `engineering-team/senior-devops` | CI/CD for Snowflake deployments (Terraform, GitHub Actions) | + +--- + +## Reference Documentation + +| Document | Contents | +|----------|----------| +| `references/snowflake_sql_and_pipelines.md` | SQL patterns, MERGE templates, Dynamic Table debugging, Snowpipe, anti-patterns | +| `references/cortex_ai_and_agents.md` | Cortex AI functions, agent spec structure, Cortex Search, Snowpark | +| `references/troubleshooting.md` | Error reference, debugging queries, common fixes | diff --git a/engineering-team/snowflake-development/references/cortex_ai_and_agents.md b/engineering-team/snowflake-development/references/cortex_ai_and_agents.md new file mode 100644 index 0000000..d53a14a --- /dev/null +++ b/engineering-team/snowflake-development/references/cortex_ai_and_agents.md @@ -0,0 +1,280 @@ +# Cortex AI and Agents Reference + +Complete reference for Snowflake Cortex AI functions, Cortex Agents, Cortex Search, and Snowpark Python patterns. + +## Table of Contents + +1. [Cortex AI Functions](#cortex-ai-functions) +2. [Cortex Agents](#cortex-agents) +3. [Cortex Search](#cortex-search) +4. 
[Snowpark Python](#snowpark-python) + +--- + +## Cortex AI Functions + +### Complete Function Reference + +| Function | Signature | Returns | +|----------|-----------|---------| +| `AI_COMPLETE` | `AI_COMPLETE(model, prompt)` or `AI_COMPLETE(model, conversation, options)` | STRING or OBJECT | +| `AI_CLASSIFY` | `AI_CLASSIFY(input, categories)` | OBJECT with `labels` array | +| `AI_EXTRACT` | `AI_EXTRACT(input, fields)` | OBJECT with extracted fields | +| `AI_FILTER` | `AI_FILTER(input, condition)` | BOOLEAN | +| `AI_SENTIMENT` | `AI_SENTIMENT(text)` | FLOAT (-1 to 1) | +| `AI_SUMMARIZE` | `AI_SUMMARIZE(text)` | STRING | +| `AI_TRANSLATE` | `AI_TRANSLATE(text, source_lang, target_lang)` | STRING | +| `AI_PARSE_DOCUMENT` | `AI_PARSE_DOCUMENT(file, options)` | OBJECT | +| `AI_REDACT` | `AI_REDACT(text)` | STRING | +| `AI_EMBED` | `AI_EMBED(model, text)` | ARRAY (vector) | +| `AI_AGG` | `AI_AGG(column, instruction)` | STRING | + +### Deprecated Function Mapping + +| Old Name (Do NOT Use) | New Name | +|-----------------------|----------| +| `COMPLETE` | `AI_COMPLETE` | +| `CLASSIFY_TEXT` | `AI_CLASSIFY` | +| `EXTRACT_ANSWER` | `AI_EXTRACT` | +| `SUMMARIZE` | `AI_SUMMARIZE` | +| `TRANSLATE` | `AI_TRANSLATE` | +| `SENTIMENT` | `AI_SENTIMENT` | +| `EMBED_TEXT_768` | `AI_EMBED` | + +### AI_COMPLETE Patterns + +**Simple completion:** +```sql +SELECT AI_COMPLETE('claude-4-sonnet', 'Summarize this text: ' || article_text) AS summary +FROM articles; +``` + +**With system prompt (conversation format):** +```sql +SELECT AI_COMPLETE( + 'claude-4-sonnet', + [ + {'role': 'system', 'content': 'You are a data quality analyst. 
Be concise.'}, + {'role': 'user', 'content': 'Analyze this record: ' || record::STRING} + ] +) AS analysis +FROM flagged_records; +``` + +**With document input (TO_FILE):** +```sql +SELECT AI_COMPLETE( + 'claude-4-sonnet', + 'Extract the invoice total from this document', + TO_FILE('@docs_stage', 'invoice.pdf') +) AS invoice_total; +``` + +### AI_CLASSIFY Patterns + +Use AI_CLASSIFY instead of AI_COMPLETE for classification tasks -- it is purpose-built, cheaper, and returns structured output. + +```sql +SELECT + ticket_text, + AI_CLASSIFY(ticket_text, ['billing', 'technical', 'account', 'feature_request']):labels[0]::VARCHAR AS category +FROM support_tickets; +``` + +### AI_EXTRACT Patterns + +```sql +SELECT + AI_EXTRACT(email_body, ['sender_name', 'action_requested', 'deadline'])::OBJECT AS extracted +FROM emails; +``` + +### Cost Awareness + +Estimate token costs before running AI functions on large tables: + +```sql +-- Count tokens first +SELECT + COUNT(*) AS row_count, + SUM(AI_COUNT_TOKENS('claude-4-sonnet', text_column)) AS total_tokens +FROM my_table; + +-- Process a sample first +SELECT AI_COMPLETE('claude-4-sonnet', text_column) FROM my_table SAMPLE (100 ROWS); +``` + +--- + +## Cortex Agents + +### Agent Spec Structure + +```sql +CREATE OR REPLACE AGENT my_db.my_schema.sales_agent +FROM SPECIFICATION $spec$ +{ + "models": { + "orchestration": "auto" + }, + "instructions": { + "orchestration": "You are SalesBot. Help users query sales data.", + "response": "Be concise. Use tables for numeric data." + }, + "tools": [ + { + "tool_spec": { + "type": "cortex_analyst_text_to_sql", + "name": "SalesQuery", + "description": "Query sales metrics including revenue, orders, and customer data. Use for questions about sales performance, trends, and comparisons." + } + }, + { + "tool_spec": { + "type": "cortex_search", + "name": "PolicySearch", + "description": "Search company sales policies and procedures." 
+ } + } + ], + "tool_resources": { + "SalesQuery": { + "semantic_model_file": "@my_db.my_schema.models/sales_model.yaml" + }, + "PolicySearch": { + "cortex_search_service": "my_db.my_schema.policy_search_service" + } + } +} +$spec$; +``` + +### Agent Rules + +- **Delimiter**: Use `$spec$` not `$$` to avoid conflicts with SQL dollar-quoting. +- **models**: Must be an object (`{"orchestration": "auto"}`), not an array. +- **tool_resources**: A separate top-level key, not nested inside individual tool entries. +- **Empty values in edit specs**: Do NOT include `null` or empty string values when editing -- they clear existing values. +- **Tool descriptions**: The single biggest quality factor. Be specific about what data each tool accesses and what questions it answers. +- **Testing**: Never modify production agents directly. Clone first, test, then swap. + +### Calling an Agent + +```sql +SELECT SNOWFLAKE.CORTEX.AGENT( + 'my_db.my_schema.sales_agent', + 'What was total revenue last quarter?' 
+); +``` + +--- + +## Cortex Search + +### Creating a Search Service + +```sql +CREATE OR REPLACE CORTEX SEARCH SERVICE my_db.my_schema.docs_search +ON text_column +ATTRIBUTES category, department +WAREHOUSE = search_wh +TARGET_LAG = '1 hour' +AS ( + SELECT text_column, category, department, doc_id + FROM documents +); +``` + +### Querying a Search Service + +```sql +SELECT PARSE_JSON( + SNOWFLAKE.CORTEX.SEARCH_PREVIEW( + 'my_db.my_schema.docs_search', + '{ + "query": "return policy for electronics", + "columns": ["text_column", "category"], + "filter": {"@eq": {"department": "retail"}}, + "limit": 5 + }' + ) +) AS results; +``` + +--- + +## Snowpark Python + +### Session Setup + +```python +from snowflake.snowpark import Session +import os + +session = Session.builder.configs({ + "account": os.environ["SNOWFLAKE_ACCOUNT"], + "user": os.environ["SNOWFLAKE_USER"], + "password": os.environ["SNOWFLAKE_PASSWORD"], + "role": "my_role", + "warehouse": "my_wh", + "database": "my_db", + "schema": "my_schema" +}).create() +``` + +### DataFrame Operations + +```python +# Lazy operations -- nothing executes until collect()/show() +df = session.table("events") +result = ( + df.filter(df["event_type"] == "purchase") + .group_by("user_id") + .agg(F.sum("amount").alias("total_spent")) + .sort(F.col("total_spent").desc()) +) +result.show() # Execution happens here +``` + +### Vectorized UDFs (10-100x Faster) + +```python +from snowflake.snowpark.functions import pandas_udf +from snowflake.snowpark.types import StringType, PandasSeriesType +import pandas as pd + +@pandas_udf( + name="normalize_email", + is_permanent=True, + stage_location="@udf_stage", + replace=True +) +def normalize_email(emails: pd.Series) -> pd.Series: + return emails.str.lower().str.strip() +``` + +### Stored Procedures in Python + +```python +from snowflake.snowpark import Session + +def process_batch(session: Session, batch_date: str) -> str: + df = session.table("raw_events").filter(F.col("event_date") == 
batch_date) + df.write.mode("overwrite").save_as_table("processed_events") + return f"Processed {df.count()} rows for {batch_date}" + +session.sproc.register( + func=process_batch, + name="process_batch", + is_permanent=True, + stage_location="@sproc_stage", + replace=True +) +``` + +### Key Rules + +- Never hardcode credentials. Use environment variables, key pair auth, or Snowflake's built-in connection config. +- DataFrames are lazy. Calling `.collect()` pulls all data to the client -- avoid on large datasets. +- Use vectorized UDFs over scalar UDFs for batch processing (10-100x performance improvement). +- Close sessions when done: `session.close()`. diff --git a/engineering-team/snowflake-development/references/snowflake_sql_and_pipelines.md b/engineering-team/snowflake-development/references/snowflake_sql_and_pipelines.md new file mode 100644 index 0000000..efb8bbe --- /dev/null +++ b/engineering-team/snowflake-development/references/snowflake_sql_and_pipelines.md @@ -0,0 +1,281 @@ +# Snowflake SQL and Pipelines Reference + +Detailed patterns and anti-patterns for Snowflake SQL development and data pipeline design. + +## Table of Contents + +1. [SQL Patterns](#sql-patterns) +2. [Dynamic Table Deep Dive](#dynamic-table-deep-dive) +3. [Streams and Tasks Patterns](#streams-and-tasks-patterns) +4. [Snowpipe](#snowpipe) +5. 
[Anti-Patterns](#anti-patterns) + +--- + +## SQL Patterns + +### CTE-Based Transformations + +```sql +WITH raw AS ( + SELECT * FROM raw_events WHERE event_date = CURRENT_DATE() +), +cleaned AS ( + SELECT + event_id, + TRIM(LOWER(event_type)) AS event_type, + user_id, + event_timestamp, + src:metadata::VARIANT AS metadata + FROM raw + WHERE event_type IS NOT NULL +), +enriched AS ( + SELECT + c.*, + u.name AS user_name, + u.segment + FROM cleaned c + JOIN dim_users u ON c.user_id = u.user_id +) +SELECT * FROM enriched; +``` + +### MERGE with Multiple Match Conditions + +```sql +MERGE INTO dim_customers t +USING ( + SELECT customer_id, name, email, updated_at, + ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) AS rn + FROM staging_customers +) s +ON t.customer_id = s.customer_id AND s.rn = 1 +WHEN MATCHED AND s.updated_at > t.updated_at THEN + UPDATE SET t.name = s.name, t.email = s.email, t.updated_at = s.updated_at +WHEN NOT MATCHED THEN + INSERT (customer_id, name, email, updated_at) + VALUES (s.customer_id, s.name, s.email, s.updated_at); +``` + +### Semi-Structured Data Patterns + +**Flatten nested arrays:** +```sql +SELECT + o.order_id, + f.value:product_id::STRING AS product_id, + f.value:quantity::NUMBER AS quantity, + f.value:price::NUMBER(10,2) AS price +FROM orders o, +LATERAL FLATTEN(input => o.line_items) f; +``` + +**Nested flatten (array of arrays):** +```sql +SELECT + f1.value:category::STRING AS category, + f2.value:tag::STRING AS tag +FROM catalog, +LATERAL FLATTEN(input => data:categories) f1, +LATERAL FLATTEN(input => f1.value:tags) f2; +``` + +**OBJECT_CONSTRUCT for building JSON:** +```sql +SELECT OBJECT_CONSTRUCT( + 'id', customer_id, + 'name', name, + 'orders', ARRAY_AGG(OBJECT_CONSTRUCT('order_id', order_id, 'total', total)) +) AS customer_json +FROM customers c JOIN orders o ON c.customer_id = o.customer_id +GROUP BY c.customer_id, c.name; +``` + +### Window Functions + +```sql +-- Running total with partitions +SELECT + 
department, + employee, + salary, + SUM(salary) OVER (PARTITION BY department ORDER BY hire_date) AS dept_running_total +FROM employees; + +-- Detect gaps in sequences +SELECT id, seq_num, + seq_num - LAG(seq_num) OVER (ORDER BY seq_num) AS gap +FROM records +HAVING gap > 1; +``` + +### Time Travel + +```sql +-- Query data as of a specific timestamp +SELECT * FROM my_table AT(TIMESTAMP => '2026-03-20 10:00:00'::TIMESTAMP); + +-- Query data before a specific statement +SELECT * FROM my_table BEFORE(STATEMENT => ''); + +-- Restore a dropped table +UNDROP TABLE accidentally_dropped_table; +``` + +Default retention: 1 day (standard edition), up to 90 days (enterprise+). Set per table: `DATA_RETENTION_TIME_IN_DAYS = 7`. + +--- + +## Dynamic Table Deep Dive + +### TARGET_LAG Strategy + +Design your DT DAG with progressive lag -- tighter upstream, looser downstream: + +``` +raw_events (base table) + | + v +cleaned_events (DT, TARGET_LAG = '1 minute') + | + v +enriched_events (DT, TARGET_LAG = '5 minutes') + | + v +daily_aggregates (DT, TARGET_LAG = '1 hour') +``` + +### Refresh Mode Rules + +| Refresh Mode | Condition | +|-------------|-----------| +| Incremental | DTs with simple SELECT, JOIN, WHERE, GROUP BY, UNION ALL on change-tracked sources | +| Full | DTs using non-deterministic functions, LIMIT, or depending on full-refresh DTs | + +**Check refresh mode:** +```sql +SELECT name, refresh_mode, refresh_mode_reason +FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES()) +WHERE name = 'MY_DT'; +``` + +### DT Debugging Queries + +```sql +-- Check DT health and lag +SELECT name, scheduling_state, last_completed_refresh_state, + data_timestamp, DATEDIFF('minute', data_timestamp, CURRENT_TIMESTAMP()) AS lag_minutes +FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES()); + +-- Check refresh history for failures +SELECT name, state, state_message, refresh_trigger +FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY()) +WHERE state = 'FAILED' +ORDER BY refresh_end_time DESC +LIMIT 
10; + +-- Examine graph dependencies +SELECT name, qualified_name, refresh_mode +FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_GRAPH_HISTORY()); +``` + +### DT Constraints + +- No views between two DTs in the DAG. +- `SELECT *` breaks on upstream schema changes. +- Cannot use non-deterministic functions (e.g., `CURRENT_TIMESTAMP()`) -- use a column from the source instead. +- Change tracking must be enabled on source tables: `ALTER TABLE src SET CHANGE_TRACKING = TRUE;` + +--- + +## Streams and Tasks Patterns + +### Task Trees (Parent-Child) + +```sql +CREATE OR REPLACE TASK parent_task + WAREHOUSE = transform_wh + SCHEDULE = 'USING CRON 0 */1 * * * America/Los_Angeles' + AS CALL process_stage_1(); + +CREATE OR REPLACE TASK child_task + WAREHOUSE = transform_wh + AFTER parent_task + AS CALL process_stage_2(); + +-- Resume in reverse order: children first, then parent +ALTER TASK child_task RESUME; +ALTER TASK parent_task RESUME; +``` + +### Stream Types + +| Stream Type | Use Case | +|------------|----------| +| Standard (default) | Track all DML changes (INSERT, UPDATE, DELETE) | +| Append-only | Only track INSERTs. More efficient for insert-heavy tables. | +| Insert-only (external tables) | Track new files loaded via external tables. | + +```sql +-- Append-only stream for event log tables +CREATE STREAM event_stream ON TABLE events APPEND_ONLY = TRUE; +``` + +### Serverless Tasks + +```sql +-- No warehouse needed. Snowflake manages compute automatically. 
+CREATE OR REPLACE TASK lightweight_task + USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' + SCHEDULE = '5 MINUTE' + AS INSERT INTO audit_log SELECT CURRENT_TIMESTAMP(), 'heartbeat'; +``` + +--- + +## Snowpipe + +### Auto-Ingest Setup (S3) + +```sql +CREATE OR REPLACE PIPE my_pipe + AUTO_INGEST = TRUE + AS COPY INTO raw_table + FROM @my_s3_stage + FILE_FORMAT = (TYPE = 'JSON', STRIP_NULL_VALUES = TRUE); +``` + +Configure the S3 event notification to point to the pipe's SQS queue: +```sql +SHOW PIPES LIKE 'my_pipe'; +-- Use the notification_channel value for S3 event config +``` + +### Snowpipe Monitoring + +```sql +-- Check pipe status +SELECT SYSTEM$PIPE_STATUS('my_pipe'); + +-- Recent load history +SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY( + TABLE_NAME => 'raw_table', + START_TIME => DATEADD(HOUR, -24, CURRENT_TIMESTAMP()) +)); +``` + +--- + +## Anti-Patterns + +| Anti-Pattern | Why It's Bad | Fix | +|-------------|-------------|-----| +| `SELECT *` in production | Scans all columns, breaks on schema changes | Explicit column list | +| Double-quoted identifiers | Creates case-sensitive names requiring constant quoting | Use `snake_case` without quotes | +| `ORDER BY` without `LIMIT` | Sorts entire result set for no reason | Add `LIMIT` or remove `ORDER BY` | +| Single warehouse for everything | Workloads compete for resources | Separate warehouses per workload | +| `FLOAT` for money | Rounding errors | `NUMBER(19,4)` or integer cents | +| Missing `RESUME` after task creation | Task never runs | Always `ALTER TASK ... 
RESUME` | +| `CURRENT_TIMESTAMP()` in DT query | Forces full refresh mode | Use a timestamp column from the source | +| Scanning VARIANT without casting | "Numeric value not recognized" errors | Always cast: `col:field::TYPE` | diff --git a/engineering-team/snowflake-development/references/troubleshooting.md b/engineering-team/snowflake-development/references/troubleshooting.md new file mode 100644 index 0000000..eb05a8b --- /dev/null +++ b/engineering-team/snowflake-development/references/troubleshooting.md @@ -0,0 +1,155 @@ +# Snowflake Troubleshooting Reference + +Common errors, debugging queries, and resolution patterns for Snowflake development. + +## Table of Contents + +1. [Error Reference](#error-reference) +2. [Debugging Queries](#debugging-queries) +3. [Performance Diagnostics](#performance-diagnostics) + +--- + +## Error Reference + +### SQL Errors + +| Error | Cause | Fix | +|-------|-------|-----| +| "Object 'X' does not exist or not authorized" | Wrong database/schema context, missing grants, or typo | Fully qualify: `db.schema.table`. Check `SHOW GRANTS ON TABLE`. 
| +| "Invalid identifier 'VAR'" in procedure | Missing colon prefix on variable in SQL procedure | Use `:var_name` inside SELECT/INSERT/UPDATE/DELETE/MERGE | +| "Numeric value 'X' is not recognized" | VARIANT field accessed without type cast | Always cast: `src:field::NUMBER(10,2)` | +| "SQL compilation error: ambiguous column name" | Same column name in multiple joined tables | Use table aliases: `t.id`, `s.id` | +| "Number of columns in insert does not match" | INSERT column count mismatch with VALUES | Verify column list matches value list exactly | +| "Division by zero" | Dividing by a column that contains 0 | Use `NULLIF(divisor, 0)` or `IFF(divisor = 0, NULL, ...)` | + +### Pipeline Errors + +| Error | Cause | Fix | +|-------|-------|-----| +| Task not running | Created but not resumed | `ALTER TASK task_name RESUME;` | +| DT stuck in FAILED state | Query error or upstream dependency issue | Check `DYNAMIC_TABLE_REFRESH_HISTORY()` for error messages | +| DT shows full refresh instead of incremental | Non-deterministic function or unsupported pattern | Check `refresh_mode_reason` in `INFORMATION_SCHEMA.DYNAMIC_TABLES()` | +| Stream shows no data | Stream was consumed or table was recreated | Verify stream is on the correct table, check `STALE_AFTER` | +| Snowpipe not loading files | SQS notification misconfigured or file format mismatch | Check `SYSTEM$PIPE_STATUS()`, verify notification channel | +| "UPSTREAM_FAILED" on DT | A DT dependency upstream has a refresh failure | Fix the upstream DT first, then downstream will recover | + +### Cortex AI Errors + +| Error | Cause | Fix | +|-------|-------|-----| +| "Function X does not exist" | Using deprecated function name | Use new `AI_*` names (e.g., `AI_CLASSIFY` not `CLASSIFY_TEXT`) | +| TO_FILE error | Single argument instead of two | `TO_FILE('@stage', 'file.pdf')` -- two separate arguments | +| Agent returns empty or wrong results | Poor tool descriptions or wrong semantic model | Improve tool descriptions, 
verify semantic model covers the question | +| "Invalid specification" on agent | JSON structure error in spec | Check: `models` is object not array, `tool_resources` is top-level, no trailing commas | + +--- + +## Debugging Queries + +### Query History + +```sql +-- Find slow queries in the last 24 hours +SELECT query_id, query_text, execution_status, + total_elapsed_time / 1000 AS elapsed_sec, + bytes_scanned / (1024*1024*1024) AS gb_scanned, + rows_produced, warehouse_name +FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY( + END_TIME_RANGE_START => DATEADD(HOUR, -24, CURRENT_TIMESTAMP()), + RESULT_LIMIT => 50 +)) +WHERE total_elapsed_time > 30000 -- > 30 seconds +ORDER BY total_elapsed_time DESC; +``` + +### Dynamic Table Health + +```sql +-- Overall DT status +SELECT name, scheduling_state, last_completed_refresh_state, + data_timestamp, + DATEDIFF('minute', data_timestamp, CURRENT_TIMESTAMP()) AS lag_minutes +FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES()) +ORDER BY lag_minutes DESC; + +-- Recent failures +SELECT name, state, state_message, refresh_trigger, + DATEDIFF('second', refresh_start_time, refresh_end_time) AS duration_sec +FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY()) +WHERE state = 'FAILED' +ORDER BY refresh_end_time DESC +LIMIT 20; +``` + +### Stream Status + +```sql +-- Check stream freshness +SHOW STREAMS; + +-- Check if stream has data +SELECT SYSTEM$STREAM_HAS_DATA('my_stream'); +``` + +### Task Monitoring + +```sql +-- Check task run history +SELECT name, state, error_message, + scheduled_time, completed_time, + DATEDIFF('second', scheduled_time, completed_time) AS duration_sec +FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY()) +WHERE name = 'MY_TASK' +ORDER BY scheduled_time DESC +LIMIT 20; +``` + +### Grants Debugging + +```sql +-- What grants does a role have? +SHOW GRANTS TO ROLE my_role; + +-- What grants exist on an object? +SHOW GRANTS ON TABLE my_db.my_schema.my_table; + +-- Who has ACCOUNTADMIN? 
+SHOW GRANTS OF ROLE ACCOUNTADMIN; +``` + +--- + +## Performance Diagnostics + +### Warehouse Utilization + +```sql +-- Warehouse load over time +SELECT start_time, warehouse_name, + avg_running, avg_queued_load, avg_blocked +FROM TABLE(INFORMATION_SCHEMA.WAREHOUSE_LOAD_HISTORY( + DATE_RANGE_START => DATEADD(HOUR, -24, CURRENT_TIMESTAMP()) +)) +WHERE warehouse_name = 'MY_WH' +ORDER BY start_time DESC; +``` + +### Clustering Health + +```sql +-- Check clustering depth (lower is better) +SELECT SYSTEM$CLUSTERING_INFORMATION('my_table', '(date_col, region)'); +``` + +### Storage Costs + +```sql +-- Table storage usage +SELECT table_name, active_bytes / (1024*1024*1024) AS active_gb, + time_travel_bytes / (1024*1024*1024) AS time_travel_gb, + failsafe_bytes / (1024*1024*1024) AS failsafe_gb +FROM INFORMATION_SCHEMA.TABLE_STORAGE_METRICS +WHERE table_schema = 'MY_SCHEMA' +ORDER BY active_bytes DESC; +``` diff --git a/engineering-team/snowflake-development/scripts/snowflake_query_helper.py b/engineering-team/snowflake-development/scripts/snowflake_query_helper.py new file mode 100644 index 0000000..eabaaa0 --- /dev/null +++ b/engineering-team/snowflake-development/scripts/snowflake_query_helper.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +""" +Snowflake Query Helper + +Generate common Snowflake SQL patterns: MERGE upserts, Dynamic Table DDL, +and RBAC grant statements. Outputs ready-to-use SQL that follows Snowflake +best practices. 
#!/usr/bin/env python3
"""
Snowflake Query Helper

Generate common Snowflake SQL patterns: MERGE upserts, Dynamic Table DDL,
and RBAC grant statements. Outputs ready-to-use SQL that follows Snowflake
best practices.

Usage:
    python snowflake_query_helper.py merge --target customers --source stg_customers --key id --columns name,email
    python snowflake_query_helper.py dynamic-table --name cleaned_events --warehouse transform_wh --lag "5 minutes"
    python snowflake_query_helper.py grant --role analyst --database analytics --schemas public --privileges SELECT,USAGE
    python snowflake_query_helper.py --json merge --target t --source s --key id --columns a,b

Note:
    Global options such as --json must appear BEFORE the subcommand name:
    argparse hands every token after the subcommand to that subparser,
    which does not know about the top-level --json flag.
"""

import argparse
import json
import sys
import textwrap
from typing import List, Optional


def generate_merge(
    target: str,
    source: str,
    key: str,
    columns: List[str],
    schema: Optional[str] = None,
) -> str:
    """Generate a MERGE (upsert) statement following Snowflake best practices.

    Args:
        target: Target table name.
        source: Source (staging) table name.
        key: Join-key column used in the ON clause.
        columns: Columns to upsert. The join key and ``updated_at`` are
            excluded from the generated assignments: the key is the match
            condition and must never be updated, and ``updated_at`` is
            always set from ``CURRENT_TIMESTAMP()``.
        schema: Optional schema prefix (e.g. ``my_db.my_schema``) applied
            to both tables.

    Returns:
        A complete, ready-to-run MERGE statement.
    """
    prefix = f"{schema}." if schema else ""
    target_ref = f"{prefix}{target}"
    source_ref = f"{prefix}{source}"

    # Drop the join key and updated_at so neither appears twice in the
    # statement: the key belongs only in ON, updated_at is generated below.
    merge_cols = [col for col in columns if col not in (key, "updated_at")]

    assignments = [f"t.{col} = s.{col}" for col in merge_cols]
    assignments.append("t.updated_at = CURRENT_TIMESTAMP()")
    update_sets = ",\n            ".join(assignments)

    insert_cols = ", ".join([key] + merge_cols + ["updated_at"])
    insert_vals = ", ".join(
        [f"s.{key}"] + [f"s.{col}" for col in merge_cols] + ["CURRENT_TIMESTAMP()"]
    )

    return (
        f"MERGE INTO {target_ref} t\n"
        f"USING {source_ref} s\n"
        f"ON t.{key} = s.{key}\n"
        "WHEN MATCHED THEN\n"
        "    UPDATE SET\n"
        f"            {update_sets}\n"
        "WHEN NOT MATCHED THEN\n"
        f"    INSERT ({insert_cols})\n"
        f"    VALUES ({insert_vals});"
    )


def generate_dynamic_table(
    name: str,
    warehouse: str,
    lag: str,
    source: Optional[str] = None,
    columns: Optional[List[str]] = None,
    schema: Optional[str] = None,
) -> str:
    """Generate Dynamic Table DDL with best-practice defaults.

    Args:
        name: Dynamic Table name.
        warehouse: Warehouse used for refreshes.
        lag: TARGET_LAG value (e.g. ``'5 minutes'``, ``'1 hour'``).
        source: Source table. When omitted, an explicit ``<source_table>``
            placeholder is emitted so the gap is visible instead of
            producing silently invalid SQL with an empty FROM clause.
        columns: Explicit column list. When omitted, a ``<column_list>``
            placeholder is emitted (never ``SELECT *`` -- it breaks on
            upstream schema changes).
        schema: Optional schema prefix.

    Returns:
        CREATE DYNAMIC TABLE DDL followed by a commented refresh-mode check.
    """
    prefix = f"{schema}." if schema else ""
    full_name = f"{prefix}{name}"
    # Visible placeholders rather than empty strings: generated SQL with a
    # missing FROM target or empty SELECT list fails in confusing ways.
    src = source or "<source_table>"
    col_list = ", ".join(columns) if columns else "<column_list>"

    return (
        f"CREATE OR REPLACE DYNAMIC TABLE {full_name}\n"
        f"    TARGET_LAG = '{lag}'\n"
        f"    WAREHOUSE = {warehouse}\n"
        "    AS\n"
        f"    SELECT {col_list}\n"
        f"    FROM {src}\n"
        "    WHERE 1=1;  -- Add your filter conditions\n"
        "\n"
        "-- Verify refresh mode (incremental is preferred):\n"
        "-- SELECT name, refresh_mode, refresh_mode_reason\n"
        "-- FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())\n"
        f"-- WHERE name = '{name.upper()}';"
    )


def generate_grants(
    role: str,
    database: str,
    schemas: List[str],
    privileges: List[str],
) -> str:
    """Generate RBAC grant statements following least-privilege principles.

    Emits database USAGE, then per-schema USAGE plus the requested
    privileges on ALL and FUTURE objects, so newly created tables/views are
    covered automatically.

    Args:
        role: Role receiving the grants.
        database: Database name.
        schemas: Schema names inside the database.
        privileges: Privileges such as SELECT, INSERT, CREATE TABLE.
            USAGE is always granted per schema, so listing it is a no-op.

    Returns:
        A newline-joined block of commented GRANT statements.
    """
    lines = [
        f"-- RBAC grants for role: {role}",
        "-- Generated following least-privilege principles",
        "",
        f"GRANT USAGE ON DATABASE {database} TO ROLE {role};",
        "",
    ]

    for schema in schemas:
        fq_schema = f"{database}.{schema}"
        lines.append(f"-- Schema: {fq_schema}")
        lines.append(f"GRANT USAGE ON SCHEMA {fq_schema} TO ROLE {role};")

        for priv in privileges:
            p = priv.strip().upper()
            if p == "USAGE":
                continue  # Already granted above
            if p == "SELECT":
                # Cover existing and future tables AND views.
                for obj in ("TABLES", "VIEWS"):
                    lines.append(
                        f"GRANT SELECT ON ALL {obj} IN SCHEMA {fq_schema} TO ROLE {role};"
                    )
                    lines.append(
                        f"GRANT SELECT ON FUTURE {obj} IN SCHEMA {fq_schema} TO ROLE {role};"
                    )
            elif p in ("INSERT", "UPDATE", "DELETE", "TRUNCATE"):
                lines.append(
                    f"GRANT {p} ON ALL TABLES IN SCHEMA {fq_schema} TO ROLE {role};"
                )
                lines.append(
                    f"GRANT {p} ON FUTURE TABLES IN SCHEMA {fq_schema} TO ROLE {role};"
                )
            else:
                # CREATE TABLE, CREATE VIEW, and anything else are
                # schema-level grants; pass the privilege through verbatim.
                lines.append(f"GRANT {p} ON SCHEMA {fq_schema} TO ROLE {role};")
        lines.append("")

    return "\n".join(lines)


def main() -> None:
    """Parse CLI arguments and print the requested SQL (raw or JSON-wrapped)."""
    parser = argparse.ArgumentParser(
        description="Generate common Snowflake SQL patterns",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""\
            Examples:
              %(prog)s merge --target customers --source stg --key id --columns name,email
              %(prog)s dynamic-table --name clean_events --warehouse wh --lag "5 min"
              %(prog)s grant --role analyst --database db --schemas public --privileges SELECT
              %(prog)s --json merge --target t --source s --key id --columns a,b
        """),
    )
    # --json lives on the top-level parser, so it must PRECEDE the
    # subcommand on the command line (argparse routes later tokens to the
    # subparser, which does not define --json).
    parser.add_argument(
        "--json", action="store_true", help="Output as JSON instead of raw SQL"
    )

    subparsers = parser.add_subparsers(dest="command", help="SQL pattern to generate")

    # MERGE subcommand
    merge_p = subparsers.add_parser("merge", help="Generate MERGE (upsert) statement")
    merge_p.add_argument("--target", required=True, help="Target table name")
    merge_p.add_argument("--source", required=True, help="Source table name")
    merge_p.add_argument("--key", required=True, help="Join key column")
    merge_p.add_argument(
        "--columns", required=True, help="Comma-separated columns to merge"
    )
    merge_p.add_argument("--schema", help="Schema prefix (e.g., my_db.my_schema)")

    # Dynamic Table subcommand
    dt_p = subparsers.add_parser("dynamic-table", help="Generate Dynamic Table DDL")
    dt_p.add_argument("--name", required=True, help="Dynamic Table name")
    dt_p.add_argument("--warehouse", required=True, help="Warehouse for refresh")
    dt_p.add_argument(
        "--lag", required=True, help="Target lag (e.g., '5 minutes', '1 hour')"
    )
    dt_p.add_argument("--source", help="Source table name")
    dt_p.add_argument("--columns", help="Comma-separated column list")
    dt_p.add_argument("--schema", help="Schema prefix")

    # Grant subcommand
    grant_p = subparsers.add_parser("grant", help="Generate RBAC grant statements")
    grant_p.add_argument("--role", required=True, help="Role to grant to")
    grant_p.add_argument("--database", required=True, help="Database name")
    grant_p.add_argument(
        "--schemas", required=True, help="Comma-separated schema names"
    )
    grant_p.add_argument(
        "--privileges",
        required=True,
        help="Comma-separated privileges (SELECT, INSERT, UPDATE, DELETE, CREATE TABLE, etc.)",
    )

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    if args.command == "merge":
        cols = [c.strip() for c in args.columns.split(",")]
        sql = generate_merge(args.target, args.source, args.key, cols, args.schema)
    elif args.command == "dynamic-table":
        cols = [c.strip() for c in args.columns.split(",")] if args.columns else None
        sql = generate_dynamic_table(
            args.name, args.warehouse, args.lag, args.source, cols, args.schema
        )
    else:  # "grant" -- argparse restricts command to the three subparsers
        schemas = [s.strip() for s in args.schemas.split(",")]
        privs = [p.strip() for p in args.privileges.split(",")]
        sql = generate_grants(args.role, args.database, schemas, privs)

    if args.json:
        print(json.dumps({"command": args.command, "sql": sql}, indent=2))
    else:
        print(sql)


if __name__ == "__main__":
    main()