import polars as pl # The two modes — choose your weapon df = pl.read_parquet('events.parquet') # eager: loads now lf = pl.scan_parquet('events.parquet') # lazy: builds a plan # Core pattern: build expression, call collect() result = ( pl.scan_parquet('s3://lake/events/**/*.parquet') .filter(pl.col('event_type') == 'purchase') .filter(pl.col('event_ts') >= pl.lit('2024-01-01').str.to_datetime()) .with_columns( (pl.col('amount_cents') / 100).alias('amount'), pl.col('event_ts').dt.truncate('1d').alias('day') ) .group_by(['day', 'user_id']) .agg( pl.len().alias('purchase_count'), pl.col('amount').sum().alias('total_spend'), pl.col('amount').mean().alias('avg_order') ) .sort('total_spend', descending=True) .collect() # ← triggers optimized execution ) # Instant profiling — like DuckDB SUMMARIZE: print(df.describe()) # count, mean, std, min, max per column print(df.schema) # full type information print(df.estimated_size('mb')) # memory footprint
Python's Global Interpreter Lock prevents true multi-threading in pure Python. Polars' Rust core releases the GIL immediately, and every operation — sort, group_by, join — fans out across all CPU cores via Rayon's work-stealing thread pool. On a 16-core machine, heavy queries can approach a 16× speedup, not the 1× a GIL-bound loop delivers.
Polars stores data in Apache Arrow columnar buffers. When you pass a Polars DataFrame to DuckDB or PyArrow, no data is copied — all three engines read the same memory. Across the full dbt + DuckDB + Polars stack, handing data between tools is a pointer exchange, not a serialization step.
Polars' schema is fixed at LazyFrame construction. If you .filter(pl.col('amt') > 'oops') — comparing numeric to string — the error fires at query build time, not hours into a production run. Entire categories of runtime bugs become compile-time errors.
Every .scan_*() call returns a LazyFrame. Chained operations build a logical plan. .collect() runs the optimizer, produces a physical plan, and executes it.
import polars as pl

lf = (
    pl.scan_parquet('events.parquet')
    .filter(pl.col('event_type') == 'purchase')
    .select(['user_id', 'amount', 'event_ts'])
    .group_by('user_id')
    .agg(pl.col('amount').sum())
)

# See the LOGICAL plan (your intent):
print(lf.explain(optimized=False))

# See the OPTIMIZED plan (what actually runs):
print(lf.explain(optimized=True))
# Notice: filter & select are pushed INTO the ParquetScan.

# Visualize as a tree (requires graphviz):
lf.show_graph(optimized=True)

# ─── STREAMING — larger-than-RAM execution ───────────
# collect() loads everything into RAM.
# collect(engine='streaming') processes in chunks.
result = (
    pl.scan_parquet('s3://lake/events/**/*.parquet')
    .filter(pl.col('amount') > 0)
    .group_by('user_id')
    .agg(pl.col('amount').sum())
    .collect(engine='streaming')  # processes in batches that fit in RAM
)

# sink_parquet — stream directly to disk, never loads to RAM:
(
    pl.scan_parquet('huge_100gb_file.parquet')
    .filter(pl.col('status') == 'active')
    .sink_parquet('output/active_users.parquet')
    # executes streaming — never materializes full DataFrame
)

# sink_csv, sink_ipc, sink_ndjson also available
(
    pl.scan_parquet('events.parquet')
    .sink_ipc('events.arrow')  # Arrow IPC file format
)
# scan_parquet — the workhorse
pl.scan_parquet(
    source             = 's3://bucket/data/**/*.parquet',
    n_rows             = 1_000_000,       # early stop (fast LIMIT)
    row_index_name     = '_row_idx',      # inject row numbers
    row_index_offset   = 0,
    parallel           = 'auto',          # 'columns' | 'row_groups' | 'prefiltered'
    use_statistics     = True,            # row group zone map pushdown
    hive_partitioning  = True,            # inject partition columns from path
    hive_schema        = {'year': pl.Int32, 'month': pl.Int32},
    include_file_paths = '_source_file',  # track which file each row came from
    storage_options    = {'aws_access_key_id': '...', 'region': 'us-east-1'}
)

