| Old World | DuckDB World |
|---|---|
| Launch a server | Import a library |
| Connection pool | In-process call |
| Row-store pages | Column groups + zone maps |
| Interpret row by row | Vectorized 1024-row batches |
| Single-threaded scan | Morsel-driven parallelism |
| JDBC/ODBC latency | Zero-copy Arrow handoff |
| ETL into warehouse | Query in place |
| Schema-bound tables | Schema-on-read anything |
| Fixed compute cluster | Laptop = analytics engine |
Every operator processes 1,024-row chunks (vectors). This isn't just cache-friendly — it enables SIMD: the CPU executes the same operation across 8–16 values simultaneously using AVX-512 instructions. A filter on a 100M-row Parquet file doesn't touch 99.9M rows that fail zone map checks.
Filter pushdown pushes predicates such as WHERE event_ts > '2024-01-01' down to the scan: row groups outside the range are never read. No full table scans for selective queries.
-- Install: pip install duckdb  (that's it. No server. No config.)

-- Query a 2GB Parquet file from S3 directly — no download:
SELECT
    user_id,
    count(*)     AS events,
    sum(revenue) AS total
FROM read_parquet('s3://my-data-lake/events/2024/**/*.parquet')
WHERE event_type = 'purchase'
GROUP BY ALL
ORDER BY total DESC
LIMIT 20;
-- GROUP BY ALL — auto-infers non-aggregated columns. Zero maintenance.
-- ORDER BY total — yes, you can order by an alias you defined above.

-- Scan 47 CSV files with schema inference and hive partitioning:
SELECT *
FROM read_csv('data/year=*/month=*/*.csv',
    hive_partitioning = true,  -- year, month become columns
    auto_detect = true,        -- infer types, delimiters
    parallel = true            -- all cores; NOTE(review): confirm this
                               -- option exists in your DuckDB version
);

-- Query a pandas DataFrame (zero copy via Arrow):
SELECT * FROM df WHERE amount > 100;  -- `df` is a Python variable!

-- Query a live Postgres database as if it's a local table:
ATTACH 'postgresql://user:pass@host/db' AS pg (TYPE postgres, READ_ONLY);
SELECT * FROM pg.public.orders LIMIT 100;
-- Single file
FROM read_parquet('data/events.parquet');

-- Glob — every file under the directory, recursive
FROM read_parquet('data/**/*.parquet');

-- Explicit file list (heterogeneous paths are fine)
FROM read_parquet([
    'data/jan.parquet',
    's3://bucket/feb.parquet',
    'https://cdn.example.com/mar.parquet'
]);

-- Hive partitioning — injects year/month as columns
FROM read_parquet(
    's3://lake/events/year=*/month=*/*.parquet',
    hive_partitioning = true
);

-- Schema inspection before you query
DESCRIBE SELECT * FROM read_parquet('events.parquet');

-- Parquet metadata (row groups, compression, stats)
SELECT * FROM parquet_metadata('events.parquet');
SELECT * FROM parquet_schema('events.parquet');
SELECT * FROM parquet_file_metadata('events.parquet');

-- Which row groups would actually be scanned? (filter pushdown audit)
SELECT row_group_id, num_rows
FROM parquet_metadata('events.parquet')
WHERE stats_min_value <= '2024-06-01'
  AND stats_max_value >= '2024-06-01';
-- Load extension once per machine, then activate per session:
INSTALL iceberg;
LOAD iceberg;

-- Scan current snapshot (REST catalog):
FROM iceberg_scan('s3://lake/catalog/my_table');

-- Time travel — pin a snapshot by id (or use snapshot_from_timestamp):
-- FIX: the named parameter is snapshot_from_id, not snapshot_id.
FROM iceberg_scan(
    's3://lake/catalog/events',
    snapshot_from_id = 5432198765432198765
);

-- List all snapshots (audit trail):
FROM iceberg_snapshots('s3://lake/catalog/events');

-- Schema evolution — read old snapshot with new schema mapping:
FROM iceberg_scan(
    's3://lake/catalog/events',
    allow_moved_paths = true  -- handles S3 prefix changes
);
-- Delta Lake support lives in its own extension:
INSTALL delta;
LOAD delta;

-- Current version:
FROM delta_scan('s3://lake/tables/orders');

-- Time travel by version:
-- NOTE(review): version pinning landed in newer releases of the delta
-- extension — confirm against your DuckDB version.
FROM delta_scan(
    's3://lake/tables/orders',
    version = 42
);

-- Inspect the transaction log:
FROM delta_table_info('s3://lake/tables/orders');
INSTALL httpfs;
LOAD httpfs;

-- Option 1: env vars (AWS_ACCESS_KEY_ID etc. auto-picked up)
FROM read_parquet('s3://my-bucket/data/*.parquet');

-- Option 2: explicit credentials
CREATE SECRET aws_creds (
    TYPE s3,
    KEY_ID 'AKIA...',
    SECRET 'secret...',
    REGION 'us-east-1'
);

-- Option 3: assume IAM role
CREATE SECRET aws_role (
    TYPE s3,
    PROVIDER CREDENTIAL_CHAIN,
    ROLE_ARN 'arn:aws:iam::123:role/DataRole'
);

-- S3-compatible (MinIO, Cloudflare R2, Backblaze B2):
CREATE SECRET r2 (
    TYPE s3,
    KEY_ID '...',
    SECRET '...',
    ENDPOINT 'account.r2.cloudflarestorage.com',
    URL_STYLE 'path'
);

-- Write back to S3 — it's bidirectional:
COPY (
    SELECT * FROM orders WHERE year = 2024
) TO 's3://output/orders_2024.parquet'
(FORMAT parquet, COMPRESSION zstd, ROW_GROUP_SIZE 122880);
-- Google Cloud Storage
CREATE SECRET gcs_key (
    TYPE gcs,
    KEY_ID 'service-account@proj.iam.gserviceaccount.com',
    SECRET '-----BEGIN PRIVATE KEY-----...'
);
FROM read_parquet('gs://bucket/events/*.parquet');

-- Azure Blob Storage
INSTALL azure;
LOAD azure;
CREATE SECRET az (
    TYPE azure,
    CONNECTION_STRING 'DefaultEndpointsProtocol=https;AccountName=...'
);
FROM read_parquet('az://container/events/*.parquet');
-- One query, three clouds: S3 orders + GCS CRM + Azure marketing.
SELECT
    s.order_id,
    g.customer_name,
    a.campaign_source
FROM read_parquet('s3://orders/2024/*.parquet') s
INNER JOIN read_parquet('gs://crm/customers/*.parquet') g
    USING (customer_id)
INNER JOIN read_parquet('az://marketing/campaigns/*.parquet') a
    ON s.utm_source = a.source_key
WHERE s.created_at >= '2024-01-01';
-- auto_detect sniffs delimiter, quoting, types, header
FROM read_csv('data.csv', auto_detect = true);

-- What did DuckDB infer? (super useful for debugging)
SELECT * FROM sniff_csv('messy.csv');
-- returns: delimiter, quoting, types, has_header, etc.

-- Explicit control when auto-detect fails:
FROM read_csv('pipe_delimited.txt',
    delim = '|',
    header = true,
    quote = '"',
    -- FIX: SQL string literals don't process backslash escapes, so '\\'
    -- is TWO characters; the escape character must be a single char.
    escape = '\',
    skip = 3,  -- skip 3 header rows
    dateformat = '%Y/%m/%d',
    timestampformat = '%Y-%m-%d %H:%M:%S',
    nullstr = ['N/A', '', 'NULL'],
    ignore_errors = true,  -- skip malformed rows
    columns = {
        'id': 'BIGINT',
        'ts': 'TIMESTAMP',
        'amount': 'DECIMAL(18,4)'
    }
);

-- Error reporting — WHERE did parsing fail?
FROM read_csv('messy.csv',
    ignore_errors = true,
    store_rejects = true  -- write failures to reject tables
);
SELECT * FROM reject_errors;  -- inspect what failed and why
SELECT * FROM reject_scans;
-- NDJSON (newline-delimited, Kafka default):
FROM read_ndjson('events.ndjson', auto_detect = true);

-- JSON array:
FROM read_json('data.json', format = 'array');

-- Deeply nested — extract with arrow operators:
SELECT
    id,
    payload ->> '$.user.email' AS email,
    payload ->> '$.items[0].sku' AS first_sku,
    json_extract_string(payload, '$.meta.source') AS source,
    json_array_length(payload -> '$.items') AS item_count
FROM read_json('orders.json');

-- Unnest an array column into rows:
-- NOTE(review): `->` yields a JSON value; unnest needs a LIST — confirm
-- this works as-is on your version, or cast via from_json first.
SELECT id, unnest(payload -> '$.items') AS item
FROM read_json('orders.json');

-- Type-safe struct extraction:
SELECT json_transform(payload,
    '{"user": {"id": "BIGINT", "name": "VARCHAR", "tags": ["VARCHAR"]}}'
) AS typed
FROM events;

-- GLOB across thousands of NDJSON files (Kafka S3 sink):
FROM read_ndjson('s3://kafka-sink/topic=orders/**/*.json',
    hive_partitioning = true,
    auto_detect = true,
    maximum_object_size = 1048576  -- 1MB max object
);
-- PostgreSQL (push-down capable):
ATTACH 'host=prod-db dbname=myapp user=ro password=pw'
    AS pg (TYPE postgres, READ_ONLY);

-- MySQL:
-- FIX: the ATTACH type is `mysql` (the extension name is mysql),
-- not `mysql_scanner`.
ATTACH 'mysql://user:pass@host:3306/mydb'
    AS mysql (TYPE mysql, READ_ONLY);

-- SQLite (your existing .db files):
ATTACH 'local_app.db' AS sqlite (TYPE sqlite);

-- Another DuckDB file:
ATTACH 'analytics.duckdb' AS analytics;

-- List all attached databases:
SHOW DATABASES;

-- Cross-database JOIN — Postgres OLTP + S3 data lake:
SELECT
    p.order_id,
    p.status,      -- from live Postgres
    e.event_type,  -- from S3 Parquet
    e.event_ts
FROM pg.public.orders p
LEFT JOIN read_parquet('s3://lake/events/*.parquet') e
    ON p.order_id = e.order_id
WHERE p.created_at >= current_date - INTERVAL '7 days';

-- Copy Postgres table to local DuckDB (one-shot migration):
CREATE TABLE orders AS SELECT * FROM pg.public.orders;
import duckdb
import orjson  # FIX: was used below but never imported
from confluent_kafka import Consumer
import pyarrow as pa

con = duckdb.connect('events.duckdb')
con.execute("""
    CREATE TABLE IF NOT EXISTS events (
        event_id   VARCHAR PRIMARY KEY,
        user_id    BIGINT,
        event_type VARCHAR,
        event_ts   TIMESTAMP,
        payload    JSON
    )
""")

consumer = Consumer({...})
consumer.subscribe(['events'])

# Micro-batch: accumulate decoded messages, flush to DuckDB via Arrow.
batch = []
while True:
    msgs = consumer.consume(1000, timeout=1.0)
    for msg in msgs:
        # FIX: consumer-level errors (partition EOF etc.) carry no value;
        # skip them instead of feeding None to the JSON decoder.
        if msg.error():
            continue
        batch.append(orjson.loads(msg.value()))
    if len(batch) >= 10_000:
        # Zero-copy: list of dicts → Arrow → DuckDB (replacement scan on `tbl`)
        tbl = pa.Table.from_pylist(batch)
        con.execute("""
            INSERT OR REPLACE INTO events
            SELECT * FROM tbl
        """)
        batch = []
        consumer.commit()
Replay an entire Kafka S3 sink with a single read_ndjson('s3://sink/topic=events/**') call. No Spark, no EMR, no cluster — just DuckDB on your laptop or a Lambda.
-- Query a Parquet file over HTTPS directly:
FROM read_parquet('https://datasets.example.com/taxi.parquet');

-- GitHub raw CSV (useful for seed data):
FROM read_csv('https://raw.githubusercontent.com/org/repo/main/data.csv');

-- Hugging Face datasets (they expose Parquet):
FROM read_parquet('https://huggingface.co/datasets/org/dataset/resolve/main/data/*.parquet');

-- HTTP range requests — DuckDB fetches only needed byte ranges.
-- For a 10GB remote Parquet, a selective query may fetch <100KB.

-- Authentication via secret:
-- FIX: the http secret's header parameter is EXTRA_HTTP_HEADERS.
CREATE SECRET api_auth (
    TYPE http,
    EXTRA_HTTP_HEADERS MAP {'Authorization': 'Bearer my-token'}
);
FROM read_json('https://api.example.com/events?format=ndjson');
# Pipe AWS CLI output directly into DuckDB:
aws s3 cp s3://bucket/data.parquet - \
  | duckdb -c "FROM read_parquet('/dev/stdin')"

# Stream a gzipped CSV through duckdb:
zcat huge.csv.gz \
  | duckdb -c "FROM read_csv('/dev/stdin') LIMIT 10"

# Query Docker container logs (JSON format):
docker logs my-app --since 1h 2>&1 \
  | duckdb -c "
      SELECT json->>'$.level' lvl, count(*) n
      FROM read_ndjson('/dev/stdin')
      GROUP BY ALL ORDER BY n DESC
    "

# curl → DuckDB analytical pipeline:
curl -s 'https://api.example.com/events.ndjson' \
  | duckdb -c "
      SELECT event_type, count(*), avg(latency_ms)
      FROM read_ndjson('/dev/stdin')
      GROUP BY ALL
    "

# Export a PostgreSQL table as Parquet in one pipeline:
# FIX: COPY ... (FORMAT binary) emits Postgres binary protocol, which
# read_csv cannot parse — export CSV (with a header) instead.
psql -c "COPY orders TO STDOUT (FORMAT csv, HEADER)" \
  | python -c "
import sys, duckdb
duckdb.sql(\"\"\"
    COPY (SELECT * FROM read_csv('/dev/stdin'))
    TO 'orders.parquet' (FORMAT parquet)
\"\"\")
"
import duckdb, pandas as pd, pyarrow as pa

# Pandas DataFrame — DuckDB finds it by its Python variable name
# (replacement scan), so plain SQL can query `df` directly.
df = pd.read_parquet('huge_file.parquet')
result = duckdb.sql("""
    SELECT customer_id, sum(amount) total
    FROM df  -- df is the Python variable!
    WHERE status = 'COMPLETED'
    GROUP BY ALL
    ORDER BY total DESC
""").df()  # .df() = back to pandas

# Arrow Table — same magic
arrow_tbl = pa.ipc.open_file('data.arrow').read_all()
result = duckdb.sql("SELECT * FROM arrow_tbl LIMIT 100")

# Output formats — choose your weapon:
duckdb.sql("SELECT * FROM events").df()        # pandas DataFrame
duckdb.sql("SELECT * FROM events").pl()        # Polars DataFrame
duckdb.sql("SELECT * FROM events").arrow()     # PyArrow Table
duckdb.sql("SELECT * FROM events").fetchall()  # list of tuples
duckdb.sql("SELECT * FROM events").fetchdf()   # alias for .df()

# Streaming — for larger-than-RAM results:
rel = duckdb.sql("SELECT * FROM huge_parquet")
for batch in rel.fetch_arrow_reader(100_000):
    process_batch(batch)  # 100k rows at a time
import duckdb, polars as pl

# DuckDB → Polars (zero copy)
lf = pl.scan_parquet('events.parquet')

# Mix a Polars LazyFrame with DuckDB SQL (replacement scan on `lf`):
result = duckdb.sql("""
    SELECT
        date_trunc('day', event_ts) AS day,
        event_type,
        count(*) AS n
    FROM lf  -- Polars LazyFrame directly!
    GROUP BY ALL
    ORDER BY day DESC
""").pl()  # returns Polars DataFrame

# Polars LazyFrame → DuckDB → Arrow → back to Polars:
# all in process, zero copies, all CPU cores.

# Register a relation as a named view:
con = duckdb.connect()
con.register('events_view', lf.collect())
con.sql("SELECT * FROM events_view LIMIT 10")

# Chunked reading for ML pipelines:
# FIX: fetch_arrow_table() returns the ENTIRE result, so the original
# while-loop never streamed. fetch_record_batch(n) returns an Arrow
# RecordBatchReader that yields n-row batches until exhausted.
reader = con.execute("SELECT features FROM training_data") \
            .fetch_record_batch(10_000)
for chunk in reader:
    train_on(chunk.to_pandas())
-- SELECT * EXCLUDE — drop columns by name (not rename)
SELECT * EXCLUDE (_fivetran_deleted, _loaded_at, ssn, credit_card)
FROM customers;

-- SELECT * REPLACE — override specific columns inline
SELECT * REPLACE (
    amount / 100.0 AS amount,  -- cents → dollars in-place
    upper(status)  AS status   -- normalize casing
)
FROM orders;

-- COLUMNS(regex) — select by pattern
SELECT COLUMNS('.*_at$')  -- all timestamp columns
FROM events;

SELECT COLUMNS('revenue|amount|cost')  -- financial columns
FROM mart_revenue;

-- Apply a function to ALL matching columns at once:
SELECT
    id,
    round(COLUMNS('.*_amount$'), 2)  -- round every amount column
FROM orders;

-- COLUMNS with wildcard aggregation:
SELECT
    min(COLUMNS('.*')),   -- min of every column
    max(COLUMNS('.*')),   -- max of every column
    count(COLUMNS('.*'))  -- non-null count of every column
FROM events;
-- GROUP BY ALL — infers non-aggregated columns automatically
SELECT
    date_trunc('day', event_ts) AS day,
    event_type,
    platform,
    count(*)        AS n,
    avg(latency_ms) AS avg_latency
FROM events
GROUP BY ALL;  -- auto: GROUP BY day, event_type, platform

-- ORDER BY ALL — sorts by every non-aggregated column
SELECT year, quarter, revenue
FROM mart_revenue
ORDER BY ALL;  -- ORDER BY year, quarter

-- ORDER BY alias — reference output column name
SELECT
    event_ts::date AS day,
    count(*)       AS event_count
FROM events
GROUP BY day              -- reference alias directly!
ORDER BY event_count DESC;  -- same here

-- FROM first — anti-Cartesian safety + readability
FROM events
SELECT event_id, event_type
WHERE event_ts > '2024-01-01'
LIMIT 100;
-- FROM before SELECT is valid DuckDB!
-- PIVOT: rows → columns (dynamic cross-tab)
PIVOT orders
ON status        -- unique values become column headers
USING count(*)   -- aggregation for each cell
GROUP BY customer_id;
-- Result: customer_id | PENDING | COMPLETED | CANCELLED

-- PIVOT with explicit values (for deterministic column order):
PIVOT revenue_data
ON quarter IN (Q1, Q2, Q3, Q4)
USING sum(amount) AS rev
GROUP BY product_line;

-- UNPIVOT: columns → rows (melt wide to long)
UNPIVOT wide_metrics
ON revenue, cost, profit   -- columns to collapse
INTO
    NAME metric_name       -- new key column
    VALUE metric_value;    -- new value column

-- Dynamic PIVOT on an expression (all months in the data):
PIVOT (
    SELECT
        product,
        strftime(order_date, '%Y-%m') AS month,
        revenue
    FROM orders
)
ON month
USING sum(revenue)
GROUP BY product;
-- QUALIFY filters on window function results.
-- Without QUALIFY, you need a subquery or CTE.

-- Latest order per customer (Top-N per group):
SELECT customer_id, order_id, created_at, amount
FROM orders
QUALIFY row_number() OVER (
    PARTITION BY customer_id
    ORDER BY created_at DESC
) = 1;

-- Deduplicate on composite key, keep latest:
SELECT *
FROM raw_events
QUALIFY row_number() OVER (
    PARTITION BY event_id
    ORDER BY _ingested_at DESC
) = 1;

-- FIX: the bare "─────" divider was not a comment and would be a
-- syntax error if this script were executed.
-- ─────────────────────────────────────────────────

-- ASOF JOIN — time-series "as of" matching.
-- For each event, find the exchange rate in effect AT THAT TIME.
SELECT
    o.order_id,
    o.amount_usd,
    o.amount_usd * r.rate_gbp AS amount_gbp
FROM orders o
ASOF JOIN exchange_rates r
    ON o.currency = r.currency
   AND o.created_at >= r.valid_from;
-- ASOF finds the largest r.valid_from <= o.created_at.
-- No self-joins, no correlated subqueries. Pure beauty.
-- DuckDB has first-class LIST type and lambda functions.

-- list_transform: map over a list
SELECT list_transform([1,2,3,4], x -> x * x);
-- → [1, 4, 9, 16]

-- list_filter: filter with a predicate
SELECT list_filter(tags, t -> t LIKE 'prod_%')
FROM events;

-- list_reduce: fold
SELECT list_reduce([10,20,30], (acc, x) -> acc + x);
-- → 60

-- Nested lambda: normalize prices in a list
SELECT
    product_id,
    list_transform(
        price_history,
        p -> round(p * exchange_rate, 2)
    ) AS price_gbp
FROM products;

-- Aggregate into list, then filter:
SELECT
    customer_id,
    list_filter(
        list(order_id ORDER BY created_at),
        -- FIX: the lambda index is ONE-based, so `i < 5` kept only the
        -- first 4 orders; `i <= 5` keeps the first 5 as advertised.
        (o, i) -> i <= 5
    ) AS first_5_orders
FROM orders
GROUP BY customer_id;  -- name, not position (was GROUP BY 1)
-- Struct: composite type with named fields
SELECT
    struct_pack(name := 'Alice', age := 32) AS person,
    person.name,   -- dot access on struct (lateral alias reference)
    person['age']; -- bracket access

-- Unnest array column into rows:
SELECT order_id, unnest(line_items) AS item
FROM orders;

-- Unnest with ordinality (keep array index):
-- NOTE(review): WITH ORDINALITY is Postgres syntax — confirm your
-- DuckDB version supports it, or use generate_subscripts instead.
SELECT order_id, item, idx
FROM orders, unnest(line_items) WITH ORDINALITY t(item, idx);

-- SUMMARIZE — instant profiling of any table/query:
SUMMARIZE events;
-- Returns: count, nulls, min, max, avg, std,
-- q25, q50, q75 for EVERY column. Zero code.

-- generate_series — date spines, sequences:
-- FIX: as a scalar, generate_series returns one LIST in one row;
-- unnest it to get one row per day (a usable date spine).
SELECT unnest(generate_series(
    '2024-01-01'::date,
    '2024-12-31'::date,
    INTERVAL '1 day'
)) AS day;

-- SAMPLE — fast approximate queries:
SELECT avg(amount), stddev(amount)
FROM events USING SAMPLE 10%;            -- reservoir sample
FROM events USING SAMPLE 100000 ROWS;    -- fixed row count
INSTALL httpfs;      -- download from registry (once per machine)
LOAD httpfs;         -- activate in this session
UPDATE EXTENSIONS;   -- upgrade all installed extensions

-- Inspect what's installed + loaded:
SELECT * FROM duckdb_extensions();
SELECT extension_name, loaded, installed
FROM duckdb_extensions();
- httpfs — read_parquet('s3://...'), secrets for credentials, range-request support.
- spatial — GEOMETRY type, 100+ ST_ functions, reads Shapefile/GeoJSON/GeoParquet. Run geospatial analytics at DuckDB speed — no Postgres needed.
- json — -> and ->> operators. json_transform() for type-safe extraction. Handles malformed JSON gracefully.
- postgres — attach live Postgres and migrate with CREATE TABLE AS SELECT. Works with RDS, Aurora, Supabase, Neon.
- aws — credential resolution via ~/.aws/credentials, EC2 metadata. Used alongside httpfs. Essential for production AWS deployments.
- azure — az://container/path URI scheme for seamless Azure data lake access.
- fts — PRAGMA create_fts_index('docs', 'id', 'body') then match_bm25(id, 'search terms'). Run text search analytics without Elasticsearch.
- excel — read_xlsx() for Excel files. Finally — query spreadsheets like tables. Also adds Excel-compatible number formatting functions. COPY ... TO 'output.xlsx' for export.
- community — duckdb_vss (vector similarity search / HNSW index for embeddings), lance (Lance columnar format), chsql (ClickHouse SQL dialect), duckpgq (property graph queries / Cypher-like syntax). Install with INSTALL ext FROM community.
import duckdb

# In-memory (default) — fastest, no persistence:
con = duckdb.connect()  # or duckdb.connect(':memory:')

# Persistent file — survives process restart:
con = duckdb.connect('analytics.duckdb')

# Read-only file — safe for concurrent reads:
con = duckdb.connect('analytics.duckdb', read_only=True)

# Module-level shortcut (implicit in-memory connection):
duckdb.sql("SELECT 42")  # no connect() needed

# Context manager — auto-close:
with duckdb.connect('analytics.duckdb') as con:
    result = con.sql("SELECT count(*) FROM events").fetchone()

# Configuration at connect-time:
con = duckdb.connect(config={
    'threads': 8,
    'memory_limit': '16GB',
    'temp_directory': '/tmp/duckdb_spill'
})
# A Relation is a lazy query plan — nothing executes until you collect it.
rel = con.table('events')
rel = con.read_parquet('s3://bucket/events/*.parquet')
rel = con.sql("SELECT * FROM events")

# Chain operations (all lazy):
result = (
    con.read_parquet('events.parquet')
       .filter("event_type = 'purchase'")
       .project("user_id, amount, event_ts")
       .aggregate("user_id, sum(amount) AS total")
       .order("total DESC")
       .limit(100)
)

# Execute and collect:
df    = result.df()        # pandas
pl_df = result.pl()        # polars
arrow = result.arrow()     # pyarrow
rows  = result.fetchall()  # list of tuples

# Inspect the query plan:
print(result.explain())
# FIX: Relation has no `query_df` method; `query(name, sql)` runs SQL
# with the relation registered under `name`.
print(result.query('query_relation', "SELECT * FROM query_relation"))
import duckdb
from duckdb.typing import VARCHAR, BIGINT, DOUBLE
from functools import reduce  # FIX: reduce was used below but never imported
import re, hashlib

con = duckdb.connect()

# --- SCALAR UDF ---
def mask_email(email: str) -> str:
    """Mask an email's local part, keeping the first 2 chars and the domain."""
    if not email:
        return None
    local, _, domain = email.partition('@')
    return f"{local[:2]}***@{domain}"

con.create_function(
    'mask_email', mask_email,
    [VARCHAR],                 # input types
    VARCHAR,                   # return type
    null_handling='special'    # handle NULLs in Python
)
con.sql("SELECT mask_email(email) FROM customers")

# --- VECTORIZED UDF (Arrow-native, batched) ---
import pyarrow as pa

def hash_ids(ids: pa.Array) -> pa.Array:
    """SHA-256-truncate each id; runs once per 1024-row vector."""
    hashed = [
        hashlib.sha256(str(v).encode()).hexdigest()[:16]
        for v in ids.to_pylist()
    ]
    return pa.array(hashed, type=pa.string())

con.create_function(
    'hash_id', hash_ids, [BIGINT], VARCHAR,
    type=duckdb.functional.PythonUDFType.ARROW
)

# --- AGGREGATE UDF ---
class GeometricMean:
    """Stateful aggregate: geometric mean of positive inputs."""
    def __init__(self):
        self.values = []

    def step(self, x):
        # Ignore NULL/zero/negative inputs — geometric mean is undefined there.
        if x and x > 0:
            self.values.append(x)

    def finalize(self):
        if not self.values:
            return None
        return (reduce(lambda a, b: a * b, self.values)
                ) ** (1 / len(self.values))

con.create_aggregate_function('geo_mean', GeometricMean, [DOUBLE], DOUBLE)
# Jupyter magic — write SQL in cells directly: %load_ext duckdb_magic %%duckdb SELECT year, sum(revenue) total FROM read_parquet('data/*.parquet') GROUP BY ALL ORDER BY year; # Query profiling in Python: con.execute("PRAGMA enable_profiling") con.execute("PRAGMA profiling_output='/tmp/profile.json'") con.sql("SELECT * FROM big_table WHERE x > 1000") # Open profile.json in chrome://tracing # EXPLAIN ANALYZE: print(con.execute(""" EXPLAIN ANALYZE SELECT customer_id, sum(amount) FROM events WHERE event_ts > '2024-01-01' GROUP BY 1 """).fetchdf().to_string())
import ibis

# Ibis = portable dataframe API that compiles to SQL.
# DuckDB is the best Ibis backend (fastest, most complete).
con = ibis.duckdb.connect('analytics.duckdb')
events = con.table('events')

result = (
    events
    .filter([
        events.event_ts > ibis.timestamp('2024-01-01'),
        events.event_type.isin(['purchase', 'refund'])
    ])
    .group_by(['user_id', 'event_type'])
    .aggregate(
        n=events.event_id.count(),
        total=events.amount.sum(),
        avg_lat=events.latency_ms.mean()
    )
    .order_by(ibis.desc('total'))
    .limit(100)
)

# See the compiled SQL:
ibis.to_sql(result)

# Execute:
df = result.to_pandas()

# Switch backend to BigQuery with zero code changes:
# con = ibis.bigquery.connect(project='my-proj', dataset='my_ds')
-- Parallelism (default: all CPU cores)
SET threads = 16;
SET threads = 1;  -- deterministic debugging

-- Memory (default: 80% of RAM)
SET memory_limit = '32GB';
-- NOTE(review): '-1' as "unlimited" — confirm; RESET memory_limit is
-- the documented way to restore the default.
SET memory_limit = '-1';

-- External spilling — enables larger-than-RAM queries
SET temp_directory = '/fast-nvme/duckdb_tmp';
SET max_temp_directory_size = '500GB';

-- FIX(review): worker_threads_for_csv_writing is not a DuckDB setting;
-- CSV writer parallelism is governed by `threads` above.
-- SET worker_threads_for_csv_writing = 8;

-- Preserve insertion order (OFF = faster GROUP BY)
SET preserve_insertion_order = false;

-- Enable object cache (S3 metadata caching)
SET enable_object_cache = true;

-- HTTP metadata cache for remote Parquet
-- FIX: the setting is enable_http_metadata_cache.
SET enable_http_metadata_cache = true;

-- Profile every query:
PRAGMA enable_progress_bar;
PRAGMA enable_profiling = 'json';
PRAGMA profiling_output = '/tmp/duck_profile.json';
-- Write partitioned Parquet — the ideal ingress landing zone:
COPY (
    SELECT *,  -- FIX: was "SELECT* ," — malformed star + stray comma spacing
        year(event_ts)  AS yr,
        month(event_ts) AS mo
    FROM events
) TO 's3://output/events' (
    FORMAT parquet,
    PARTITION_BY (yr, mo),     -- hive partitioning
    COMPRESSION zstd,          -- best ratio/speed tradeoff
    ROW_GROUP_SIZE 122880,     -- 128K rows/group (sweet spot)
    OVERWRITE_OR_IGNORE true
);

-- Write sorted Parquet (dramatically improves filter pushdown):
COPY (
    SELECT * FROM events
    ORDER BY customer_id, event_ts
) TO 'events_sorted.parquet' (
    FORMAT parquet,
    COMPRESSION zstd
);
-- Sorted Parquet + row group stats = skip entire row groups on filter.
-- 100x scan speedup on customer_id range queries.

-- DuckDB native format (fastest for DuckDB-only workloads):
CHECKPOINT;  -- flush WAL to .duckdb file
-- The .duckdb file IS the storage — already column-grouped,
-- zone-mapped, FSST-compressed. No conversion needed.
EXPLAIN ANALYZE SELECT customer_id, sum(amount) total FROM read_parquet('s3://lake/events/**/*.parquet') WHERE event_ts >= '2024-01-01' AND event_type = 'purchase' GROUP BY 1; ┌─────────────────────────────────────┐ │ HASH_GROUP_BY │ ← aggregation │ Groups: [customer_id] │ │ Aggregates: [sum(amount)] │ │ ~12,450 rows (estimated) │ └──────────────────┬──────────────────┘ │ ┌──────────────────▼──────────────────┐ │ FILTER │ ← predicate applied │ event_type = 'purchase' │ │ Scanned: 2,847,291 rows │ │ Passed: 183,421 rows (6.4%) │ └──────────────────┬──────────────────┘ │ ┌──────────────────▼──────────────────┐ │ PARQUET_SCAN (parallel) │ ← 8 threads │ Files: 47 / 3,892 (row group skip) │ ← MOST SKIPPED! │ Columns: [customer_id,amount,...] │ ← projection pushdown │ Filters pushed: [event_ts >=...] │ ← filter pushdown │ Total rows: 847M → scanned: 23M │ ← zone map elimination └─────────────────────────────────────┘ -- KEY INSIGHT: 3,845 row groups skipped via zone maps (min/max stats). -- Only 47 row groups needed reading. That's the 100x speedup.
Representative only. Results vary by query type and hardware. DuckDB advantage is largest on aggregations and scans.
import duckdb, boto3, orjson
import pyarrow as pa  # FIX: needed to make the event batch SQL-scannable
from datetime import datetime, timezone

"""
ARCHITECTURE:
  Kafka → Consumer → DuckDB (transform) → S3 Parquet (partitioned)
  Queries: DuckDB directly on S3 Parquet
  No cluster. No warehouse. No ETL server.
"""

con = duckdb.connect()
con.execute("INSTALL httpfs; LOAD httpfs")


def land_batch(events: list[dict], dt: datetime):
    """Dedupe + transform a batch of raw event dicts, land as S3 Parquet.

    Returns the S3 path written.
    """
    # FIX: replacement scans cover pandas/Arrow/Polars objects, not a plain
    # Python list of dicts — convert to an Arrow table first (zero copy in).
    tbl = pa.Table.from_pylist(events)
    con.execute("CREATE OR REPLACE TABLE staging AS SELECT * FROM tbl")
    path = (
        f"s3://my-lake/events/"
        f"year={dt.year}/month={dt.month:02d}/day={dt.day:02d}/"
        f"batch_{dt.timestamp():.0f}.parquet"
    )
    con.execute(f"""
        COPY (
            SELECT
                event_id,
                user_id,
                event_type,
                -- DuckDB parsing JSON inline:
                (payload->>'$.session_id')::VARCHAR AS session_id,
                (payload->>'$.amount')::DECIMAL(18,4) AS amount,
                event_ts::TIMESTAMP AS event_ts
            FROM staging
            WHERE event_id IS NOT NULL
            QUALIFY row_number() OVER (
                PARTITION BY event_id
                -- assumes events carry _ingested_at — confirm upstream
                ORDER BY _ingested_at DESC
            ) = 1  -- deduplicate in the same query
        ) TO '{path}'
        (FORMAT parquet, COMPRESSION zstd, ROW_GROUP_SIZE 65536)
    """)
    return path


def query_lakehouse(start_date: str, end_date: str):
    """Hourly event/revenue rollup over the S3 lakehouse for a date range."""
    # FIX: bind the dates as parameters — f-string interpolation of caller
    # input into SQL invites injection and quoting bugs.
    return con.execute("""
        SELECT
            date_trunc('hour', event_ts) AS hour,
            event_type,
            count(*)    AS n,
            sum(amount) AS revenue
        FROM read_parquet(
            's3://my-lake/events/year=*/month=*/day=*/*.parquet',
            hive_partitioning = true
        )
        WHERE event_ts BETWEEN $1 AND $2
        GROUP BY ALL
        ORDER BY hour, revenue DESC
    """, [start_date, end_date]).df()
""" Pattern: API Gateway → Lambda (DuckDB) → S3 Parquet A serverless analytical query engine with zero infra. Lambda has 10GB memory and 15min timeout. DuckDB + 64-core Lambda = serious computation. """ import duckdb, json, os def handler(event, context): con = duckdb.connect() con.execute("INSTALL httpfs; LOAD httpfs") # IAM role auto-resolved via instance metadata con.execute(""" CREATE SECRET aws ( TYPE s3, PROVIDER CREDENTIAL_CHAIN ) """) query = event['queryStringParameters']['q'] start_date = event['queryStringParameters']['start'] end_date = event['queryStringParameters']['end'] # Parameterized to avoid injection: result = con.execute(""" SELECT event_type, count(*) n, sum(amount) rev FROM read_parquet( 's3://my-lake/events/year=*/month=*/day=*/*.parquet', hive_partitioning = true ) WHERE event_ts BETWEEN $1 AND $2 GROUP BY ALL ORDER BY rev DESC LIMIT 1000 """, [start_date, end_date]).df() return { 'statusCode': 200, 'body': result.to_json(orient='records'), 'headers': {'Content-Type': 'application/json'} }
// DuckDB-WASM: full DuckDB compiled to WebAssembly.
// Query Parquet files in the browser. No server needed.
import * as duckdb from '@duckdb/duckdb-wasm';

// FIX: selectBundle() returns a Promise — it must be awaited.
const BUNDLE = await duckdb.selectBundle({
  mvp: {
    mainModule: '.../duckdb-mvp.wasm',
    mainWorker: '.../duckdb-browser-mvp.worker.js'
  },
  eh: {
    mainModule: '.../duckdb-eh.wasm',
    mainWorker: '.../duckdb-browser-eh.worker.js'
  },
});

const worker = new Worker(BUNDLE.mainWorker);
// FIX: `logger` was referenced but never defined.
const logger = new duckdb.ConsoleLogger();
const db = new duckdb.AsyncDuckDB(logger, worker);
await db.instantiate(BUNDLE.mainModule);
const con = await db.connect();

// Register a Parquet file from a URL:
await db.registerFileURL(
  'events.parquet',
  'https://example.com/events.parquet',
  duckdb.DuckDBDataProtocol.HTTP,
  false
);

// Query it — all in the browser, no round-trip to server:
const result = await con.query(`
  SELECT event_type, count(*) AS n
  FROM read_parquet('events.parquet')
  GROUP BY ALL ORDER BY n DESC
`);
console.log(result.toArray());
# Ingress:   ALL vectors → S3 (your pattern)
# Storage:   S3 Parquet (partitioned by ingestion time)
# Transform: dbt + DuckDB (dbt-duckdb adapter)
# Query:     DuckDB directly on S3 or .duckdb file
# Serve:     Motherduck (DuckDB cloud) or direct DuckDB files

stack:
  ingestion:
    kafka_sink: "Kafka Connect → S3 (Parquet, 10min batches)"
    api_ingress: "Lambda → DuckDB → S3 Parquet"
    # FIX: missing space after the colon made this invalid YAML.
    db_replication: "DuckDB ATTACH postgres → COPY TO s3"

  storage:
    format: "Parquet + zstd compression"
    partitioning: "year= / month= / day= / hour="
    size: "target 100MB–1GB per Parquet file"
    catalog: "Apache Iceberg (optional, for time travel)"

  transform:
    tool: "dbt with dbt-duckdb adapter"
    install: "pip install dbt-duckdb"
    profiles_yml: "type: duckdb / path: analytics.duckdb"
    advantage: "dbt models run locally, no warehouse bill"

  query:
    development: "DuckDB CLI or Python on laptop"
    production: "Motherduck (serverless DuckDB cloud)"
    bi_tools: "Evidence.dev (SQL → dashboards, uses DuckDB)"

  estimated_cost:
    s3: "~$23/TB/month"
    compute: "$0 (local) or ~$50-100/mo (Motherduck)"
    vs_snowflake: "90-98% cost reduction for <2TB workloads"