- **Sources** (`{{ source('schema', 'table') }}`): Raw ingested data. You declare these in schema.yml files, not create them. Think: Fivetran landing zone, Airbyte raw tables, Kafka-to-S3 dumps.
- **Models** (`materialized='view'`, `'table'`, `'incremental'`; referenced via `{{ ref('model_name') }}`): SQL (or Python) SELECT statements. Each file = one relation in your warehouse. dbt resolves dependencies automatically via `ref()`.
- **Seeds** (`seeds/`): CSV files checked into git. `dbt seed` loads them. Perfect for country codes, category maps, static cost tables.
- **Macros** (`macros/`): Jinja functions that generate SQL. Reusable logic. Cross-project via packages. Think SQL templating on steroids.
- **Snapshots** (`snapshot` block): Type-2 SCDs, automated. Point at a source, define uniqueness + change-detection strategy, dbt handles the valid_from / valid_to bookkeeping.
- **Tests** (`dbt test`, schema tests): Assertions on your data. Four built-in generic tests. Unlimited custom SQL tests. Package extensions like dbt_expectations give you 50+ more.
Each arrow is a ref() or source() call. dbt resolves execution order and builds the DAG automatically.
```sql
{{ config(
    materialized = 'view',
    schema = 'staging',
    tags = ['staging', 'orders']
) }}

with source as (
    select * from {{ source('postgres', 'orders') }}
),

renamed as (
    select
        -- rename & cast at the boundary
        order_id as order_id,
        customer_id as customer_id,
        cast(created_at as timestamp) as created_at,
        upper(status) as status,
        amount / 100.0 as amount_dollars,
        _fivetran_synced as _loaded_at
    from source
)

select * from renamed
```
```yaml
sources:
  - name: postgres
    database: raw
    schema: public
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: {count: 1, period: hour}
      error_after: {count: 6, period: hour}
    tables:
      - name: orders
        description: "Raw orders from Postgres via Fivetran"
        columns:
          - name: order_id
            description: "PK — UUID"
            tests:
              - not_null
              - unique
```
Run dbt source freshness to alert when sources go stale — critical for ingress monitoring.
```sql
{{ config(
    materialized = 'table',
    schema = 'marts',
    dist = 'customer_id',
    sort = ['order_date']
) }}

{#
  dist / sort above are Redshift configs. The BigQuery equivalent:
    partition_by = {'field': 'order_date', 'data_type': 'date', 'granularity': 'day'},
    cluster_by = ['customer_id', 'status']
  (SQL comments are not valid inside the Jinja config() call, hence this block.)
#}

with orders as (
    select * from {{ ref('int_orders_enriched') }}
),

daily_revenue as (
    select
        date_trunc('day', created_at) as order_date,
        customer_id,
        status,
        count(*) as order_count,
        sum(amount_dollars) as gross_revenue,
        sum(amount_dollars) filter (where status = 'COMPLETED') as net_revenue
    from orders
    group by 1, 2, 3
)

select * from daily_revenue
```
`partition_by` and `cluster_by` map directly to BigQuery DDL. On Redshift, `dist` and `sort` control physical layout. Know your warehouse, inject it here.
| Factor | → View | → Table |
|---|---|---|
| Query frequency | Low / ad-hoc | High / BI tool |
| Transform cost | Cheap scans | Expensive aggregation |
| Data freshness | Always live | As-of last dbt run |
| Storage cost | Zero | Full duplication |
| Downstream joins | Re-compute each time | Indexes / partitions help |
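A common pattern is to encode that decision as folder-level defaults in dbt_project.yml, overridable per model via `config()`. A minimal sketch (the project name `my_project` is assumed):

```yaml
# dbt_project.yml (illustrative project name)
models:
  my_project:
    staging:
      +materialized: view    # cheap transforms, always live
    marts:
      +materialized: table   # heavy aggregations, hit by BI tools
```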
Incremental models process only new rows on each run, far cheaper than rebuilding the whole table, with the `--full-refresh` escape hatch when you need to start over.
```sql
{{ config(
    materialized = 'incremental',
    unique_key = 'event_id',
    on_schema_change = 'sync_all_columns'
) }}

select
    event_id,
    user_id,
    event_type,
    event_ts,
    properties
from {{ source('kafka', 'events') }}

{% if is_incremental() %}
  where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
```
`{{ this }}` is the relation reference to the model's own table in the warehouse — the magic that makes incremental work. `is_incremental()` returns false on first run (full load) and true on subsequent runs.
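For intuition, on an incremental run the model above compiles to plain SQL along these lines (relation names are illustrative; yours depend on your database/schema config):

```sql
-- incremental run: {{ this }} resolves to the existing target table
select event_id, user_id, event_type, event_ts, properties
from raw.kafka.events
where event_ts > (select max(event_ts) from analytics.my_events_model)

-- first run or --full-refresh: the {% if is_incremental() %} block
-- is skipped entirely, so the full history is selected
```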
```sql
{{ config(materialized='ephemeral') }}

-- This file creates NO table/view in the warehouse.
-- It's inlined as a CTE wherever ref() is called.

select
    currency_code,
    rate_to_usd,
    date_trunc('day', valid_from) as rate_date
from {{ ref('stg_exchange_rates') }}
where is_current = true
```
```sql
-- dbt inlines the ephemeral as a CTE:
with __dbt__cte__int_exchange_rates as (
    select
        currency_code,
        rate_to_usd,
        date_trunc('day', valid_from) as rate_date
    from raw.public.exchange_rates
    where is_current = true
),

your_model_cte as (
    select
        o.amount_dollars * fx.rate_to_usd as amount_usd
    from orders o
    join __dbt__cte__int_exchange_rates fx using (currency_code)
)

select * from your_model_cte
```
Don't let ephemeral models be `ref()`'d from other ephemeral models more than ~3 levels deep before the CTE stack explodes. Use a view instead when the chain gets long.
```sql
{{ config(
    materialized = 'dynamic_table',
    target_lag = '1 minute',
    snowflake_warehouse = 'TRANSFORMING'
) }}

-- Snowflake dynamic tables: target_lag controls refresh frequency.
-- (On Postgres / Redshift / BigQuery, materialized = 'materialized_view'
--  is the analogous warehouse-maintained materialization.)
-- The warehouse keeps this fresh automatically.
-- dbt run detects drift and issues ALTER/REPLACE DDL.

select
    session_id,
    user_id,
    max(event_ts) as last_activity,
    count(*) as event_count
from {{ ref('stg_events') }}
where event_ts > current_timestamp() - interval '24 hours'
group by 1, 2
```
```sql
{% macro cents_to_dollars(column_name, scale=2) %}
    round({{ column_name }} / 100.0, {{ scale }})
{% endmacro %}

-- Usage in any model:
select
    order_id,
    {{ cents_to_dollars('amount_cents') }} as amount,
    {{ cents_to_dollars('tax_cents', scale=4) }} as tax

-- Compiles to:
select
    order_id,
    round(amount_cents / 100.0, 2) as amount,
    round(tax_cents / 100.0, 4) as tax
```
```sql
-- countif() below is BigQuery syntax; swap in count(case when ... end) elsewhere.
{% macro pivot_status_counts(statuses) %}
    {% for status in statuses %}
    countif(status = '{{ status }}') as {{ status | lower }}_count{% if not loop.last %},{% endif %}
    {% endfor %}
{% endmacro %}

-- Usage:
select
    customer_id,
    {{ pivot_status_counts([
        'PENDING', 'COMPLETED', 'CANCELLED', 'REFUNDED'
    ]) }}
from {{ ref('stg_orders') }}
group by 1
```
```sql
{% macro get_column_names(model) %}
    {% set relation = ref(model) %}
    {% set columns = adapter.get_columns_in_relation(relation) %}
    {{ log("Columns in " ~ model ~ ":", info=true) }}
    {% for col in columns %}
        {{ log(col.name ~ " (" ~ col.dtype ~ ")", info=true) }}
    {% endfor %}
{% endmacro %}

-- A model that auto-selects all non-PII columns:
{% macro select_non_pii(model, pii_columns) %}
    {% set relation = ref(model) %}
    {% set all_cols = adapter.get_columns_in_relation(relation) %}
    {# filter first so the comma logic can't break when the last column is PII #}
    {% set keep_cols = all_cols | rejectattr('name', 'in', pii_columns) | list %}
    select
    {% for col in keep_cols %}
        {{ col.name }}{% if not loop.last %},{% endif %}
    {% endfor %}
    from {{ relation }}
{% endmacro %}
```
`adapter.get_columns_in_relation()`, `adapter.get_relation()`, `load_relation()` — these are warehouse-introspection calls inside Jinja. Most senior dbt engineers never use them. You should.
```sql
{% macro get_max_date(model, column) %}
    {% set query %}
        select max({{ column }}) as max_date from {{ ref(model) }}
    {% endset %}
    {% set result = run_query(query) %}
    {% if execute %}
        {% set max_date = result.columns[0].values()[0] %}
        {{ return(max_date) }}
    {% endif %}
{% endmacro %}

-- Usage: dynamically backfill from last loaded date
{% set cutoff = get_max_date('stg_orders', 'created_at') %}
```
```sql
-- Base macro — defines the interface
{% macro safe_divide(numerator, denominator) %}
    {{ return(adapter.dispatch('safe_divide')(numerator, denominator)) }}
{% endmacro %}

-- BigQuery implementation
{% macro bigquery__safe_divide(numerator, denominator) %}
    safe_divide({{ numerator }}, {{ denominator }})
{% endmacro %}

-- Snowflake / Postgres fallback
{% macro default__safe_divide(numerator, denominator) %}
    case when {{ denominator }} = 0 then null
         else {{ numerator }} / nullif({{ denominator }}, 0)
    end
{% endmacro %}

-- Usage in any model — dbt picks the right implementation:
select {{ safe_divide('net_revenue', 'order_count') }} as avg_order_value
```
```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'append'
) }}

-- 'append': INSERT only, never touch existing rows.
-- Perfect for: event logs, audit trails, append-only Kafka topics.
-- Danger: if your source replays events, you'll get duplicates.
-- Guard with: unique_key at the warehouse level or a dedupe downstream.

select
    event_id,
    user_id,
    event_ts,
    payload
from {{ source('kafka', 'raw_events') }}

{% if is_incremental() %}
  where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
```
```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = 'order_id',
    on_schema_change = 'append_new_columns'
) }}

-- dbt deletes rows matching unique_key first, then inserts:
--   DELETE FROM target WHERE order_id IN (SELECT order_id FROM new_data)
--   INSERT INTO target SELECT * FROM new_data
--
-- Great for: mutable source records (orders that update status).
-- Cheaper than MERGE on most warehouses — no row-by-row comparison.

select
    order_id,
    status,
    updated_at,
    amount_dollars
from {{ source('postgres', 'orders') }}

{% if is_incremental() %}
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```
```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = ['order_id', 'order_date'],
    merge_update_columns = ['status', 'updated_at']
) }}

-- unique_key above is a composite key.
-- merge_update_columns: only these columns are updated on match.
-- The inverse, merge_exclude_columns = ['created_at'], updates everything
-- EXCEPT the listed columns; pick one or the other, they're mutually exclusive.
--
-- dbt compiles to roughly:
--   MERGE INTO target t
--   USING new_data s
--     ON t.order_id = s.order_id AND t.order_date = s.order_date
--   WHEN MATCHED THEN UPDATE SET t.status = s.status, t.updated_at = s.updated_at
--   WHEN NOT MATCHED THEN INSERT (...)
--
-- merge_update_columns is your surgical scalpel.
-- Prevents accidentally overwriting audit fields.
```
```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'microbatch',
    event_time = 'event_ts',
    begin = '2024-01-01',
    batch_size = 'day',
    lookback = 3
) }}

-- event_time: the partition axis. begin: backfill start.
-- batch_size: hour | day | month. lookback: reprocess the last 3 batches.
-- dbt computes each batch's start/end timestamps and auto-injects the
-- event_time WHERE clause — you don't write it:
--   WHERE event_ts >= '2024-01-15 00:00:00' AND event_ts < '2024-01-16 00:00:00'
-- Perfect for: all-vector ingress (Kafka, S3, API) where data arrives late.
-- lookback=3 handles the "data arrives 2 days late" problem elegantly.
-- Batches are independent, so dbt run --threads 8 can process
-- several daily batches simultaneously.

select
    event_id,
    event_ts,
    event_type,
    user_id,
    json_extract_scalar(payload, '$.session_id') as session_id
from {{ source('kafka', 'events') }}
```
The `lookback` parameter is how dbt handles late data. Set `lookback=3` on a daily batch and dbt re-processes the last 3 days on every run. Your Kafka consumer was 47 hours behind? `lookback=3` catches it automatically. Combine with `unique_key` to avoid duplicates.
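To make the reprocessing window concrete, here is the batch arithmetic as a plain-Python sketch (not dbt's actual scheduler, which also honors `begin` and the already-processed bookmark):

```python
from datetime import date, timedelta

def microbatch_window(run_date: date, lookback: int) -> list[date]:
    """Daily batches a run on `run_date` reprocesses: the current
    batch plus the `lookback` batches before it (sketch only)."""
    return [run_date - timedelta(days=n) for n in range(lookback, -1, -1)]

# A run on 2024-01-10 with lookback=3 touches Jan 7 through Jan 10,
# so an event that landed 47 hours late still gets picked up.
print(microbatch_window(date(2024, 1, 10), 3))
```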
```sql
-- on_schema_change options — choose your poison:

on_schema_change = 'ignore'
-- Default. New columns in source? Silently ignored.
-- Safest. Your downstream won't break. But you miss new data.

on_schema_change = 'fail'
-- Raises an error if source schema changes. Forces you to deal with it.
-- Good in production where silent data loss is worse than a failed run.

on_schema_change = 'append_new_columns'
-- Adds new columns to the target; fills existing rows with NULL.
-- Good for: append-only schema evolution (new event properties).

on_schema_change = 'sync_all_columns'
-- Adds new, drops removed columns. Full sync with source.
-- WARNING: drops columns = data loss. Only use if you OWN the source schema.
```
```yaml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique            # SELECT count(*) WHERE count > 1
          - not_null          # SELECT count(*) WHERE col IS NULL
      - name: status
        tests:
          - accepted_values:  # SELECT ... WHERE NOT IN (values)
              values: [PENDING, COMPLETED, CANCELLED, REFUNDED]
              quote: false
      - name: customer_id
        tests:
          - relationships:    # referential integrity
              to: ref('stg_customers')
              field: customer_id
      - name: amount_dollars
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000
              strictly: true
```
```sql
-- ANY ROW RETURNED = TEST FAILS.
-- dbt expects zero rows from a passing test.
-- This is pure SQL — use everything you know.

with revenue_check as (
    select
        order_date,
        sum(net_revenue) as daily_revenue,
        count(*) as order_count
    from {{ ref('mart_revenue') }}
    group by 1
)

-- Return rows where revenue is IMPOSSIBLE
select *
from revenue_check
where daily_revenue < 0                      -- negative revenue
   or order_count > 1000000                  -- astronomical count
   or daily_revenue / order_count > 50000    -- avg order > $50k
```
```sql
{% test not_future_date(model, column_name, tolerance_days=1) %}

    -- Reusable: call from schema.yml on any date column.
    -- tolerance_days handles timezone skew.
    select
        {{ column_name }} as bad_date,
        current_timestamp() as checked_at
    from {{ model }}
    where {{ column_name }} > current_timestamp() + interval '{{ tolerance_days }} days'

{% endtest %}

-- Usage in schema.yml (nested under the column, so column_name is passed for you):
--   columns:
--     - name: created_at
--       tests:
--         - not_future_date:
--             tolerance_days: 2
```
```yaml
models:
  - name: mart_revenue
    tests:
      # Model-level test — freshness check
      - dbt_utils.recency:
          datepart: hour
          field: order_date
          interval: 25
          config:
            severity: warn          # warn doesn't fail the pipeline
            store_failures: true    # persist failing rows to a table
            where: "order_date > current_date - 7"
    columns:
      - name: gross_revenue
        tests:
          - not_null:
              config:
                severity: error     # hard fail — halt the pipeline
          - dbt_expectations.expect_column_sum_to_be_between:
              min_value: 1000
              max_value: 10000000
              config:
                severity: warn      # alerts team, doesn't block

# store_failures=true creates: [schema].[test_mart_revenue_recency_...]
# Query it to debug: SELECT * FROM dbt_test__audit.mart_revenue_recency_order_date_25
```
Snapshots give you valid_from, valid_to, dbt_scd_id, and the entire historical record. The MERGE statement you've been writing for years — automated.

```sql
{% snapshot customers_snapshot %}

{{ config(
    target_schema = 'snapshots',
    unique_key = 'customer_id',
    strategy = 'timestamp',
    updated_at = 'updated_at'
) }}

-- updated_at is the change detector: a newer timestamp = a new version row.

select * from {{ source('postgres', 'customers') }}

{% endsnapshot %}

-- dbt adds these columns automatically:
--   dbt_scd_id       unique id for each version row
--   dbt_updated_at   timestamp this version was captured
--   dbt_valid_from   when this version became active
--   dbt_valid_to     NULL = current version, else end timestamp
```
```sql
{% snapshot products_snapshot %}

{{ config(
    target_schema = 'snapshots',
    unique_key = 'product_id',
    strategy = 'check',
    check_cols = ['price', 'category', 'is_active']
) }}

-- dbt hashes these cols and detects any change.
-- check_cols = 'all' hashes every column.

select * from {{ source('postgres', 'products') }}

{% endsnapshot %}
```
```sql
-- The classic SCD2 join — what was the customer's plan when they placed the order?
-- This is where snapshot data pays dividends for historical analysis.

with orders as (
    select * from {{ ref('stg_orders') }}
),

customers_history as (
    select * from {{ ref('customers_snapshot') }}
)

select
    o.order_id,
    o.created_at as order_date,
    o.amount_dollars,
    c.customer_id,
    c.subscription_plan as plan_at_order_time,
    c.country as country_at_order_time,
    c.dbt_valid_from,
    c.dbt_valid_to
from orders o
left join customers_history c
  on o.customer_id = c.customer_id
 and o.created_at >= c.dbt_valid_from
 and (
     o.created_at < c.dbt_valid_to
     or c.dbt_valid_to is null    -- current version
 )
```
```python
# Python models must define a model(dbt, session) function.
# dbt.ref() and dbt.source() return DataFrames.
import pandas as pd
from sklearn.ensemble import RandomForestClassifier


def model(dbt, session):
    # Config lives in the function body
    dbt.config(
        materialized='table',
        packages=["scikit-learn", "pandas"]
    )

    # ref() returns a Snowpark / Spark DataFrame
    features_df = dbt.ref('int_churn_features')
    labels_df = dbt.ref('mart_churn_labels')

    # Convert to pandas (Snowpark / BQ Dataproc)
    features = features_df.to_pandas()
    labels = labels_df.to_pandas()

    X_train = features[['days_since_login', 'order_count_30d',
                        'support_tickets', 'plan_tier']]
    y_train = labels['churned']

    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)

    # Score production data
    prod_df = dbt.ref('int_current_customers').to_pandas()
    prod_df['churn_probability'] = clf.predict_proba(
        prod_df[['days_since_login', 'order_count_30d',
                 'support_tickets', 'plan_tier']]
    )[:, 1]

    # Return a DataFrame — dbt persists it as a table
    return prod_df[['customer_id', 'churn_probability']]
```
```python
from pyspark.sql import functions as F


def model(dbt, session):
    dbt.config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='event_id',
        file_format='delta'   # Delta Lake!
    )

    # dbt.source() returns a Spark DataFrame
    raw = dbt.source('kafka', 'raw_avro_events')

    parsed = (
        raw
        .withColumn('payload', F.from_json(
            F.col('value'),
            'struct<event_id:string, user_id:string, ts:long>'
        ))
        .select(
            F.col('payload.event_id').alias('event_id'),
            F.col('payload.user_id').alias('user_id'),
            F.to_timestamp(F.col('payload.ts') / 1000).alias('event_ts'),
            F.col('partition').alias('kafka_partition'),
            F.col('offset').alias('kafka_offset')
        )
        .filter(F.col('event_id').isNotNull())
    )

    if dbt.is_incremental:
        max_ts = session.sql(f"SELECT max(event_ts) FROM {dbt.this}")
        cutoff = max_ts.collect()[0][0]
        parsed = parsed.filter(F.col('event_ts') > cutoff)

    return parsed
```
file_format='delta' turns your Python model into a Delta Lake table. Combine with Databricks Auto Loader as a source and you have a full streaming ingress pipeline fully managed in dbt's DAG. No Airflow required.
```sql
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if target.name == 'prod' -%}
        {# In prod: use EXACTLY the custom_schema_name you set.      #}
        {# stg_orders → analytics.staging (not analytics_staging)   #}
        {%- if custom_schema_name is not none -%}
            {{ custom_schema_name | trim }}
        {%- else -%}
            {{ default_schema | trim }}
        {%- endif -%}
    {%- else -%}
        {# In dev: prefix with user target schema, so dev           #}
        {# engineers don't stomp on each other.                     #}
        {%- if custom_schema_name is not none -%}
            {{ default_schema }}_{{ custom_schema_name | trim }}
        {%- else -%}
            {{ default_schema | trim }}
        {%- endif -%}
    {%- endif -%}
{%- endmacro %}
```

Note the Jinja comments and `{%- -%}` whitespace control: this macro returns a string, so SQL-style `--` comments or stray whitespace inside it would leak into the generated schema name.
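Concretely, for a model configured with `+schema: staging`, the macro resolves names like this (target names and schemas are illustrative):

```sql
-- target.name = 'dev',  target.schema = 'dbt_jane'   →  dbt_jane_staging
-- target.name = 'prod', target.schema = 'analytics'  →  staging
-- no custom schema set, either target                →  target.schema as-is
```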
```sql
{{ config(
    materialized = 'table',
    pre_hook = [
        "CALL analytics.lock_table('{{ this }}')",
        "{{ log('Starting mart_revenue build', info=True) }}"
    ],
    post_hook = [
        "GRANT SELECT ON {{ this }} TO ROLE REPORTER",
        "ANALYZE {{ this }}",
        "INSERT INTO analytics.build_audit
         SELECT '{{ this }}', current_timestamp, '{{ invocation_id }}'"
    ]
) }}

-- Post hooks, in order: grant access to BI roles after the build,
-- analyze the table for the query planner, write a custom audit row.
-- (Comments and Python-style triple-quoted strings aren't valid inside
--  the Jinja config() call, hence plain quoted strings above.)

select * from {{ ref('int_orders_enriched') }}
```
```sql
-- Define sources in YAML, generate the UNION ALL in Jinja.
-- Add a new source: edit YAML, zero SQL changes.

{% set event_sources = [
    {'name': 'kafka_events',   'source': ('kafka', 'events'),      'platform': 'mobile'},
    {'name': 'web_events',     'source': ('segment', 'web'),       'platform': 'web'},
    {'name': 'backend_events', 'source': ('postgres', 'api_logs'), 'platform': 'api'}
] %}

{% for src in event_sources %}
select
    event_id,
    user_id,
    event_ts,
    event_type,
    '{{ src.platform }}' as platform
from {{ source(src.source[0], src.source[1]) }}
{% if not loop.last %} union all {% endif %}
{% endfor %}
```
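The loop compiles to one SELECT per entry glued with UNION ALL, along these lines (the physical relation names depend on your sources.yml):

```sql
select event_id, user_id, event_ts, event_type, 'mobile' as platform
from raw.kafka.events
union all
select event_id, user_id, event_ts, event_type, 'web' as platform
from raw.segment.web
union all
select event_id, user_id, event_ts, event_type, 'api' as platform
from raw.postgres.api_logs
```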
```yaml
# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.1.0", "<2.0.0"]
  - package: calogica/dbt_expectations
    version: [">=0.10.0"]
  - package: dbt-labs/audit_helper
    version: [">=0.9.0"]
```

```sql
-- ── dbt_utils HITS ──────────────────────────────────

-- 1. Star (exclude audit columns from SELECT *):
{{ dbt_utils.star(
    ref('stg_orders'),
    except=['_fivetran_deleted', '_fivetran_synced']
) }}

-- 2. Generate surrogate key:
{{ dbt_utils.generate_surrogate_key(['order_id', 'product_id']) }} as order_line_key

-- 3. Date spine (fill gaps in time series):
{{ dbt_utils.date_spine(
    datepart='day',
    start_date="'2024-01-01'",
    end_date="current_date"
) }}

-- 4. Pivot:
{{ dbt_utils.pivot(
    column='status',
    values=['PENDING', 'COMPLETED', 'CANCELLED'],
    agg='count',
    then_value='order_id'
) }}

-- 5. Union relations (same schema, different partitions):
{{ dbt_utils.union_relations(
    relations=[
        ref('stg_events_2023'),
        ref('stg_events_2024')
    ]
) }}
```
```bash
# Run a single model
dbt run --select stg_orders

# Run + all upstream dependencies
dbt run --select +stg_orders

# Run + all downstream dependents
dbt run --select stg_orders+

# Run everything up to 2 hops upstream
dbt run --select 2+stg_orders

# Run all models tagged 'ingress'
dbt run --select tag:ingress

# Run all models in a directory
dbt run --select models/staging

# Run modified + their downstream (requires --state pointing at prior artifacts)
dbt run --select state:modified+

# Test ONLY the marts layer
dbt test --select models/marts

# Exclude snapshots from a run
dbt run --exclude resource_type:snapshot

# Run incrementals with a full refresh (rebuild from scratch)
dbt run --full-refresh --select tag:incremental

# Microbatch: reprocess a specific date range
dbt run --select stg_events \
  --event-time-start "2024-01-01" \
  --event-time-end "2024-01-15"
```