# scan_csv — streaming CSV with full control
pl.scan_csv(
    source           = 'data/*.csv',
    separator        = ',',
    has_header       = True,
    skip_rows        = 0,
    schema_overrides = {'id': pl.Int64, 'amount': pl.Float64},  # (formerly `dtypes`)
    null_values      = ['', 'N/A', 'null'],
    try_parse_dates  = True,          # auto-detect date columns
    ignore_errors    = True,          # skip malformed rows
    encoding         = 'utf8-lossy',  # handle dirty encodings
    comment_prefix   = '#'            # skip comment lines
)

# scan_ndjson — Kafka S3 sink output
pl.scan_ndjson(
    source = 's3://kafka-sink/topic=events/**/*.json',
    infer_schema_length = 1000,  # sample N rows for schema inference
    schema = pl.Schema({         # or provide explicit schema
        'event_id': pl.String,
        'user_id': pl.Int64,
        'event_ts': pl.Datetime('us'),
        'payload': pl.String,    # keep JSON as string for later parsing
    })
)
# The .str namespace wraps every string operation df.with_columns([ pl.col('email') .str.to_lowercase() .str.strip_chars() .str.split('@') .list.first() # → local part of email .alias('email_local'), pl.col('phone') .str.replace_all(r'[^\d]', '') # strip non-digits (regex) .str.zfill(10) # zero-pad to 10 chars .alias('phone_clean'), pl.col('payload_str') .str.json_path_match('$.user.id') # JSONPath on string column .cast(pl.Int64) .alias('user_id_extracted'), pl.col('event_type') .str.contains('purchase') # boolean mask .alias('is_purchase'), pl.col('url') .str.extract(r'utm_source=([^&]+)', group_index=1) .alias('utm_source'), pl.col('tags_csv') .str.split(',') # → List[String] .list.eval(pl.element().str.strip_chars()) .alias('tags'), pl.col('name').str.count_matches(r'\s+').alias('word_count'), pl.col('text').str.len_chars().alias('char_count'), pl.col('code').str.starts_with('ORD-').alias('is_order'), ])
.str.contains(), .str.extract(), and .str.replace_all() use the Rust regex crate. It's compiled once, executed across all CPU cores on vectorized string data. Regex processing in Polars is orders of magnitude faster than Python's re module in a loop.
# Parse, truncate, extract, convert — all vectorized df.with_columns([ # Parse strings → Datetime pl.col('ts_str') .str.to_datetime('%Y-%m-%dT%H:%M:%S%.f%z') .dt.convert_time_zone('UTC') .alias('ts_utc'), # Truncate to period (the Polars date_trunc) pl.col('ts_utc').dt.truncate('1h').alias('hour_bucket'), pl.col('ts_utc').dt.truncate('1d').alias('day'), pl.col('ts_utc').dt.truncate('1mo').alias('month'), # Extract components pl.col('ts_utc').dt.year().alias('year'), pl.col('ts_utc').dt.month().alias('month_num'), pl.col('ts_utc').dt.weekday().alias('weekday'), # 1=Mon pl.col('ts_utc').dt.hour().alias('hour'), pl.col('ts_utc').dt.ordinal_day().alias('doy'), # Duration arithmetic (pl.col('end_ts') - pl.col('start_ts')) .dt.total_seconds() .alias('session_seconds'), # Epoch conversions pl.col('epoch_ms') .cast(pl.Datetime('ms')) .dt.convert_time_zone('America/New_York') .alias('ts_local'), # Rolling periods: "is this weekend?" pl.col('ts_utc').dt.weekday().is_in([6,7]).alias('is_weekend'), # Offset by duration (pl.col('ts_utc') + pl.duration(days=30)).alias('ts_plus_30d'), ])
Polars distinguishes Datetime('us') (timezone-naive, microseconds) from Datetime('us', 'UTC') (timezone-aware). Operations that mix naive and aware datetimes raise errors at plan time — before your pipeline runs. pandas defers the same mistake to runtime, where it surfaces hours into a job.
# Polars has first-class List and Struct types.
# These unlock JSON-like nested data without exploding rows.
df.with_columns([
    # List stats without unnesting
    pl.col('line_items').list.len().alias('item_count'),
    pl.col('prices').list.sum().alias('total'),
    pl.col('prices').list.mean().alias('avg_price'),
    pl.col('prices').list.max().alias('max_price'),
    pl.col('tags').list.first().alias('primary_tag'),
    pl.col('tags').list.last().alias('last_tag'),

    # Filter list elements with an expression
    pl.col('prices')
      .list.eval(pl.element().filter(pl.element() > 100))
      .alias('expensive_items'),

    # Sort / unique list elements
    pl.col('tags').list.sort().list.unique().alias('sorted_unique_tags'),

    # Contains check
    pl.col('tags').list.contains('premium').alias('is_premium'),

    # Element access by index (list.get supports negative indices)
    pl.col('prices').list.get(0).alias('first_price'),
    pl.col('prices').list.get(-1).alias('last_price'),
])

