PL
POLARS
Rust DataFrame Engine
v1.x STABLE
THE THIRD PILLAR
Polars is a Rust-native DataFrame library with a Python API. It is not a faster pandas — it is a different paradigm. Lazy evaluation, expression trees, query optimization, zero-copy Apache Arrow, and a type system that prevents entire categories of bugs at construction time.
Engine
Rust + SIMD
Memory Model
Apache Arrow
Evaluation
Lazy / Eager
Null Handling
Explicit — no NaN
0
GIL Dependency
Arrow
Memory Format
Lazy
Default Eval Mode
SIMD
CPU Vectorization
Streaming Support
Null
Not NaN
pandas → Polars: the paradigm shift
df['col'].fillna(0) → pl.col('col').fill_null(0)
df['col'].apply(fn) → pl.col('col').map_elements(fn)
df[df['x'] > 5] → .filter(pl.col('x') > 5)
df.groupby('k').agg({'v': 'sum'}) → .group_by('k').agg(pl.col('v').sum())
df.merge(other, on='id') → .join(other, on='id', how='left')
df['new'] = df['a'] + df['b'] → .with_columns((pl.col('a') + pl.col('b')).alias('new'))
df.sort_values('col') → .sort('col', descending=False)
pd.read_csv() (eager) → pl.scan_csv() (lazy LazyFrame)
np.nan for missing → null (explicit, typed)
Getting Started — Zero to 100
install + first queries (python)
import polars as pl

# The two modes — choose your weapon
df  = pl.read_parquet('events.parquet')    # eager: loads now
lf  = pl.scan_parquet('events.parquet')    # lazy: builds a plan

# Core pattern: build expression, call collect()
result = (
    pl.scan_parquet('s3://lake/events/**/*.parquet')
    .filter(pl.col('event_type') == 'purchase')
    .filter(pl.col('event_ts') >= pl.lit('2024-01-01').str.to_datetime())
    .with_columns(
        (pl.col('amount_cents') / 100).alias('amount'),
        pl.col('event_ts').dt.truncate('1d').alias('day')
    )
    .group_by(['day', 'user_id'])
    .agg(
        pl.len().alias('purchase_count'),
        pl.col('amount').sum().alias('total_spend'),
        pl.col('amount').mean().alias('avg_order')
    )
    .sort('total_spend', descending=True)
    .collect()   # ← triggers optimized execution
)

# Instant profiling — like DuckDB SUMMARIZE:
print(df.describe())   # count, mean, std, min, max per column
print(df.schema)       # full type information
print(df.estimated_size('mb'))  # memory footprint

Why Rust? The Technical Reality

True Parallelism

Python's Global Interpreter Lock prevents true multi-threading in pure Python. Polars' Rust core releases the GIL immediately. Every operation — sort, group_by, join — uses all CPU cores via Rayon's work-stealing thread pool. On a 16-core machine, parallel-friendly operations can approach a 16× speedup instead of being pinned to one core.

Zero-Copy Arrow

Polars stores data in Apache Arrow columnar buffers. When you pass a Polars DataFrame to DuckDB or PyArrow, no data is copied — the same memory is read by all three engines. The full dbt + DuckDB + Polars stack hands data between tools with zero serialization cost.

Type System Catches Bugs

Polars' schema is fixed at LazyFrame construction. If you write .filter(pl.col('amt') > 'oops') — comparing numeric to string — the error fires when the plan is resolved, not hours into a production run. Entire categories of runtime bugs become plan-time errors.

THE LAZY API
LazyFrame is the heart of Polars. You describe what you want — not how to compute it. Polars' query optimizer then rewrites, reorders, and parallelizes your plan before reading a single byte of data.
Evaluation
Deferred
Optimizer
Predicate / Projection
Trigger
.collect()

The Lazy Execution Pipeline

Every .scan_*() call returns a LazyFrame. Chained operations build a logical plan. .collect() runs the optimizer, produces a physical plan, and executes it.

1
pl.scan_parquet('events/**/*.parquet')
→ LazyFrame. Zero data read. Registers scan source.
2
.filter(pl.col('ts') >= '2024-01-01')
→ Adds predicate node to plan. Optimizer will push this to scan.
3
.select(['user_id', 'amount', 'ts'])
→ Projection pushdown: only these 3 columns read from disk.
4
.group_by('user_id').agg(pl.col('amount').sum())
→ Aggregation node. Optimizer may split for streaming.
5
.collect() → DataFrame
→ Triggers: optimize → physical plan → parallel execution → Arrow buffer.
The Optimizer Rewrites Your Code
You write filters after joins — the optimizer moves them before the join. You select 3 columns from a 50-column file — the optimizer pushes the projection into the file reader. You chain two filters — they're merged into one pass. You never see any of this happen, but it's why Polars is often 5–50× faster than equivalent pandas code.

Query Plan Inspection

inspect + optimize + stream (python)
import polars as pl

lf = (
    pl.scan_parquet('events.parquet')
    .filter(pl.col('event_type') == 'purchase')
    .select(['user_id', 'amount', 'event_ts'])
    .group_by('user_id')
    .agg(pl.col('amount').sum())
)

# See the LOGICAL plan (your intent):
print(lf.explain(optimized=False))

