Your SQL DNA → dbt Fluency
You already know 80% of what dbt does. Let's map your existing mental models to dbt's paradigm — and surgically fill the gaps.
5 Materializations · Jinja Power · DAG Auto-lineage · SCD2 Snapshots · Python Models
Your Tech Profile → dbt Power Level
SQL / CTEs → dbt models ARE CTEs
NoSQL / JSON → semi-structured sources
Docker → dbt Cloud / Airflow
Cloud (AWS/GCP) → BigQuery/Redshift targets
Python → Python models, macros
dbt Core → what we're fixing today
SQL Veteran → dbt Translation
CREATE OR REPLACE VIEW → materialized='view'
CREATE TABLE AS SELECT → materialized='table'
INSERT INTO ... WHERE date > ? → materialized='incremental'
MERGE INTO (SCD Type 2) → snapshot block
SELECT * FROM schema.table → {{ ref('model_name') }}
External / raw table → {{ source('schema','table') }}
Stored procedure / UDF → macro in macros/
Static lookup CSV → seed in seeds/
WITH cte AS (SELECT ...) → ephemeral model
Data contract / expectation → generic tests in schema.yml (run via dbt test)

The dbt Project Anatomy

sources

Raw ingested data. You declare these in schema.yml files, not create them. Think: Fivetran landing zone, Airbyte raw tables, Kafka-to-S3 dumps.

models

SQL (or Python) SELECT statements. Each file = one relation in your warehouse. dbt resolves dependencies automatically via ref().

seeds

CSV files checked into git. dbt seed loads them. Perfect for country codes, category maps, static cost tables.
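Once dbt seed has loaded it, a seed behaves like any other relation — you ref() it. A minimal sketch (the source and column names here are illustrative, not from this project):

```sql
-- hypothetical fragment of stg_customers: enrich raw customers with the seed
select
  c.customer_id,
  cc.country_name
from {{ source('s3', 'customers') }} c
left join {{ ref('country_codes') }} cc
  using (country_code)
```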

macros

Jinja functions that generate SQL. Reusable logic. Cross-project via packages. Think SQL templating on steroids.

snapshots

Type-2 SCDs, automated. Point at a source, define uniqueness + change detection strategy, dbt handles the valid_from / valid_to bookkeeping.

tests

Assertions on your data. Four built-in generic tests. Unlimited custom SQL tests. Package extensions like dbt_expectations give you 50+ more.

Live DAG — The Ingress Pipeline You're Building

⬡ src_postgres.orders ──▶ stg_orders ──▶ int_orders_enriched ──▶ mart_revenue
⬡ src_kafka.events ──▶ stg_events ──▶ int_orders_enriched
⬡ src_s3.customers ──▶ stg_customers ──▶ int_orders_enriched
✦ seed: country_codes ──▶ stg_customers
mart_revenue ──▶ mart_revenue_daily

Each arrow is a ref() or source() call. dbt resolves execution order and builds the DAG automatically.
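Concretely, the three arrows into int_orders_enriched are just three ref() calls in one file — a sketch with illustrative column names:

```sql
-- models/intermediate/int_orders_enriched.sql (sketch)
select
  o.order_id,
  o.customer_id,
  o.amount_dollars,
  c.country,
  e.last_event_ts
from {{ ref('stg_orders') }} o
left join {{ ref('stg_customers') }} c using (customer_id)
left join (
    -- latest activity per customer from the events staging model
    select user_id as customer_id, max(event_ts) as last_event_ts
    from {{ ref('stg_events') }}
    group by 1
) e using (customer_id)
```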

Models & Materializations
The five materializations, when to reach for each, and the config patterns that unlock them.
WHEN TO USE Staging models, rarely-queried transformations, alias layers. Zero storage cost. Freshest data always.
models/staging/stg_orders.sql sql + jinja
{{
  config(
    materialized = 'view',
    schema       = 'staging',
    tags         = ['staging', 'orders']
  )
}}

with source as (

  select *
  from {{ source('postgres', 'orders') }}

),

renamed as (

  select
    -- rename & cast at the boundary
    order_id                              as order_id,
    customer_id                           as customer_id,
    cast(created_at as timestamp)        as created_at,
    upper(status)                         as status,
    amount / 100.0                        as amount_dollars,
    _fivetran_synced                      as _loaded_at

  from source

)

select * from renamed
PRO INSIGHT: THE STAGING CONTRACT Staging models are your boundary. They rename, cast, and clean — nothing else. No joins. No business logic. This keeps your raw layer queryable independently and makes upstream schema changes a one-file fix.
models/staging/schema.yml yaml
sources:
  - name: postgres
    database: raw
    schema: public
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: {count: 1, period: hour}
      error_after: {count: 6, period: hour}
    tables:
      - name: orders
        description: "Raw orders from Postgres via Fivetran"
        columns:
          - name: order_id
            description: "PK — UUID"
            tests:
              - not_null
              - unique

Run dbt source freshness to alert when sources go stale — critical for ingress monitoring.

WHEN TO USE Heavy aggregations, mart-layer models, anything downstream dashboards hammer. Pays compute once, serves reads infinitely.
models/marts/mart_revenue.sql sql + jinja
{{
  config(
    materialized  = 'table',
    schema        = 'marts',
    dist          = 'customer_id',  -- Redshift
    sort          = ['order_date'],  -- Redshift
    -- BigQuery equivalent:
    partition_by  = {
      'field': 'order_date',
      'data_type': 'date',
      'granularity': 'day'
    },
    cluster_by    = ['customer_id', 'status']
  )
}}

with orders as (
  select * from {{ ref('int_orders_enriched') }}
),

daily_revenue as (
  select
    date_trunc('day', created_at) as order_date,
    customer_id,
    status,
    count(*)                                as order_count,
    sum(amount_dollars)                     as gross_revenue,
    -- filter (where ...) is Postgres syntax and won't run on
    -- Redshift/BigQuery; case-when is portable everywhere
    sum(case when status = 'COMPLETED'
             then amount_dollars end)       as net_revenue
  from orders
  group by 1, 2, 3
)

select * from daily_revenue
WAREHOUSE-NATIVE OPTIMISATION dbt passes partition_by and cluster_by directly to BigQuery DDL. On Redshift, dist and sort control physical layout. Know your warehouse, inject it here.

Table vs View Decision Matrix

Factor → View → Table
Query frequency → Low / ad-hoc → High / BI tool
Transform cost → Cheap scans → Expensive aggregation
Data freshness → Always live → As-of last dbt run
Storage cost → Zero → Full duplication
Downstream joins → Re-compute each time → Indexes / partitions help
CLICK → Incremental section for the full deep-dive This tab covers the basics. The Incremental section in the nav covers merge strategies, late-arriving data, and the --full-refresh escape hatch.
models/staging/stg_events.sql incremental basics
{{
  config(
    materialized       = 'incremental',
    unique_key         = 'event_id',
    on_schema_change   = 'sync_all_columns'
  )
}}

select
  event_id,
  user_id,
  event_type,
  event_ts,
  properties
from {{ source('kafka', 'events') }}

{% if is_incremental() %}

  where event_ts > (select max(event_ts) from {{ this }})

{% endif %}

{{ this }} is the relation reference to the model's own table in the warehouse — the magic that makes incremental work. is_incremental() returns false on first run (full load) and true on subsequent runs.

EPHEMERAL = CTE-AS-A-FILE Ephemeral models are inlined as CTEs into whatever model references them. They create zero warehouse objects — pure compile-time substitution. Think: shared logic extracted into its own file.
models/intermediate/_int_exchange_rates.sql ephemeral
{{ config(materialized='ephemeral') }}

-- This file creates NO table/view in the warehouse.
-- It's inlined as a CTE wherever ref() is called.

select
  currency_code,
  rate_to_usd,
  date_trunc('day', valid_from) as rate_date
from {{ ref('stg_exchange_rates') }}
where is_current = true

What dbt compiles to:

compiled SQL (auto-generated)
-- dbt inlines the ephemeral as a CTE:
with __dbt__cte__int_exchange_rates as (

  select
    currency_code,
    rate_to_usd,
    date_trunc('day', valid_from) as rate_date
  from analytics.staging.stg_exchange_rates  -- the resolved ref()
  where is_current = true

),

your_model_cte as (

  select
    o.amount_dollars * fx.rate_to_usd as amount_usd
  from orders o
  join __dbt__cte__int_exchange_rates fx using (currency_code)

)

select * from your_model_cte
GOTCHA Every ephemeral ref() inlines another CTE into the compiled SQL. Chain several ephemeral models together and the compiled query becomes an unreadable, planner-hostile CTE stack — and because no warehouse object exists, you can't query intermediate results to debug. Switch to a view when the chain gets long.
dbt 1.6+ — Native Materialized View Support Warehouse-native MVs (BigQuery, Snowflake, Redshift, Postgres 9.3+). Automatically refreshed by the warehouse engine. dbt manages the DDL; you write the SELECT. Note that each warehouse restricts what SQL an MV may contain (joins, non-deterministic functions) — check your adapter's docs. On Snowflake, the related dynamic_table materialization adds a configurable refresh lag.
models/marts/mv_active_sessions.sql materialized_view
{{
  config(
    materialized = 'materialized_view'
    -- Snowflake alternative — a dynamic table with a refresh-lag target:
    --   materialized        = 'dynamic_table',
    --   target_lag          = '1 minute',
    --   snowflake_warehouse = 'TRANSFORMING'
  )
}}

-- The warehouse keeps this fresh automatically.
-- dbt run detects drift and issues ALTER/REPLACE DDL.

select
  session_id,
  user_id,
  max(event_ts)  as last_activity,
  count(*)       as event_count
from {{ ref('stg_events') }}
where event_ts > current_timestamp() - interval '24 hours'
group by 1, 2
Jinja Macros SQL Metaprogramming
Where dbt goes from "SQL runner" to "SQL compiler". Macros are Jinja functions that generate SQL — write the generator once, never write boilerplate again.

Anatomy of a Macro

macros/utils/cents_to_dollars.sql jinja macro
{% macro cents_to_dollars(column_name, scale=2) %}
  round({{ column_name }} / 100.0, {{ scale }})
{% endmacro %}


-- Usage in any model:
select
  order_id,
  {{ cents_to_dollars('amount_cents') }}          as amount,
  {{ cents_to_dollars('tax_cents', scale=4) }}  as tax

-- Compiles to:
select
  order_id,
  round(amount_cents / 100.0, 2) as amount,
  round(tax_cents    / 100.0, 4) as tax

The Real Power: Generating Column Lists

macros/utils/pivot_status.sql advanced macro
{% macro pivot_status_counts(statuses) %}
  -- note: countif() is BigQuery syntax; use count(case when ...) elsewhere

  {% for status in statuses %}

    countif(status = '{{ status }}') as {{ status | lower }}_count

    {% if not loop.last %},{% endif %}

  {% endfor %}

{% endmacro %}


-- Usage:
select
  customer_id,
  {{ pivot_status_counts([
      'PENDING',
      'COMPLETED',
      'CANCELLED',
      'REFUNDED'
  ]) }}
from {{ ref('stg_orders') }}
group by 1

Introspecting the Warehouse

macros/utils/get_column_names.sql introspection macro
{% macro get_column_names(model) %}

  {% set relation = ref(model) %}
  {% set columns  = adapter.get_columns_in_relation(relation) %}

  {{ log("Columns in " ~ model ~ ":", info=true) }}
  {% for col in columns %}
    {{ log(col.name ~ " (" ~ col.dtype ~ ")", info=true) }}
  {% endfor %}

{% endmacro %}


-- A model that auto-selects all non-PII columns:
{% macro select_non_pii(model, pii_columns) %}

  {% set relation = ref(model) %}
  {% set all_cols  = adapter.get_columns_in_relation(relation) %}

  select
  {% for col in all_cols %}
    {% if col.name not in pii_columns %}
      {{ col.name }}{% if not loop.last %},{% endif %}
    {% endif %}
  {% endfor %}
  from {{ relation }}

{% endmacro %}
ADAPTER METHODS — YOUR SECRET WEAPONS adapter.get_columns_in_relation(), adapter.get_relation(), adapter.already_exists() — these are warehouse-introspection calls inside Jinja. Most senior dbt engineers never use them. You should.

run_query — Execute SQL, Use the Result

macros/utils/dynamic_date_spine.sql run_query
{% macro get_max_date(model, column) %}

  {% set query %}
    select max({{ column }}) as max_date
    from {{ ref(model) }}
  {% endset %}

  {% set result = run_query(query) %}

  {% if execute %}
    {% set max_date = result.columns[0].values()[0] %}
    {{ return(max_date) }}
  {% endif %}

{% endmacro %}

-- Usage: dynamically backfill from last loaded date
{% set cutoff = get_max_date('stg_orders', 'created_at') %}

Cross-Database Dispatch: Write Once, Run Everywhere

macros/utils/safe_divide.sql dispatch macro
-- Base macro — defines the interface
{% macro safe_divide(numerator, denominator) %}
  {{ return(adapter.dispatch('safe_divide')(numerator, denominator)) }}
{% endmacro %}

-- BigQuery implementation
{% macro bigquery__safe_divide(numerator, denominator) %}
  safe_divide({{ numerator }}, {{ denominator }})
{% endmacro %}

-- Snowflake / Postgres fallback
{% macro default__safe_divide(numerator, denominator) %}
  case when {{ denominator }} = 0 then null
       else {{ numerator }} / nullif({{ denominator }}, 0)
  end
{% endmacro %}

-- Usage in any model — dbt picks the right implementation:
select
  {{ safe_divide('net_revenue', 'order_count') }} as avg_order_value
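Related: when you override a package's macros this way (e.g. shimming dbt_utils for a new adapter), dbt_project.yml controls the dispatch search order — a minimal example (the project name is hypothetical):

```yaml
# dbt_project.yml — check our project for overrides before falling back
dispatch:
  - macro_namespace: dbt_utils
    search_order: ['my_project', 'dbt_utils']
```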
Incremental Models The Full Arsenal
Four strategies, late-arriving data, schema evolution, and the decisions that separate a $100/month pipeline from a $10,000/month one.

Strategy 1: append

append — immutable event streams incremental
{{ config(
  materialized  = 'incremental',
  incremental_strategy = 'append'   -- INSERT only, never touch existing rows
) }}

-- Perfect for: event logs, audit trails, append-only Kafka topics.
-- Danger: if your source replays events, you'll get duplicates.
-- Guard with: unique_key at the warehouse level or a dedupe downstream.

select event_id, user_id, event_ts, payload
from {{ source('kafka', 'raw_events') }}
{% if is_incremental() %}
  where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
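One way to implement the "dedupe downstream" guard mentioned in the comments above — a sketch using qualify (Snowflake/BigQuery/DuckDB syntax; the model names are hypothetical):

```sql
-- models/staging/stg_events_deduped.sql — keep the newest copy of each event
select *
from {{ ref('stg_events_append') }}  -- the append-only model above
qualify row_number() over (
  partition by event_id
  order by event_ts desc
) = 1
```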

Strategy 2: delete+insert (partition-style replacement)

delete+insert — partition replacement incremental
{{ config(
  materialized         = 'incremental',
  incremental_strategy = 'delete+insert',
  unique_key           = 'order_id',        -- DELETE matching rows first
  on_schema_change     = 'append_new_columns'
) }}

-- dbt executes:
--   DELETE FROM target WHERE order_id IN (SELECT order_id FROM new_data)
--   INSERT INTO target SELECT * FROM new_data
--
-- Great for: mutable source records (orders that update status)
-- Cheaper than MERGE for most warehouses — no row-by-row comparison.

select
  order_id,
  status,
  updated_at,
  amount_dollars
from {{ source('postgres', 'orders') }}
{% if is_incremental() %}
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}

Strategy 3: merge (the full UPSERT)

merge — fine-grained control incremental
{{ config(
  materialized         = 'incremental',
  incremental_strategy = 'merge',
  unique_key           = ['order_id', 'order_date'],  -- composite key
  merge_update_columns = ['status', 'updated_at']     -- only update these
  -- (alternative: merge_exclude_columns — the two are mutually exclusive)
) }}

-- MERGE INTO target t
-- USING new_data s ON t.order_id = s.order_id AND t.order_date = s.order_date
-- WHEN MATCHED THEN UPDATE SET t.status = s.status, t.updated_at = s.updated_at
-- WHEN NOT MATCHED THEN INSERT (...)
--
-- merge_update_columns is your surgical scalpel.
-- Prevents accidentally overwriting audit fields.

Strategy 4: microbatch (dbt 1.9+) — The Kafka-Native Pattern

microbatch — event_time partitioned processing incremental
{{ config(
  materialized         = 'incremental',
  incremental_strategy = 'microbatch',
  event_time           = 'event_ts',       -- partition axis
  begin                = '2024-01-01',    -- backfill start
  batch_size           = 'day',            -- hour | day | month
  lookback             = 3                 -- reprocess last 3 batches
) }}

-- dbt slices the data into per-day batches on event_ts and filters each
-- batch automatically — no is_incremental() block to hand-write.
-- Perfect for: high-volume ingress (Kafka, S3, API) where data arrives late.
-- lookback=3 handles the "data arrives 2 days late" problem elegantly.
-- Failed batches can be retried individually with dbt retry.

select
  event_id,
  event_ts,
  event_type,
  user_id,
  json_extract_scalar(payload, '$.session_id') as session_id
from {{ source('kafka', 'events') }}
-- microbatch auto-injects the WHERE — you don't write it:
-- WHERE event_ts >= '2024-01-15 00:00:00' AND event_ts < '2024-01-16 00:00:00'
LATE-ARRIVING DATA — THE INGRESS ENGINEER'S NIGHTMARE Microbatch's lookback parameter is how dbt handles late data. Set lookback=3 on a daily batch and dbt re-processes the last 3 days on every run. Your Kafka consumer was 47 hours behind? lookback=3 catches it automatically. And because each batch is replaced wholesale when reprocessed, reruns don't create duplicates.

Late Data + Schema Evolution Patterns

on_schema_change — handling source drift config options
-- on_schema_change options — choose your poison:

on_schema_change = 'ignore'
-- Default. New columns in source? Silently ignored.
-- Safest. Your downstream won't break. But you miss new data.

on_schema_change = 'fail'
-- Raises an error if source schema changes. Forces you to deal with it.
-- Good in production where silent data loss is worse than a failed run.

on_schema_change = 'append_new_columns'
-- Adds new columns to the target; fills existing rows with NULL.
-- Good for: append-only schema evolution (new event properties).

on_schema_change = 'sync_all_columns'
-- Adds new, drops removed columns. Full sync with source.
-- WARNING: drops columns = data loss. Only use if you OWN the source schema.
Testing Arsenal Data Contracts in Code
dbt tests are assertions. They run SQL against your data. Fail = pipeline should stop. Your SQL background makes you dangerous here.

The Four Built-in Generic Tests

models/staging/schema.yml schema tests
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique                # SELECT count(*) WHERE count > 1
          - not_null             # SELECT count(*) WHERE col IS NULL

      - name: status
        tests:
          - accepted_values:     # SELECT ... WHERE NOT IN (values)
              values: [PENDING, COMPLETED, CANCELLED, REFUNDED]
              quote: false

      - name: customer_id
        tests:
          - relationships:       # referential integrity
              to: ref('stg_customers')
              field: customer_id

      - name: amount_dollars
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000
              strictly: true

Custom Singular Tests — Pure SQL

tests/assert_revenue_positive.sql singular test
-- ANY ROW RETURNED = TEST FAILS.
-- dbt expects zero rows from a passing test.
-- This is pure SQL — use everything you know.

with revenue_check as (

  select
    order_date,
    sum(net_revenue)  as daily_revenue,
    count(*)          as order_count

  from {{ ref('mart_revenue') }}
  group by 1

)

-- Return rows where revenue is IMPOSSIBLE
select *
from revenue_check
where
  daily_revenue < 0                    -- negative revenue
  or order_count > 1000000            -- astronomical count
  or daily_revenue / order_count > 50000 -- avg order > $50k

Custom Generic Tests (Reusable Assertions)

macros/tests/test_not_future_date.sql generic test
{% test not_future_date(model, column_name, tolerance_days=1) %}

  -- Reusable: call from schema.yml on any date column.
  -- tolerance_days handles timezone skew.

  select
    {{ column_name }}  as bad_date,
    current_timestamp() as checked_at
  from {{ model }}
  where
    {{ column_name }} >
    current_timestamp() + interval '{{ tolerance_days }} days'

{% endtest %}

-- Usage in schema.yml:
-- tests:
--   - not_future_date:
--       column_name: created_at
--       tolerance_days: 2

Test Severity — Warn vs Error

severity + store_failures = production-grade testing advanced config
models:
  - name: mart_revenue
    tests:
      # Model-level test — freshness check
      - dbt_utils.recency:
          datepart: hour
          field: order_date
          interval: 25
          severity: warn        # warn doesn't fail the pipeline
          config:
            store_failures: true # persist failing rows to a table
            where: "order_date > current_date - 7"

    columns:
      - name: gross_revenue
        tests:
          - not_null:
              severity: error    # hard fail — halt the pipeline
          - dbt_expectations.expect_column_sum_to_be_between:
              min_value: 1000
              max_value: 10000000
              severity: warn     # alerts team, doesn't block

-- store_failures=true creates:  [schema].[test_mart_revenue_recency_...]
-- Query it to debug: SELECT * FROM dbt_test__audit.mart_revenue_recency_order_date_25
Snapshots Automated SCD Type 2
Point dbt at a mutable source. It handles valid_from, valid_to, dbt_scd_id, and the entire historical record. The MERGE statement you've been writing for years — automated.

Timestamp Strategy

snapshots/customers_snapshot.sql snapshot
{% snapshot customers_snapshot %}

{{
  config(
    target_schema  = 'snapshots',
    unique_key     = 'customer_id',
    strategy       = 'timestamp',
    updated_at     = 'updated_at'   -- change detector
  )
}}

select *
from {{ source('postgres', 'customers') }}

{% endsnapshot %}

-- dbt adds these columns automatically:
-- dbt_scd_id     UUID for each version row
-- dbt_updated_at timestamp this version was captured
-- dbt_valid_from when this version became active
-- dbt_valid_to   NULL = current version, else end timestamp
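Those bookkeeping columns make current-state queries one-liners — for example:

```sql
-- the current version of every customer
select *
from {{ ref('customers_snapshot') }}
where dbt_valid_to is null
```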

Check Strategy (no updated_at column?)

snapshots/products_snapshot.sql snapshot
{% snapshot products_snapshot %}

{{
  config(
    target_schema = 'snapshots',
    unique_key    = 'product_id',
    strategy      = 'check',
    check_cols    = ['price', 'category', 'is_active']
    -- dbt hashes these cols and detects any change
    -- check_cols = 'all' hashes every column
  )
}}

select *
from {{ source('postgres', 'products') }}

{% endsnapshot %}

Querying Snapshot History — Point-in-Time Lookups

models/intermediate/int_customer_at_order_time.sql SCD2 join
-- The classic SCD2 join — what was the customer's plan when they placed the order?
-- This is where snapshot data pays dividends for historical analysis.

with orders as (
  select * from {{ ref('stg_orders') }}
),

customers_history as (
  select * from {{ ref('customers_snapshot') }}
)

select
  o.order_id,
  o.created_at                    as order_date,
  o.amount_dollars,
  c.customer_id,
  c.subscription_plan             as plan_at_order_time,
  c.country                       as country_at_order_time,
  c.dbt_valid_from,
  c.dbt_valid_to

from orders o
left join customers_history c
  on  o.customer_id  = c.customer_id
  and o.created_at  >= c.dbt_valid_from
  and (
    o.created_at < c.dbt_valid_to
    or c.dbt_valid_to is null  -- current version
  )
Python Models dbt 1.3+
When SQL isn't enough: ML inference, complex transformations, pandas operations, Spark. Python models run on Snowpark, BigQuery Dataproc, or Databricks.
INGRESS RELEVANCE Python models unlock: Snowpark ML for anomaly detection on event streams, pandas for complex JSON unnesting, PySpark for Databricks-native transformations, and (where your platform permits outbound network access) calls to external APIs — all within the dbt DAG.

The model() Function Signature

models/ml/predict_churn.py python model
# Python models must define a model(dbt, session) function.
# dbt.ref() and dbt.source() return DataFrames.

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder

def model(dbt, session):

    # Config lives in the function body
    dbt.config(
        materialized='table',
        packages=["scikit-learn", "pandas"]
    )

    # ref() returns a Snowpark / Spark DataFrame
    features_df = dbt.ref('int_churn_features')
    labels_df   = dbt.ref('mart_churn_labels')

    # Convert to pandas (Snowpark / BQ Dataproc)
    features = features_df.to_pandas()
    labels   = labels_df.to_pandas()

    X_train = features[['days_since_login', 'order_count_30d',
                         'support_tickets', 'plan_tier']]
    y_train = labels['churned']

    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)

    # Score production data
    prod_df = dbt.ref('int_current_customers').to_pandas()
    prod_df['churn_probability'] = clf.predict_proba(
        prod_df[['days_since_login', 'order_count_30d',
                 'support_tickets', 'plan_tier']]
    )[:, 1]

    # Return a DataFrame — dbt persists it as a table
    return prod_df[['customer_id', 'churn_probability']]

Spark (Databricks) — Native DataFrame API

models/ingress/parse_kafka_avro.py python + spark
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType

def model(dbt, spark):

    dbt.config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='event_id',
        file_format='delta'         # Delta Lake!
    )

    # dbt.source() returns a Spark DataFrame
    raw = dbt.source('kafka', 'raw_avro_events')

    parsed = (
        raw
        .withColumn('payload', F.from_json(
            F.col('value'),
            'struct<event_id:string, user_id:string, ts:long>'
        ))
        .select(
            F.col('payload.event_id').alias('event_id'),
            F.col('payload.user_id').alias('user_id'),
            F.to_timestamp(F.col('payload.ts') / 1000).alias('event_ts'),
            F.col('partition').alias('kafka_partition'),
            F.col('offset').alias('kafka_offset')
        )
        .filter(F.col('event_id').isNotNull())
    )

    if dbt.is_incremental:
        max_ts = spark.sql(f"SELECT max(event_ts) FROM {dbt.this}")
        cutoff = max_ts.collect()[0][0]
        parsed = parsed.filter(F.col('event_ts') > cutoff)

    return parsed
DELTA LAKE + dbt = LAKEHOUSE INGRESS file_format='delta' turns your Python model into a Delta Lake table. Combine with Databricks Auto Loader as a source and you have a full streaming ingress pipeline fully managed in dbt's DAG. No Airflow required.
Advanced Patterns Senior Engineer Moves
The things that separate a dbt project that scales from one that becomes a liability. Custom schema naming, hooks, meta-driven models, and packages.

generate_schema_name — Multi-Env Schema Control

macros/get_custom_schema.sql schema override
{% macro generate_schema_name(custom_schema_name, node) %}

  {% set default_schema = target.schema %}

  {% if target.name == 'prod' %}

    -- In prod: use EXACTLY the custom_schema_name you set.
    -- stg_orders → analytics.staging (not analytics_staging)
    {% if custom_schema_name is not none %}
      {{ custom_schema_name | trim }}
    {% else %}
      {{ default_schema | trim }}
    {% endif %}

  {% else %}

    -- In dev: prefix with user target schema.
    -- avoids dev engineers stomping on each other.
    {% if custom_schema_name is not none %}
      {{ default_schema }}_{{ custom_schema_name | trim }}
    {% else %}
      {{ default_schema | trim }}
    {% endif %}

  {% endif %}

{% endmacro %}
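The custom_schema_name argument this macro receives comes from +schema configs, usually set per-folder in dbt_project.yml (the project name here is hypothetical):

```yaml
# dbt_project.yml
models:
  my_project:
    staging:
      +schema: staging   # prod → analytics.staging; dev → <target_schema>_staging
    marts:
      +schema: marts
```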

Pre/Post Hooks — DDL Around Your Models

models/marts/mart_revenue.sql (with hooks) hooks
{{
  config(
    materialized = 'table',
    pre_hook     = [
      "CALL analytics.lock_table('{{ this }}')",
      "{{ log('Starting mart_revenue build', info=True) }}"
    ],
    post_hook    = [
      -- Grant access to BI roles after build
      "GRANT SELECT ON {{ this }} TO ROLE REPORTER",
      -- Analyze table for query planner
      "ANALYZE {{ this }}",
      -- Custom audit log
      """
        INSERT INTO analytics.build_audit
        SELECT '{{ this }}', current_timestamp, '{{ invocation_id }}'
      """
    ]
  )
}}

select * from {{ ref('int_orders_enriched') }}

Meta-Driven Models — The Configuration-as-Data Pattern

models/intermediate/int_multi_source_unioned.sql meta-driven
-- Define sources in YAML, generate the UNION ALL in Jinja.
-- Add a new source: edit YAML, zero SQL changes.

{% set event_sources = [
  { 'name': 'kafka_events',    'source': ('kafka',   'events'),   'platform': 'mobile' },
  { 'name': 'web_events',      'source': ('segment', 'web'),      'platform': 'web'    },
  { 'name': 'backend_events',  'source': ('postgres','api_logs'), 'platform': 'api'    }
] %}

{% for src in event_sources %}

  select
    event_id,
    user_id,
    event_ts,
    event_type,
    '{{ src.platform }}'  as platform
  from {{ source(src.source[0], src.source[1]) }}

  {% if not loop.last %}
  union all
  {% endif %}

{% endfor %}

dbt_utils Package — The Standard Library

packages.yml + usage examples packages
# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.1.0", "<2.0.0"]
  - package: calogica/dbt_expectations
    version: [">=0.10.0"]
  - package: dbt-labs/audit_helper
    version: [">=0.9.0"]

-- ── dbt_utils HITS ──────────────────────────────────

-- 1. Star (exclude audit columns from SELECT *):
{{ dbt_utils.star(
  ref('stg_orders'),
  except=['_fivetran_deleted', '_fivetran_synced']
) }}

-- 2. Generate surrogate key:
{{ dbt_utils.generate_surrogate_key(['order_id', 'product_id']) }} as order_line_key

-- 3. Date spine (fill gaps in time series):
{{ dbt_utils.date_spine(
  datepart='day',
  start_date="'2024-01-01'",
  end_date="current_date"
) }}

-- 4. Pivot:
{{ dbt_utils.pivot(
  column='status',
  values=['PENDING', 'COMPLETED', 'CANCELLED'],
  agg='count',
  then_value='order_id'
) }}

-- 5. Union relations (same schema, different partitions):
{{ dbt_utils.union_relations(
  relations=[
    ref('stg_events_2023'),
    ref('stg_events_2024')
  ]
) }}

Selectors — Surgical DAG Execution

CLI — dbt's graph selector syntax bash
# Run a single model
dbt run --select stg_orders

# Run + all upstream dependencies
dbt run --select +stg_orders

# Run + all downstream dependents
dbt run --select stg_orders+

# Run stg_orders plus up to 2 generations of upstream ancestors
dbt run --select 2+stg_orders

# Run all models tagged 'ingress'
dbt run --select tag:ingress

# Run all models in a directory
dbt run --select models/staging

# Run modified + their downstream (git diff)
dbt run --select state:modified+

# Test ONLY the marts layer
dbt test --select models/marts

# Exclude snapshots from a run
dbt run --exclude resource_type:snapshot

# Run incrementals with a full refresh (rebuild from scratch)
dbt run --full-refresh --select tag:incremental

# Microbatch: reprocess a specific date range
dbt run --select stg_events \
  --event-time-start "2024-01-01" \
  --event-time-end   "2024-01-15"