# EXPLODE list column to rows (1-to-many):
df.explode('line_items')

# list.eval — run ANY expression on each list element:
# This is Polars' answer to row-wise pandas apply(), but vectorized.
df.with_columns(
    pl.col('amounts')
      .list.eval(
          (pl.element() - pl.element().mean()) / pl.element().std()
      )
      .alias('amounts_z_score')
)
# Struct: a column where each value is a named record.
# Perfect for JSON payloads and nested API responses.

# Parse JSON string → Struct
df.with_columns(
    pl.col('json_payload')
      .str.json_decode()  # → Struct column
      .alias('payload_struct')
)

# Access struct fields with struct.field():
df.with_columns([
    pl.col('payload_struct').struct.field('user_id').alias('user_id'),
    pl.col('payload_struct').struct.field('email').alias('email'),
])

# Unnest struct → individual columns:
df.unnest('payload_struct')

# Build a struct column from multiple columns:
df.select(
    pl.struct([
        pl.col('first_name'),
        pl.col('last_name'),
        pl.col('age'),
    ]).alias('person')
)

# Rename struct fields:
pl.col('payload_struct').struct.rename_fields([
    'user_id', 'email', 'session_id'
])

# Struct in group_by — keep related fields together:
df.group_by('user_id').agg(
    pl.struct([
        pl.col('event_type').first(),
        pl.col('event_ts').first(),
        pl.col('amount').sum(),
    ]).alias('first_event_summary')
)
# Window expressions in Polars use .over() instead of SQL's OVER PARTITION BY. # They compute GROUP-level aggregations and broadcast back to original rows — # without a join step. This is the Polars secret weapon for per-group metrics. df.with_columns([ # Running total per user — keeps all rows pl.col('amount') .cum_sum() .over('user_id') .alias('running_total_per_user'), # Rank within group pl.col('amount') .rank(method='dense', descending=True) .over('user_id') .alias('spend_rank'), # Group size (no join needed) pl.len().over('user_id').alias('user_event_count'), # Group mean (broadcast back to all rows) pl.col('amount').mean().over('user_id').alias('user_avg_spend'), # Z-score per user (subtract group mean, divide by group std) ( (pl.col('amount') - pl.col('amount').mean().over('user_id')) / pl.col('amount').std().over('user_id') ).alias('amount_zscore'), # First/last event per user pl.col('event_ts').min().over('user_id').alias('first_seen'), pl.col('event_ts').max().over('user_id').alias('last_seen'), # Previous row value per group (lag) pl.col('amount') .shift(1) .over('user_id', order_by='event_ts') .alias('prev_amount'), # Rolling window per group (7-day rolling sum) pl.col('amount') .rolling_sum_by('event_ts', window_size='7d') .over('user_id') .alias('rolling_7d_spend'), ])
The pandas equivalent is .groupby().transform() — which carries significant overhead. In Polars, .over('user_id') computes the aggregate in a single parallel hash-grouped pass, then broadcasts the values back to the original rows in a second pass. No merge(), no intermediate DataFrame, no memory duplication.
# pl.when().then().otherwise() is Polars' CASE WHEN. # Unlike Python if/else, this evaluates ALL branches in parallel, # then selects results with a bitmask — zero branch prediction cost. df.with_columns([ # Simple binary pl.when(pl.col('status') == 'active') .then(pl.lit(1)) .otherwise(pl.lit(0)) .alias('is_active'), # Multi-branch (chain .when().then()) pl.when(pl.col('amount') < 10) .then(pl.lit('micro')) .when(pl.col('amount') < 100) .then(pl.lit('small')) .when(pl.col('amount') < 1000) .then(pl.lit('medium')) .otherwise(pl.lit('large')) .alias('order_tier'), # Reference another column in the result pl.when(pl.col('is_refund')) .then(pl.col('amount') * -1) .otherwise(pl.col('amount')) .alias('signed_amount'), # Null handling in conditionals pl.when(pl.col('discount_code').is_null()) .then(pl.lit('NONE')) .otherwise(pl.col('discount_code').str.to_uppercase()) .alias('discount_normalized'), # Complex predicate with multiple conditions pl.when( (pl.col('amount') > 100) & (pl.col('status') == 'completed') & pl.col('user_id').is_not_null() ) .then(pl.col('amount') * 0.05) # 5% cashback .otherwise(pl.lit(0.0)) .alias('cashback'), ])
# map_elements: Python function per element (slow — avoid!)
# Use only when no native Polars expression exists.
df.with_columns(
    pl.col('text')
      .map_elements(
          lambda s: my_nlp_model.predict(s),
          return_dtype=pl.Float32
      )
      .alias('sentiment_score')
)