# See the OPTIMIZED plan (what actually runs):
print(lf.explain(optimized=True))
# Notice: filter & select are pushed INTO the ParquetScan.

# Visualize as a tree (requires graphviz):
lf.show_graph(optimized=True)

# ─── STREAMING — larger-than-RAM execution ───────────
# collect() loads everything into RAM.
# collect(engine='streaming') processes in chunks.

result = (
    pl.scan_parquet('s3://lake/events/**/*.parquet')
    .filter(pl.col('amount') > 0)
    .group_by('user_id')
    .agg(pl.col('amount').sum())
    .collect(engine='streaming')  # executes out-of-core, in batches
)

# sink_parquet — stream directly to disk, never loads to RAM:
(
    pl.scan_parquet('huge_100gb_file.parquet')
    .filter(pl.col('status') == 'active')
    .sink_parquet('output/active_users.parquet')
    # executes streaming — never materializes full DataFrame
)

# sink_csv, sink_ipc, sink_ndjson also available
(
    pl.scan_parquet('events.parquet')
    .sink_ipc('events.arrow')    # Arrow IPC stream format
)

Logical Plan Nodes

SC
ParquetScan / CsvScan / IpcScan
source + pushdown
↓ (predicate + projection pushed here)
FL
Filter
Boolean mask predicate
SL
Select / WithColumns
Expression evaluation
JN
Join
Hash / sort-merge / cross
AG
GroupBy / Aggregate
Parallel hash aggregation
SO
Sort
Parallel sort or top-k
Collect / Sink
DataFrame or streaming output

Scan Options — The Ingress Interface

scan_* — full parameter reference (python)
# scan_parquet — the workhorse
pl.scan_parquet(
    source              = 's3://bucket/data/**/*.parquet',
    n_rows              = 1_000_000,    # early stop (fast LIMIT)
    row_index_name      = '_row_idx',    # inject row numbers
    row_index_offset    = 0,
    parallel            = 'auto',        # 'columns' | 'row_groups' | 'prefiltered'
    use_statistics      = True,          # row group zone map pushdown
    hive_partitioning   = True,          # inject partition columns from path
    hive_schema         = {'year': pl.Int32, 'month': pl.Int32},
    include_file_paths  = '_source_file', # track which file each row came from
    storage_options     = {'aws_access_key_id': '...', 'region': 'us-east-1'}
)

# scan_csv — streaming CSV with full control
pl.scan_csv(
    source          = 'data/*.csv',
    separator       = ',',
    has_header      = True,
    skip_rows       = 0,
    schema_overrides = {'id': pl.Int64, 'amount': pl.Float64},
    null_values     = ['', 'N/A', 'null'],
    try_parse_dates = True,             # auto-detect date columns
    ignore_errors   = True,             # skip malformed rows
    encoding        = 'utf8-lossy',     # handle dirty encodings
    comment_prefix  = '#'               # skip comment lines
)

# scan_ndjson — Kafka S3 sink output
pl.scan_ndjson(
    source          = 's3://kafka-sink/topic=events/**/*.json',
    infer_schema_length = 1000,        # sample N rows for schema inference
    schema          = pl.Schema({      # or provide explicit schema
        'event_id':   pl.String,
        'user_id':    pl.Int64,
        'event_ts':   pl.Datetime('us'),
        'payload':    pl.String,       # keep JSON as string for later parsing
    })
)
EXPRESSIONS — THE ALGEBRA
Polars expressions are composable, lazy computation units. They describe transformations on columns without executing them. The optimizer analyzes expression trees to determine parallelism, eliminate dead computation, and push work down to the I/O layer.
Type
pl.Expr
Entry Point
pl.col()
Compose
Chain methods
pl.col().str — string namespace (python)
# The .str namespace wraps every string operation
df.with_columns([

    pl.col('email')
      .str.to_lowercase()
      .str.strip_chars()
      .str.split('@')
      .list.first()             # → local part of email
      .alias('email_local'),

    pl.col('phone')
      .str.replace_all(r'[^\d]', '')  # strip non-digits (regex)
      .str.zfill(10)                  # zero-pad to 10 chars
      .alias('phone_clean'),

    pl.col('payload_str')
      .str.json_path_match('$.user.id')    # JSONPath on string column
      .cast(pl.Int64)
      .alias('user_id_extracted'),

    pl.col('event_type')
      .str.contains('purchase')           # boolean mask
      .alias('is_purchase'),

    pl.col('url')
      .str.extract(r'utm_source=([^&]+)', group_index=1)
      .alias('utm_source'),

    pl.col('tags_csv')
      .str.split(',')                     # → List[String]
      .list.eval(pl.element().str.strip_chars())
      .alias('tags'),

    pl.col('name').str.count_matches(r'\s+').alias('word_count'),
    pl.col('text').str.len_chars().alias('char_count'),
    pl.col('code').str.starts_with('ORD-').alias('is_order'),
])
Regex is Rust Regex — Fast
All .str.contains(), .str.extract(), and .str.replace_all() calls use the Rust regex crate. Each pattern is compiled once, then executed across all CPU cores on vectorized string data. Regex processing in Polars is orders of magnitude faster than calling Python's re module in a loop.
pl.col().dt — datetime namespace (python)
# Parse, truncate, extract, convert — all vectorized
df.with_columns([

    # Parse strings → Datetime
    pl.col('ts_str')
      .str.to_datetime('%Y-%m-%dT%H:%M:%S%.f%z')
      .dt.convert_time_zone('UTC')
      .alias('ts_utc'),

    # Truncate to period (the Polars date_trunc)
    pl.col('ts_utc').dt.truncate('1h').alias('hour_bucket'),
    pl.col('ts_utc').dt.truncate('1d').alias('day'),
    pl.col('ts_utc').dt.truncate('1mo').alias('month'),

    # Extract components
    pl.col('ts_utc').dt.year().alias('year'),
    pl.col('ts_utc').dt.month().alias('month_num'),
    pl.col('ts_utc').dt.weekday().alias('weekday'),   # 1=Mon
    pl.col('ts_utc').dt.hour().alias('hour'),
    pl.col('ts_utc').dt.ordinal_day().alias('doy'),

    # Duration arithmetic
    (pl.col('end_ts') - pl.col('start_ts'))
      .dt.total_seconds()
      .alias('session_seconds'),

    # Epoch conversions
    pl.col('epoch_ms')
      .cast(pl.Datetime('ms'))
      .dt.convert_time_zone('America/New_York')
      .alias('ts_local'),

    # Rolling periods: "is this weekend?"
    pl.col('ts_utc').dt.weekday().is_in([6,7]).alias('is_weekend'),

    # Offset by duration
    (pl.col('ts_utc') + pl.duration(days=30)).alias('ts_plus_30d'),
])
Timezone Handling — No Surprises
Polars distinguishes Datetime('us') (timezone-naive, microseconds) from Datetime('us', 'UTC') (timezone-aware). Operations that mix naive and aware datetimes raise errors at plan time — before your pipeline runs. pandas enforces this far less consistently, which is a classic source of subtle bugs.
List column operations (python)
# Polars has first-class List and Struct types.
# These unlock JSON-like nested data without exploding rows.