# map_batches: Python function on a SERIES (vectorized).
# Much faster — operates on the whole Series at once.
import numpy as np
df.with_columns(
    pl.col('amount')
      .map_batches(
          lambda s: pl.Series(np.log1p(s.to_numpy())),
          return_dtype=pl.Float64
      )
      .alias('log_amount')
)

# Plugin expressions (Rust UDFs via pyo3-polars):
# Write UDFs in Rust that execute inside the engine's plan.
# Zero Python overhead — runs at native Rust speed.
# → cargo new polars-plugin && write a #[polars_expr] function
# pl.all() — selects every column df.select(pl.all().sort_by('event_ts')) # pl.exclude() — all columns except named ones df.select(pl.exclude('_loaded_at', '_fivetran_deleted', 'ssn')) # pl.col(dtype) — select columns by data type df.select(pl.col(pl.Float64)) # all numeric float64 cols df.select(pl.col(pl.Utf8)) # all string cols df.select(pl.col(pl.Datetime)) # all datetime cols df.select(pl.col(pl.List)) # all list cols # Apply operation to ALL numeric columns at once: df.with_columns( pl.col(pl.Float64).round(2) # round all floats ) # Normalize (z-score) ALL numeric columns in one expression: df.with_columns([ ((pl.col(c) - pl.col(c).mean()) / pl.col(c).std()).alias(c) for c in df.select(pl.col(pl.Float64)).columns ]) # cs (column selectors) — the powerful selection DSL: import polars.selectors as cs df.select(cs.numeric()) # all numeric types df.select(cs.string()) # all string types df.select(cs.temporal()) # all date/datetime/duration df.select(cs.starts_with('event_')) # column name prefix df.select(cs.ends_with('_at')) # column name suffix df.select(cs.contains('amount')) # column name substring df.select(cs.numeric() - cs.by_name('id')) # set subtraction df.select(cs.numeric() | cs.temporal()) # set union df.select(~cs.numeric()) # complement (everything else)
polars.selectors is a first-class selection algebra. You can compose selectors with set operations: cs.numeric() - cs.by_name('id') means "all numeric columns except id." Combined with .with_columns(), this lets you apply transformations to dynamically-selected column sets — perfect for schema-agnostic pipeline code.
import polars as pl
import polars.selectors as cs