df.with_columns([

    # List stats without unnesting
    pl.col('line_items').list.len().alias('item_count'),
    pl.col('prices').list.sum().alias('total'),
    pl.col('prices').list.mean().alias('avg_price'),
    pl.col('prices').list.max().alias('max_price'),
    pl.col('tags').list.first().alias('primary_tag'),
    pl.col('tags').list.last().alias('last_tag'),

    # Filter list elements with lambda
    pl.col('prices')
      .list.eval(pl.element().filter(pl.element() > 100))
      .alias('expensive_items'),

    # Sort / unique list elements
    pl.col('tags').list.sort().list.unique().alias('sorted_unique_tags'),

    # Contains check
    pl.col('tags').list.contains('premium').alias('is_premium'),

    # Gather by index
    pl.col('prices').list.gather([0, -1]).alias('first_and_last'),
])

# EXPLODE list column to rows (1-to-many):
df.explode('line_items')

# list.eval — run ANY expression on each list element:
# This is Polars' equivalent of pandas apply() but vectorized.
df.with_columns(
    pl.col('amounts')
      .list.eval(
          (pl.element() - pl.element().mean()) / pl.element().std()
      )
      .alias('amounts_z_score')
)
Struct — typed nested objects (python)
# Struct: a column where each value is a named record.
# Perfect for JSON payloads and nested API responses.

# Parse JSON string → Struct
df.with_columns(
    pl.col('json_payload')
      .str.json_decode()           # → Struct column
      .alias('payload_struct')
)

# Access struct fields with dot notation:
df.with_columns([
    pl.col('payload_struct').struct.field('user_id').alias('user_id'),
    pl.col('payload_struct').struct.field('email').alias('email'),
])

# Unnest struct → individual columns:
df.unnest('payload_struct')

# Build a struct column from multiple columns:
df.select(
    pl.struct([
        pl.col('first_name'),
        pl.col('last_name'),
        pl.col('age'),
    ]).alias('person')
)

# Rename struct fields:
pl.col('payload_struct').struct.rename_fields([
    'user_id', 'email', 'session_id'
])

# Struct in group_by — keep related fields together:
df.group_by('user_id').agg(
    pl.struct([
        pl.col('event_type').first(),
        pl.col('event_ts').first(),
        pl.col('amount').sum(),
    ]).alias('first_event_summary')
)
Window expressions — over() (python)
# Window expressions in Polars use .over() instead of SQL's OVER PARTITION BY.
# They compute GROUP-level aggregations and broadcast back to original rows —
# without a join step. This is the Polars secret weapon for per-group metrics.

df.with_columns([

    # Running total per user — keeps all rows
    pl.col('amount')
      .cum_sum()
      .over('user_id')
      .alias('running_total_per_user'),

    # Rank within group
    pl.col('amount')
      .rank(method='dense', descending=True)
      .over('user_id')
      .alias('spend_rank'),

    # Group size (no join needed)
    pl.len().over('user_id').alias('user_event_count'),

    # Group mean (broadcast back to all rows)
    pl.col('amount').mean().over('user_id').alias('user_avg_spend'),

    # Z-score per user (subtract group mean, divide by group std)
    (
        (pl.col('amount') - pl.col('amount').mean().over('user_id'))
        / pl.col('amount').std().over('user_id')
    ).alias('amount_zscore'),

    # First/last event per user
    pl.col('event_ts').min().over('user_id').alias('first_seen'),
    pl.col('event_ts').max().over('user_id').alias('last_seen'),

    # Previous row value per group (lag)
    pl.col('amount')
      .shift(1)
      .over('user_id', order_by='event_ts')
      .alias('prev_amount'),

    # Rolling window per group (7-day rolling sum)
    pl.col('amount')
      .rolling_sum_by('event_ts', window_size='7d')
      .over('user_id')
      .alias('rolling_7d_spend'),
])
over() — Far Cheaper Than pandas' groupby().transform()
In pandas, a per-group window metric requires .groupby().transform(), which carries substantial overhead. In Polars, .over('user_id') computes the aggregate in a single parallel pass, keeps it in a hash table, and broadcasts the values back in a second pass. No merge(), no intermediate DataFrame, no memory duplication.
when/then/otherwise — vectorized CASE WHEN (python)
# pl.when().then().otherwise() is Polars' CASE WHEN.
# Unlike Python if/else, this evaluates ALL branches in parallel,
# then selects results with a bitmask — zero branch prediction cost.

df.with_columns([

    # Simple binary
    pl.when(pl.col('status') == 'active')
      .then(pl.lit(1))
      .otherwise(pl.lit(0))
      .alias('is_active'),

    # Multi-branch (chain .when().then())
    pl.when(pl.col('amount') < 10)
      .then(pl.lit('micro'))
      .when(pl.col('amount') < 100)
      .then(pl.lit('small'))
      .when(pl.col('amount') < 1000)
      .then(pl.lit('medium'))
      .otherwise(pl.lit('large'))
      .alias('order_tier'),

    # Reference another column in the result
    pl.when(pl.col('is_refund'))
      .then(pl.col('amount') * -1)
      .otherwise(pl.col('amount'))
      .alias('signed_amount'),

    # Null handling in conditionals
    pl.when(pl.col('discount_code').is_null())
      .then(pl.lit('NONE'))
      .otherwise(pl.col('discount_code').str.to_uppercase())
      .alias('discount_normalized'),

    # Complex predicate with multiple conditions
    pl.when(
        (pl.col('amount') > 100) &
        (pl.col('status') == 'completed') &
        pl.col('user_id').is_not_null()
    )
    .then(pl.col('amount') * 0.05)  # 5% cashback
    .otherwise(pl.lit(0.0))
    .alias('cashback'),
])
map_elements + map_batches (python)
# map_elements: Python function per element (slow — avoid!)
# Use only when no native Polars expression exists.
df.with_columns(
    pl.col('text')
      .map_elements(
          lambda s: my_nlp_model.predict(s),
          return_dtype=pl.Float32
      )
      .alias('sentiment_score')
)

# map_batches: Python function on a SERIES (vectorized).
# Much faster — operates on Rust Series object.
import numpy as np

df.with_columns(
    pl.col('amount')
      .map_batches(
          lambda s: pl.Series(np.log1p(s.to_numpy())),
          return_dtype=pl.Float64
      )
      .alias('log_amount')
)

# Plugin expressions (Rust UDFs via pyo3-polars):
# Write UDFs in Rust that execute inside the optimizer's plan.
# Zero Python overhead — runs at native Rust speed.
# → see the pyo3-polars crate: write the kernel in Rust and expose it
#   to the expression engine with the #[polars_expr] attribute macro
pl.all(), pl.exclude(), pl.col(dtype) (python)
# pl.all() — selects every column
df.select(pl.all().sort_by('event_ts'))

# pl.exclude() — all columns except named ones
df.select(pl.exclude('_loaded_at', '_fivetran_deleted', 'ssn'))

# pl.col(dtype) — select columns by data type
df.select(pl.col(pl.Float64))          # all numeric float64 cols
df.select(pl.col(pl.String))           # all string cols (pl.Utf8 is an alias)
df.select(pl.col(pl.Datetime))         # all datetime cols
df.select(pl.col(pl.List))             # all list cols

# Apply operation to ALL numeric columns at once:
df.with_columns(
    pl.col(pl.Float64).round(2)       # round all floats
)

# Normalize (z-score) ALL numeric columns in one expression:
df.with_columns([
    ((pl.col(c) - pl.col(c).mean()) / pl.col(c).std()).alias(c)
    for c in df.select(pl.col(pl.Float64)).columns
])

# cs (column selectors) — the powerful selection DSL:
import polars.selectors as cs

df.select(cs.numeric())                # all numeric types
df.select(cs.string())                 # all string types
df.select(cs.temporal())               # all date/datetime/duration
df.select(cs.starts_with('event_'))   # column name prefix
df.select(cs.ends_with('_at'))        # column name suffix
df.select(cs.contains('amount'))      # column name substring
df.select(cs.numeric() - cs.by_name('id'))   # set subtraction
df.select(cs.numeric() | cs.temporal())        # set union
df.select(~cs.numeric())                        # complement (everything else)
Column Selectors (cs) — The DuckDB COLUMNS() of Polars
polars.selectors is a first-class selection algebra. You can compose selectors with set operations: cs.numeric() - cs.by_name('id') means "all numeric columns except id." Combined with .with_columns(), this lets you apply transformations to dynamically-selected column sets — perfect for schema-agnostic pipeline code.
Dynamic schema-agnostic transforms (python)
import polars as pl
import polars.selectors as cs