def normalize_pipeline(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Works on any schema — no column names hardcoded."""
    return (
        lf
        .with_columns(
            # Lowercase all strings
            cs.string().str.to_lowercase(),
            # Round all floats
            cs.float().round(4),
            # UTC-normalize all datetimes
            # (cs.datetime(), not cs.temporal() — Date/Duration
            #  columns have no timezone to convert)
            cs.datetime().dt.convert_time_zone('UTC'),
        )
        .with_columns(
            # Fill nulls: 0 for numeric, 'unknown' for string
            cs.numeric().fill_null(0),
            cs.string().fill_null('unknown'),
        )
    )
Consider 100 million rows of a 3-value status column stored as String. As Categorical, Polars stores a 3-entry dictionary plus 100M UInt32 indices — about 400MB. As Enum (fixed categories), it stores UInt8 indices — about 100MB. Same data, roughly 7× less memory, and around 3× faster group_by.
# Cast to Categorical at read time — saves memory from the start lf = pl.scan_parquet('events.parquet').with_columns( pl.col('event_type').cast(pl.Categorical), pl.col('status').cast(pl.Categorical), pl.col('country').cast(pl.Categorical), ) # Enum — when you KNOW the valid values (best performance): OrderStatus = pl.Enum(['pending', 'processing', 'shipped', 'delivered', 'cancelled', 'refunded']) df.with_columns( pl.col('status').cast(OrderStatus) ) # Invalid value in 'status'? Error at cast time, not query time. # Global Categorical — align dictionaries across DataFrames for joins: with pl.StringCache(): df1 = pl.read_parquet('jan.parquet').with_columns( pl.col('user_type').cast(pl.Categorical) ) df2 = pl.read_parquet('feb.parquet').with_columns( pl.col('user_type').cast(pl.Categorical) ) joined = df1.join(df2, on='user_type') # dictionaries aligned
import polars as pl from polars import Schema # Define your schema once — use it everywhere EVENTS_SCHEMA = Schema({ 'event_id': pl.String, 'user_id': pl.Int64, 'event_type': pl.Enum(['purchase', 'view', 'click', 'refund']), 'event_ts': pl.Datetime('us', 'UTC'), 'amount': pl.Decimal(precision=18, scale=4), 'session_id': pl.String, 'platform': pl.Categorical, 'country': pl.Categorical, 'metadata': pl.Struct({ 'ip': pl.String, 'ua': pl.String, }), }) def read_events(path: str) -> pl.LazyFrame: return pl.scan_parquet( path, schema=EVENTS_SCHEMA # enforced at read time ) # Schema mismatch → SchemaError before data is read. # Cast to schema after reading unknown sources: def enforce_schema(lf: pl.LazyFrame) -> pl.LazyFrame: return lf.cast(EVENTS_SCHEMA, strict=False) # strict=False: coerce where possible, null where impossible # strict=True (default): raise on any type mismatch
# Parquet — production output format
df.write_parquet(
    'output.parquet',
    compression       = 'zstd',    # best ratio + speed
    compression_level = 3,         # 1-22, default 3
    statistics        = True,      # zone map metadata
    row_group_size    = 122_880,   # rows per group
    use_pyarrow       = False,     # use native Rust writer
)

# Partitioned Parquet (via sink — streaming)
(
    df.lazy()
    .sink_parquet(
        pl.PartitionMaxSize('output_dir/', max_size=5_000_000)
        # max_size counts ROWS per generated file
    )
)

# Delta Lake — write directly from Polars
from deltalake.writer import write_deltalake
write_deltalake('s3://bucket/delta/events',
                df.to_arrow(),
                mode='append',
                partition_by=['year', 'month'])

# CSV, JSON, Arrow IPC, NDJSON
df.write_csv('output.csv', separator='|', date_format='%Y-%m-%d')
df.write_ndjson('output.ndjson')
df.write_ipc('output.arrow', compression='lz4')  # Arrow IPC
df.write_ipc_stream('stream.arrows')             # Arrow streaming IPC
df.write_avro('output.avro')                     # Apache Avro
df.write_excel('report.xlsx', worksheet='Data')  # Excel
df.write_database(
    table_name='events',
    connection='postgresql://user:pw@host/db',
    if_table_exists='append'  # 'replace' | 'fail'
)
# Cloud storage credentials via environment or explicit config: import polars as pl storage_options = { 'aws_access_key_id': '...', 'aws_secret_access_key': '...', 'aws_region': 'us-east-1', } # Read from S3 (lazy) lf = pl.scan_parquet( 's3://bucket/events/**/*.parquet', storage_options=storage_options ) # Write to GCS df.write_parquet( 'gs://bucket/output.parquet', storage_options={'service_account': 'key.json'} ) ───────────────────────────────────────────────── # Arrow bridge — zero-copy interchange ecosystem: # Polars → PyArrow (zero copy) arrow_table = df.to_arrow() # PyArrow → Polars (zero copy) df = pl.from_arrow(arrow_table) # Polars → pandas (one copy, Arrow → pandas) pandas_df = df.to_pandas() # Use use_pyarrow_extension_array=True for Arrow-backed pandas: pandas_df = df.to_pandas(use_pyarrow_extension_array=True) # pandas → Polars df = pl.from_pandas(pandas_df) # Polars ↔ DuckDB (zero copy via Arrow): import duckdb # DuckDB queries a Polars LazyFrame directly: lf = pl.scan_parquet('events.parquet') result = duckdb.sql("SELECT * FROM lf WHERE amount > 100").pl() # Polars queries a DuckDB relation directly: con = duckdb.connect('analytics.duckdb') arrow_result = con.execute("SELECT * FROM events").arrow() df = pl.from_arrow(arrow_result)
# Every agg() argument is a full Polars expression df.group_by(['user_id', 'event_type']).agg([ # Basic aggregations pl.len().alias('count'), pl.col('amount').sum().alias('total'), pl.col('amount').mean().alias('avg'), pl.col('amount').std().alias('std'), pl.col('amount').min().alias('min'), pl.col('amount').max().alias('max'), pl.col('amount').median().alias('p50'), # Quantiles pl.col('amount').quantile(0.95).alias('p95'), pl.col('amount').quantile(0.99).alias('p99'), # First / last (ordered) pl.col('event_ts').min().alias('first_event'), pl.col('event_ts').max().alias('last_event'), pl.col('session_id').first().alias('first_session'), # Count distinct pl.col('session_id').n_unique().alias('unique_sessions'), # Collect into a list pl.col('amount').sort_by(pl.col('event_ts')).alias('amounts_ordered'), # Conditional count inside agg (pl.col('amount') > 100).sum().alias('large_orders'), # Complex expression inside agg (pl.col('amount') * pl.col('quantity')).sum().alias('revenue'), # Filter inside agg (conditional aggregation) pl.col('amount').filter(pl.col('status') == 'completed') .sum().alias('completed_revenue'), # Mode (most frequent value) pl.col('platform').mode().first().alias('primary_platform'), # String concatenation pl.col('event_type').unique().sort().str.join(',').alias('event_types'), ])
# group_by_dynamic — time-window aggregations # Perfect for: metrics over time, moving aggregates df.group_by_dynamic( index_column = 'event_ts', every = '1h', # window size period = '1h', # = every: no overlap offset = '0h', include_boundaries = True, # add _lower_boundary, _upper_boundary closed = 'left', # interval semantics group_by = ['user_id'], # optional: partition first start_by = 'datapoint', # 'window' | 'datapoint' | 'monday' ).agg([ pl.col('amount').sum().alias('hourly_spend'), pl.len().alias('event_count'), ]) ───────────────────────────────────────────────── # Rolling aggregations (per-row sliding window) df.with_columns( pl.col('amount') .rolling_mean_by('event_ts', window_size='7d') .alias('rolling_7d_avg'), pl.col('amount') .rolling_sum_by('event_ts', window_size='30d') .alias('rolling_30d_sum'), pl.col('amount') .rolling_max_by('event_ts', window_size='24h') .alias('rolling_24h_max'), ) ───────────────────────────────────────────────── # partition_by — split DataFrame into dict of DataFrames partitions = df.partition_by('year', 'month', as_dict=True) for (year, month), chunk in partitions.items(): chunk.write_parquet(f'output/year={year}/month={month}/data.parquet')
# Standard join types:
df.join(other,
    on       = 'user_id',  # same name both sides
                           # (when names differ, pass left_on=/right_on=
                           #  instead — on= OR the pair, never both)
    how      = 'left',     # 'inner'|'left'|'right'|'full'|'semi'|'anti'|'cross'
    suffix   = '_right',   # collision suffix
    coalesce = True,       # coalesce join keys
    validate = 'm:1',      # '1:1'|'1:m'|'m:1'|'m:m' — data contract!
)

# validate= is the hidden gem. Pass '1:1' and Polars
# raises an error if the right table has duplicates on
# the join key — catch integrity bugs before they propagate.

# Semi join — filter left by existence in right (no duplication):
active_users = df.join(paid_users, on='user_id', how='semi')

# Anti join — filter left by NON-existence in right:
churned_users = df.join(active_users, on='user_id', how='anti')

# Join on expression (inequality / range join):
df.join_where(
    other,
    pl.col('event_ts') >= pl.col('valid_from'),
    pl.col('event_ts') < pl.col('valid_to'),
)
# Use join_where for: exchange rate lookups, SCD2 temporal joins,
# event-to-session matching, validity windows.
# For "latest row at or before this timestamp" semantics, Polars also
# has a dedicated join_asof (the equivalent of DuckDB's ASOF JOIN):
df.sort('event_ts').join_asof(
    rates.sort('valid_from'),
    left_on='event_ts', right_on='valid_from',
    by='currency',  # both frames must be sorted on the asof key
)

# Cross join (cartesian product):
df.join(date_spine, how='cross')  # every row × every date
# PIVOT — rows to columns (cross-tab) df.pivot( on = 'status', # column whose values become headers index = 'user_id', # row key values = 'amount', # value to aggregate aggregate_function = 'sum', # 'sum'|'mean'|'count'|'min'|'max'|'first' sort_columns = True, ) # UNPIVOT — columns to rows (melt / wide to long) df.unpivot( on = ['q1_revenue', 'q2_revenue', 'q3_revenue', 'q4_revenue'], index = ['product_id', 'region'], variable_name = 'quarter', value_name = 'revenue', ) # CONCAT — vertical (row) stacking combined = pl.concat([df_jan, df_feb, df_mar], how='diagonal_relaxed' # handles mismatched schemas! ) # how options: # 'vertical' — same schema required # 'diagonal' — fill missing columns with null # 'diagonal_relaxed' — cast types where possible + fill null # HSTACK — column (horizontal) stacking df_combined = pl.concat([df_features, df_labels], how='horizontal') # UPDATE — pandas-style combine_first, left-wins merge: df.update( new_data, on='user_id', # match key how='left', # keep all left rows include_nulls=False # don't overwrite with nulls )
Polars reads raw data from all ingress vectors and transforms it in Python with zero-copy Arrow. DuckDB runs SQL analytical queries on those LazyFrames directly, with no serialization. dbt orchestrates the pipeline, defines the schema contracts in YAML, and manages the DAG. They share the same Apache Arrow memory format — data flows between all three without a single copy.
import polars as pl
import duckdb

"""
Ingress pattern:
    [Kafka / S3 / Postgres / HTTP]
        ↓ Polars (scan + transform)
        ↓ Arrow (zero-copy)
        ↓ DuckDB (SQL aggregation)
        ↓ Parquet (write back to lake)
        ↓ dbt (orchestrate, test, document)
"""

# Step 1: Polars reads + transforms raw ingress
events_lf = (
    pl.scan_ndjson(
        's3://kafka-sink/topic=events/**/*.json',
        storage_options={'region': 'us-east-1'}
    )
    .with_columns([
        pl.col('event_ts').str.to_datetime('%Y-%m-%dT%H:%M:%S%.fZ'),
        pl.col('user_id').cast(pl.Int64),
        pl.col('amount_cents').cast(pl.Int64) / 100,
        pl.col('event_type').cast(pl.Categorical),
    ])
    .filter(pl.col('event_id').is_not_null())
    # dedup: keep latest per event_id
    .sort('_ingested_at', descending=True)
    .unique(subset=['event_id'], keep='first', maintain_order=True)
)

# Step 2: DuckDB runs complex SQL on the Polars LazyFrame
# (zero-copy — DuckDB reads Polars' Arrow buffers directly)
con = duckdb.connect()
result = con.sql("""
    SELECT
        date_trunc('hour', event_ts) AS hour,
        event_type,
        count(*) AS event_count,
        sum(amount_cents) AS revenue,
        count(DISTINCT user_id) AS unique_users,
        percentile_cont(0.95) WITHIN GROUP (ORDER BY amount_cents) AS p95_amount
    FROM events_lf
    GROUP BY ALL
    ORDER BY hour DESC, revenue DESC
""").pl()  # result is a Polars DataFrame

# Step 3: Write back to lake as partitioned Parquet
# (dbt can then reference this as a source)
(
    result.lazy()
    .with_columns([
        pl.col('hour').dt.year().alias('year'),
        pl.col('hour').dt.month().alias('month'),
    ])
    .sink_parquet(
        pl.PartitionMaxSize(
            's3://lake/hourly_metrics/',
            max_size=5_000_000  # rows per generated file
        )
    )
)
Representative benchmarks put Polars and DuckDB in the same performance tier — pick by interface preference (Python expressions vs SQL).
| Package | Role |
|---|---|
| polars[all] | All optional extras: Arrow, XLSX, cloud, etc. |
| polars-cloud | Run Polars queries on distributed cloud infra |
| pyo3-polars | Write Rust plugin expressions (UDFs at native speed) |
| narwhals | Write pandas/polars-agnostic library code |
| ibis-framework[polars] | Ibis query API compiled to Polars |
| patito | Pydantic-style data validation for DataFrames |
| polars-ds | Data-science extensions: LSH, KNN, window features |
| hvplot / altair | Polars-native plotting |
| great-tables | Beautiful formatted tables from Polars DataFrames |
| dbt-polars | Use Polars as dbt model execution engine |
// Cargo.toml: pyo3-polars = { version = "0.x", features = ["derive"] }
// This UDF runs INSIDE the Polars query engine — zero Python overhead.
use std::borrow::Cow;

use polars::prelude::*;
use pyo3_polars::derive::polars_expr;
use serde::Deserialize;

// Define keyword arguments your Python users can pass:
#[derive(Deserialize)]
struct MaskEmailKwargs {
    mask_char: String,
    keep_domain: bool,
}

// The expression plugin — input Series → output Series
#[polars_expr(output_type=String)]
fn mask_email(inputs: &[Series], kwargs: MaskEmailKwargs) -> PolarsResult<Series> {
    let ca: &StringChunked = inputs[0].str()?;
    let mask = kwargs.mask_char.chars().next().unwrap_or('*');
    // apply expects Option<Cow<str>> back, so owned strings are wrapped:
    let out: StringChunked = ca.apply(|opt_val| {
        opt_val.map(|email| {
            let parts: Vec<&str> = email.splitn(2, '@').collect();
            if parts.len() == 2 {
                let local = &parts[0][..parts[0].len().min(2)];
                let masked = mask.to_string().repeat(4);
                Cow::Owned(if kwargs.keep_domain {
                    format!("{}{}@{}", local, masked, parts[1])
                } else {
                    format!("{}{}@[redacted]", local, masked)
                })
            } else {
                Cow::Borrowed("[invalid]")
            }
        })
    });
    Ok(out.into_series())
}

// Python usage after maturin build:
// from my_polars_plugin import mask_email
// df.with_columns(
//     mask_email(pl.col('email'), mask_char='*', keep_domain=True)
//     .alias('email_masked')
// )