def normalize_pipeline(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Works on any schema — no column names hardcoded."""
    return (
        lf
        .with_columns(
            # Lowercase all strings
            cs.string().str.to_lowercase(),
            # Round all floats
            cs.float().round(4),
            # UTC-normalize all timezone-aware datetimes
            cs.datetime(time_zone='*').dt.convert_time_zone('UTC'),
        )
        .with_columns(
            # Fill nulls: 0 for numeric, 'unknown' for string
            cs.numeric().fill_null(0),
            cs.string().fill_null('unknown'),
        )
    )
THE TYPE SYSTEM
Polars' type system is its backbone. Every column has a fixed, known type. Mismatched operations fail at plan construction, not at runtime. Choosing the right dtype can cut memory usage severalfold and raises CPU throughput by enabling SIMD vectorization.
Null Model
Validity bitmask
No NaN
Float NaN ≠ null
String
Categorical option

Complete Dtype Reference

Int8: -128 to 127 · 1 byte
Int16: -32768 to 32767 · 2 bytes
Int32: -2.1B to 2.1B · 4 bytes
Int64: ±9.2×10¹⁸ · 8 bytes (default)
UInt8: 0–255 · 1 byte
UInt16: 0–65535 · 2 bytes
UInt32: 0–4.2B · 4 bytes
UInt64: 0–1.8×10¹⁹ · 8 bytes
Float32: ~7 sig digits · 4 bytes
Float64: ~15 sig digits · 8 bytes
String: UTF-8 · variable
Categorical: String → UInt32 dict
Enum: fixed Categorical · fastest
Boolean: bitmask · 1 bit/value
Date: days since epoch · 4 bytes
Datetime(tu, tz): ns/us/ms + optional tz
Time: nanoseconds since midnight
Duration: signed time delta
List(inner): variable-length nested
Array(inner, n): fixed-length nested
Struct([fields]): named field record
Decimal(p, s): exact decimal arithmetic
Binary: raw bytes
Null: all-null placeholder
Object: arbitrary Python (escape hatch)
Unknown: pre-collect schema

Categorical — The String Memory Killer

String vs Categorical — The Memory Difference
A column with 100M rows of status values like "active", "cancelled", "pending" stores hundreds of megabytes as String. As Categorical, it stores a tiny 3-entry dictionary plus 100M UInt32 indices — about 400MB. As Enum, the category set is fixed up front, so casts are validated and the dictionary never needs rebuilding or merging. Same data, several times less memory, and markedly faster group_by.
Categorical + Enum patterns (python)
# Cast to Categorical at read time — saves memory from the start
lf = pl.scan_parquet('events.parquet').with_columns(
    pl.col('event_type').cast(pl.Categorical),
    pl.col('status').cast(pl.Categorical),
    pl.col('country').cast(pl.Categorical),
)

# Enum — when you KNOW the valid values (best performance):
OrderStatus = pl.Enum(['pending', 'processing', 'shipped',
                        'delivered', 'cancelled', 'refunded'])
df.with_columns(
    pl.col('status').cast(OrderStatus)
)
# Invalid value in 'status'? Error at cast time, not query time.

# Global Categorical — align dictionaries across DataFrames for joins:
with pl.StringCache():
    df1 = pl.read_parquet('jan.parquet').with_columns(
        pl.col('user_type').cast(pl.Categorical)
    )
    df2 = pl.read_parquet('feb.parquet').with_columns(
        pl.col('user_type').cast(pl.Categorical)
    )
    joined = df1.join(df2, on='user_type')  # dictionaries aligned

Schema Enforcement Pattern

Schema-first ingress pipeline (python)
import polars as pl
from polars import Schema

# Define your schema once — use it everywhere
EVENTS_SCHEMA = Schema({
    'event_id':    pl.String,
    'user_id':     pl.Int64,
    'event_type':  pl.Enum(['purchase', 'view', 'click', 'refund']),
    'event_ts':    pl.Datetime('us', 'UTC'),
    'amount':      pl.Decimal(precision=18, scale=4),
    'session_id':  pl.String,
    'platform':    pl.Categorical,
    'country':     pl.Categorical,
    'metadata':    pl.Struct({
                       'ip': pl.String,
                       'ua': pl.String,
                   }),
})

def read_events(path: str) -> pl.LazyFrame:
    return pl.scan_parquet(
        path,
        schema=EVENTS_SCHEMA  # enforced at read time
    )

# Schema mismatch → SchemaError before data is read.
# Cast to schema after reading unknown sources:
def enforce_schema(lf: pl.LazyFrame) -> pl.LazyFrame:
    return lf.cast(EVENTS_SCHEMA, strict=False)
    # strict=False: coerce where possible, null where impossible
    # strict=True (default): raise on any type mismatch
I/O — READ AND WRITE EVERYTHING
Polars reads and writes all major data formats natively. The Rust-native I/O stack is dramatically faster than pandas' equivalent — especially for Parquet, where Polars uses a custom reader with parallel row-group decompression.
Parquet Engine
Custom Rust
Cloud
S3/GCS/AZ native
DuckDB Bridge
Zero-copy Arrow

Write Formats — Full Arsenal

write_* — output options (python)
# Parquet — production output format
df.write_parquet(
    'output.parquet',
    compression         = 'zstd',          # best ratio + speed
    compression_level   = 3,               # 1-22, default 3
    statistics          = True,            # zone map metadata
    row_group_size      = 122_880,         # rows per group
    use_pyarrow         = False,           # use native Rust writer
)

# Partitioned Parquet (via sink — streaming)
(
    df.lazy()
    .sink_parquet(
        pl.PartitionMaxSize('output_dir/', max_size=1_000_000)  # max rows per file
    )
)

# Delta Lake — write directly from Polars
from deltalake.writer import write_deltalake
write_deltalake('s3://bucket/delta/events', df.to_arrow(),
                mode='append', partition_by=['year', 'month'])

# CSV, JSON, Arrow IPC, NDJSON
df.write_csv('output.csv', separator='|', date_format='%Y-%m-%d')
df.write_ndjson('output.ndjson')
df.write_ipc('output.arrow', compression='lz4')   # Arrow IPC
df.write_ipc_stream('stream.arrows')              # Arrow streaming IPC
df.write_avro('output.avro')                      # Apache Avro
df.write_excel('report.xlsx', worksheet='Data')  # Excel
df.write_database(
    table_name='events',
    connection='postgresql://user:pw@host/db',
    if_table_exists='append'                      # 'replace' | 'fail'
)

Cloud + Arrow Bridge

Cloud storage + DuckDB interchange (python)
# Cloud storage credentials via environment or explicit config:
import polars as pl

storage_options = {
    'aws_access_key_id':     '...',
    'aws_secret_access_key': '...',
    'aws_region':            'us-east-1',
}

# Read from S3 (lazy)
lf = pl.scan_parquet(
    's3://bucket/events/**/*.parquet',
    storage_options=storage_options
)

# Write to GCS
df.write_parquet(
    'gs://bucket/output.parquet',
    storage_options={'service_account': 'key.json'}
)

# ─────────────────────────────────────────────────
# Arrow bridge — zero-copy interchange ecosystem:

# Polars → PyArrow (zero copy)
arrow_table = df.to_arrow()

# PyArrow → Polars (zero copy)
df = pl.from_arrow(arrow_table)

# Polars → pandas (one copy, Arrow → pandas)
pandas_df = df.to_pandas()
# Use use_pyarrow_extension_array=True for Arrow-backed pandas:
pandas_df = df.to_pandas(use_pyarrow_extension_array=True)

# pandas → Polars
df = pl.from_pandas(pandas_df)

# Polars ↔ DuckDB (zero copy via Arrow):
import duckdb

# DuckDB queries a Polars LazyFrame directly:
lf = pl.scan_parquet('events.parquet')
result = duckdb.sql("SELECT * FROM lf WHERE amount > 100").pl()

# Polars queries a DuckDB relation directly:
con = duckdb.connect('analytics.duckdb')
arrow_result = con.execute("SELECT * FROM events").arrow()
df = pl.from_arrow(arrow_result)
GROUP BY MASTERY
Polars group_by is a parallel hash aggregation engine. Unlike SQL, the aggregation expressions are full Polars expressions — you can run complex multi-step computations inside a single agg() call.
Algorithm
Parallel hash agg
Order
Non-deterministic
Stable
maintain_order=True
group_by — the full expression surface (python)
# Every agg() argument is a full Polars expression
df.group_by(['user_id', 'event_type']).agg([

    # Basic aggregations
    pl.len().alias('count'),
    pl.col('amount').sum().alias('total'),
    pl.col('amount').mean().alias('avg'),
    pl.col('amount').std().alias('std'),
    pl.col('amount').min().alias('min'),
    pl.col('amount').max().alias('max'),
    pl.col('amount').median().alias('p50'),

    # Quantiles
    pl.col('amount').quantile(0.95).alias('p95'),
    pl.col('amount').quantile(0.99).alias('p99'),

    # First / last (ordered)
    pl.col('event_ts').min().alias('first_event'),
    pl.col('event_ts').max().alias('last_event'),
    pl.col('session_id').first().alias('first_session'),

    # Count distinct
    pl.col('session_id').n_unique().alias('unique_sessions'),

    # Collect into a list
    pl.col('amount').sort_by(pl.col('event_ts')).alias('amounts_ordered'),

    # Conditional count inside agg
    (pl.col('amount') > 100).sum().alias('large_orders'),

    # Complex expression inside agg
    (pl.col('amount') * pl.col('quantity')).sum().alias('revenue'),

    # Filter inside agg (conditional aggregation)
    pl.col('amount').filter(pl.col('status') == 'completed')
      .sum().alias('completed_revenue'),

    # Mode (most frequent value)
    pl.col('platform').mode().first().alias('primary_platform'),

    # String concatenation
    pl.col('event_type').unique().sort().str.join(',').alias('event_types'),
])

Specialized Group By Variants

group_by_dynamic + rolling (python)
# group_by_dynamic — time-window aggregations
# Perfect for: metrics over time, moving aggregates

df.group_by_dynamic(
    index_column  = 'event_ts',
    every         = '1h',          # window size
    period        = '1h',          # = every: no overlap
    offset        = '0h',
    include_boundaries = True,    # add _lower_boundary, _upper_boundary
    closed        = 'left',        # interval semantics
    group_by      = ['user_id'],   # optional: partition first
    start_by      = 'datapoint',   # 'window' | 'datapoint' | 'monday'
).agg([
    pl.col('amount').sum().alias('hourly_spend'),
    pl.len().alias('event_count'),
])

# ─────────────────────────────────────────────────
# Rolling aggregations (per-row sliding window)

df.with_columns(
    pl.col('amount')
      .rolling_mean_by('event_ts', window_size='7d')
      .alias('rolling_7d_avg'),

    pl.col('amount')
      .rolling_sum_by('event_ts', window_size='30d')
      .alias('rolling_30d_sum'),

    pl.col('amount')
      .rolling_max_by('event_ts', window_size='24h')
      .alias('rolling_24h_max'),
)

# ─────────────────────────────────────────────────
# partition_by — split DataFrame into dict of DataFrames
partitions = df.partition_by('year', 'month', as_dict=True)
for (year, month), chunk in partitions.items():
    chunk.write_parquet(f'output/year={year}/month={month}/data.parquet')
JOINS & RESHAPING
Polars implements multiple join algorithms and chooses the optimal one based on cardinality estimates. The API surface includes join types absent from pandas but essential for data pipeline work.
Default Algorithm
Hash join
Sort-merge
sort=True
Parallel
All cores
join() — full parameter surface (python)
# Standard join types:
df.join(other,
    on         = 'user_id',           # same name both sides
    left_on    = 'user_id',           # different names:
    right_on   = 'id',               #   use left_on/right_on
    how        = 'left',             # 'inner'|'left'|'right'|'full'|'semi'|'anti'|'cross'
    suffix     = '_right',           # collision suffix
    coalesce   = True,               # coalesce join keys
    validate   = 'm:1',             # '1:1'|'1:m'|'m:1'|'m:m' — data contract!
)

# validate= is the hidden gem. Pass '1:1' and Polars
# raises an error if the right table has duplicates on
# the join key — catch integrity bugs before they propagate.

# Semi join — filter left by existence in right (no duplication):
active_users = df.join(paid_users, on='user_id', how='semi')

# Anti join — filter left by NON-existence in right:
churned_users = df.join(active_users, on='user_id', how='anti')

# Join on expression (not just column name):
df.join_where(
    other,
    pl.col('event_ts') >= pl.col('valid_from'),
    pl.col('event_ts') <  pl.col('valid_to'),
    pl.col('user_id')  == pl.col('user_id_right'),  # colliding right cols get the suffix
)
# join_where is Polars' general inequality join.
# Use it for: exchange rate lookups, SCD2 temporal joins,
# event-to-session matching, validity windows.
# (For nearest-key time matching, Polars also has join_asof.)

# Cross join (cartesian product):
df.join(date_spine, how='cross')  # every row × every date

Reshape — pivot, unpivot, concat

pivot / unpivot / concatpython
# PIVOT — rows to columns (cross-tab)
df.pivot(
    on         = 'status',          # column whose values become headers
    index      = 'user_id',         # row key
    values     = 'amount',          # value to aggregate
    aggregate_function = 'sum',    # 'sum'|'mean'|'median'|'len'|'min'|'max'|'first'|'last'
    sort_columns = True,
)

# UNPIVOT — columns to rows (melt / wide to long)
df.unpivot(
    on           = ['q1_revenue', 'q2_revenue', 'q3_revenue', 'q4_revenue'],
    index        = ['product_id', 'region'],
    variable_name = 'quarter',
    value_name   = 'revenue',
)

# CONCAT — vertical (row) stacking
combined = pl.concat([df_jan, df_feb, df_mar],
    how='diagonal_relaxed'  # handles mismatched schemas!
)
# how options:
# 'vertical'           — same schema required
# 'diagonal'           — fill missing columns with null
# 'diagonal_relaxed'   — cast types where possible + fill null

# HSTACK — column (horizontal) stacking
df_combined = pl.concat([df_features, df_labels], how='horizontal')

# UPDATE — in-place-style merge: non-null values from
# new_data overwrite the matching rows in df (right wins):
df.update(
    new_data,
    on='user_id',        # match key
    how='left',          # keep all left rows
    include_nulls=False  # don't overwrite with nulls
)
THE ECOSYSTEM
Polars sits at the center of the modern Python data stack. It speaks Arrow to DuckDB, Parquet to the lakehouse, and Rust plugins to the performance ceiling. Here's how all three masterclass tools compose.
dbt adapter
dbt-polars
DuckDB bridge
Arrow (zero-copy)
Rust UDFs
pyo3-polars
THE HOLY TRINITY STACK — How All Three Connect
Polars reads raw data from all ingress vectors and transforms it in Python with zero-copy Arrow. DuckDB runs SQL analytical queries on those LazyFrames directly, with no serialization. dbt orchestrates the pipeline, defines the schema contracts in YAML, and manages the DAG. All three share the Apache Arrow memory format, so data flows between them without a single copy.

The Full Stack — Production Pattern

tri-engine pipelinepython
import polars as pl
import duckdb

"""
Ingress pattern:
  [Kafka / S3 / Postgres / HTTP]
         ↓ Polars (scan + transform)
         ↓ Arrow (zero-copy)
         ↓ DuckDB (SQL aggregation)
         ↓ Parquet (write back to lake)
         ↓ dbt (orchestrate, test, document)
"""

# Step 1: Polars reads + transforms raw ingress
events_lf = (
    pl.scan_ndjson(
        's3://kafka-sink/topic=events/**/*.json',
        storage_options={'region': 'us-east-1'}
    )
    .with_columns([
        pl.col('event_ts').str.to_datetime('%Y-%m-%dT%H:%M:%S%.fZ'),
        pl.col('user_id').cast(pl.Int64),
        pl.col('amount_cents').cast(pl.Int64) / 100,
        pl.col('event_type').cast(pl.Categorical),
    ])
    .filter(pl.col('event_id').is_not_null())
    # dedup: keep latest per event_id
    .sort('_ingested_at', descending=True)
    .unique(subset=['event_id'], keep='first', maintain_order=True)
)

# Step 2: DuckDB runs complex SQL on the Polars LazyFrame
# (zero-copy — DuckDB reads Polars' Arrow buffers directly)
con = duckdb.connect()
result = con.sql("""
    SELECT
        date_trunc('hour', event_ts) AS hour,
        event_type,
        count(*)                     AS event_count,
        sum(amount_cents)            AS revenue,
        count(DISTINCT user_id)      AS unique_users,
        percentile_cont(0.95) WITHIN GROUP (ORDER BY amount_cents)
                                     AS p95_amount
    FROM events_lf
    GROUP BY ALL
    ORDER BY hour DESC, revenue DESC
""").pl()  # result is a Polars DataFrame

# Step 3: Write back to lake as partitioned Parquet
# (dbt can then reference this as a source)
(
    result.lazy()
    .with_columns([
        pl.col('hour').dt.year().alias('year'),
        pl.col('hour').dt.month().alias('month'),
    ])
    .sink_parquet(
        pl.PartitionByKey(
            's3://lake/hourly_metrics/',
            by=['year', 'month'],   # one hive-style directory per key combination
        )
    )
)

Performance Comparison

Group By 100M rows (32-core machine)

Polars (lazy)
1.1s
DuckDB
1.8s
pandas
44s
Spark (local)
72s

Representative benchmarks. Both Polars & DuckDB are in the same tier — pick by interface preference (Python vs SQL).

Essential Ecosystem Packages

Package — Role
polars[all] — All optional extras: Arrow, XLSX, cloud, etc.
polars-cloud — Run Polars queries on distributed cloud infra
pyo3-polars — Write Rust plugin expressions (UDFs at native speed)
narwhals — Write pandas/polars-agnostic library code
ibis-polars — Ibis query API compiled to Polars
patito — Pydantic-style data validation for DataFrames
polars-ds — Data-science extensions: LSH, KNN, window features
hvplot / altair — Polars-native plotting
great-tables — Beautiful formatted tables from Polars DataFrames
dbt-polars — Use Polars as dbt model execution engine

Rust Plugin UDFs — The Performance Ceiling

pyo3-polars — native Rust expression pluginrust
// Cargo.toml: pyo3-polars = { version = "0.x", features = ["derive"] }
// This UDF runs INSIDE the Polars query optimizer — zero Python overhead.

use std::borrow::Cow;

use polars::prelude::*;
use pyo3_polars::derive::polars_expr;
use serde::Deserialize;

// Define keyword arguments your Python users can pass:
#[derive(Deserialize)]
struct MaskEmailKwargs {
    mask_char: String,
    keep_domain: bool,
}

// The expression plugin — input Series → output Series
#[polars_expr(output_type = String)]
fn mask_email(inputs: &[Series], kwargs: MaskEmailKwargs) -> PolarsResult<Series> {
    let ca: &StringChunked = inputs[0].str()?;
    let mask = kwargs.mask_char.chars().next().unwrap_or('*');
    let out: StringChunked = ca.apply(|opt_val| {
        opt_val.map(|email| {
            let parts: Vec<&str> = email.splitn(2, '@').collect();
            if parts.len() == 2 {
                // keep at most the first 2 bytes of the local part (ASCII assumed)
                let local = &parts[0][..parts[0].len().min(2)];
                let masked = mask.to_string().repeat(4);
                Cow::Owned(if kwargs.keep_domain {
                    format!("{}{}@{}", local, masked, parts[1])
                } else {
                    format!("{}{}@[redacted]", local, masked)
                })
            } else {
                Cow::Borrowed("[invalid]")
            }
        })
    });
    Ok(out.into_series())
}

// Python usage after maturin build:
// from my_polars_plugin import mask_email
// df.with_columns(
//     mask_email(pl.col('email'), mask_char='*', keep_domain=True)
//     .alias('email_masked')
// )