∆ICE
Apache Iceberg
Production Field Guide — Spec v1 / v2 / v3
Format Spec · Not an Engine
Spark · Trino · Flink · Hive · DuckDB
Parquet · ORC · Avro · REST Catalog
// 00 — Before You Commit

Should You Adopt Iceberg?

Iceberg is a table format specification, not an engine or a database. It defines the layout of files on object storage and how multiple compute engines can safely read and write them. Adopting it is an infrastructure commitment with genuine operational weight — answer these questions honestly first.

Pre-Adoption Decision Framework

Adopt Iceberg if...
  • Multiple engines need the same tables (Spark + Trino + Flink)
  • You need ACID writes with snapshot isolation
  • You need time travel / reproducible queries
  • Schema evolution without ETL rewrites is critical
  • Tables exceed 1 TB and file pruning matters
  • CDC / row-level updates / deletes are required
  • Partition evolution without breaking existing queries
  • Regulatory need to audit or roll back table state
Maybe don't if...
  • Single-engine workload (Spark only → Delta might be simpler)
  • Purely streaming, no batch reads
  • Tables < 100 GB — plain Parquet is faster to operate
  • No dedicated data engineers to run compaction
  • Your cloud platform is already Delta-native (Databricks)
  • Existing Hive tables work and you have no pain
  • Append-only logs with no time travel need
Clarify before starting
  • Row-level updates / deletes — or append-only analytics?
  • Which engines need write access vs. read-only?
  • Do you need nanosecond timestamps or geospatial types? (V3)
  • Catalog: managed (Glue) or self-hosted (Nessie)?
  • Existing Hive tables to migrate or greenfield?
  • Who owns compaction scheduling?
  • Storage backend: S3 / GCS / ADLS?
  • SLA on stale metadata / snapshot expiry?

Storage Backend Comparison

Backend         | Strong Consistency? | Atomic Rename?         | Cost Profile     | Notes
AWS S3          | Yes (since 2020)    | No — use DynamoDB lock | $0.023/GB + req  | Most mature. Use S3 DynamoDB locking for catalog safety with Hive Metastore.
GCS             | Yes                 | Yes (multi-obj)        | $0.020/GB + ops  | Best atomic semantics of the major clouds. BigLake catalog integrates natively.
Azure ADLS Gen2 | Yes                 | Yes (HNS)              | ~$0.018/GB + tx  | Hierarchical namespace required for atomic directory ops. Enable HNS from the start.
MinIO (on-prem) | Yes                 | Partial                | Infra cost       | S3-compatible. Works well; needs proper replication setup for durability.

Key Decisions at a Glance

File Format
Parquet, ORC, or Avro?

Default: Parquet. Best columnar read performance for analytics. ORC if primarily Hive-native and you need transactional efficiency there. Avro for streaming write-heavy workloads (row-oriented, fast append) — rarely used for analytical queries. Iceberg V3 adds Variant type for semi-structured JSON embedded in Parquet.

Catalog
Hive / Glue / Nessie / REST?

Greenfield on AWS: Glue Data Catalog. Multi-cloud or Git-like branching: Nessie. Vendor-neutral / multi-engine: REST catalog (Polaris, Unity Catalog OSS). Existing Hadoop: Hive Metastore. Catalog choice determines multi-engine safety — never share a table across engines without a catalog enforcing the metadata pointer.

Write Pattern
Copy-on-Write vs. Merge-on-Read

Copy-on-Write (COW): rewrites data files on each update — expensive writes, fast reads. Use for OLAP where reads dominate. Merge-on-Read (MOR): writes delete files + new data files — cheap writes, reads must merge. Use for CDC / high-frequency updates. Can mix per-table and change later.

Compaction
Who Runs Maintenance?

Small files accumulate fast with streaming writes or MOR. Without compaction, reads degrade. You must schedule: rewriteDataFiles (bin-packing), expireSnapshots, and removeOrphanFiles. Options: Spark job (most flexible), Hive scheduled query, or managed services (AWS EMR Serverless, Tabular). This is non-negotiable operational work.

The hidden complexity: Iceberg's format is elegant; the operations surrounding it are not. Catalog management, compaction scheduling, snapshot expiry, and cross-engine compatibility all require sustained engineering attention. Iceberg is not a "set it and forget it" solution.
// 01 — Format Internals

The Three-Layer Architecture

Iceberg is a specification of files on object storage. Every "table" is a set of JSON/Avro metadata files plus the actual data files. Understanding this layering explains every behavioral characteristic of the format.

Catalog
  Stores a single pointer: table_name → metadata.json path. Manages atomic swaps of that pointer.
  Implementations: Hive Metastore · AWS Glue · Nessie · REST Catalog · JDBC Catalog · DynamoDB Catalog
Metadata
  Immutable JSON & Avro files. Snapshot list, schema versions, partition specs, manifests. Small — lives on object storage alongside data.
  Files: metadata.json · snapshot-N.avro · manifest-list.avro · manifest-file.avro
Data
  Actual data files and delete files. Named by UUID — never renamed or mutated. Immutable once written.
  Files: *.parquet · *.orc · *.avro · positional-delete.avro · equality-delete.avro

Snapshot Mechanics — What Happens on Every Write

Writer writes data file(s)
  → new manifest lists the data files
  → new manifest list references the manifests
  → new metadata.json adds the snapshot
  → catalog atomically swaps old pointer → new pointer
  → readers see the new snapshot

Old data files and old metadata are never deleted immediately — they remain valid until expireSnapshots runs. This is the source of time travel and also the source of storage bloat if expiry is not scheduled.
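The atomicity of the final step comes down to a compare-and-swap on the catalog's metadata pointer. A toy model (not any real catalog API) of why only one of two racing writers can win:

```python
import threading

class ToyCatalog:
    """Toy model of a catalog: one pointer per table, swapped atomically."""
    def __init__(self):
        self._ptr = {}                 # table name -> metadata.json path
        self._lock = threading.Lock()

    def swap(self, table, expected, new):
        """Compare-and-swap: succeeds only if the pointer still equals `expected`."""
        with self._lock:
            if self._ptr.get(table) != expected:
                return False           # someone else committed first
            self._ptr[table] = new
            return True

catalog = ToyCatalog()
catalog.swap("events", None, "metadata-v1.json")

# Two writers both read metadata-v1.json, then race to commit:
first  = catalog.swap("events", "metadata-v1.json", "metadata-v2a.json")
second = catalog.swap("events", "metadata-v1.json", "metadata-v2b.json")
print(first, second)   # True False — the loser must reread metadata and retry
```

In real deployments the lock is the catalog backend's conditional write (Glue optimistic locking, a DynamoDB conditional put, a REST server-side check); the failure surfaces as a CommitFailedException that the writer retries.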

python · arch/inspect_metadata.py — understand your table internals
"""
Inspect a live Iceberg table's metadata without modifying it.
Uses PyIceberg — the Python library for the Iceberg spec.
pip install pyiceberg[s3,glue]
"""
from pyiceberg.catalog import load_catalog
import datetime

# ── Connect to catalog ──────────────────────────────────────
catalog = load_catalog(
    "prod",
    **{
        "type": "glue",
        "glue.region": "us-east-1",
        "s3.region": "us-east-1",
        # Prefer an instance role / env credentials over inline keys:
        # "s3.access-key-id": "...", "s3.secret-access-key": "...",
    }
)

table = catalog.load_table("analytics.events")

# ── 1. Schema and Partition Spec ────────────────────────────
print("=== SCHEMA ===")
for field in table.schema().fields:
    print(f"  [{field.field_id:>3}] {field.name:<30} {str(field.field_type):<20} required={field.required}")

print("\n=== PARTITION SPEC ===")
for pf in table.spec().fields:
    print(f"  {pf.name} = {pf.transform}({pf.source_id})")

# ── 2. Snapshot History ─────────────────────────────────────
print("\n=== LAST 10 SNAPSHOTS ===")
snapshots = table.metadata.snapshots[-10:]
for snap in snapshots:
    ts = datetime.datetime.fromtimestamp(snap.timestamp_ms / 1000, tz=datetime.timezone.utc)
    print(
        f"  snapshot_id={snap.snapshot_id}  "
        f"ts={ts.isoformat()}  "
        f"op={snap.summary.get('operation','?'):<12}  "
        f"added_files={snap.summary.get('added-data-files','0'):<6}  "
        f"deleted_files={snap.summary.get('deleted-data-files','0')}"
    )

# ── 3. Current manifest statistics ─────────────────────────
print("\n=== MANIFEST STATS (current snapshot) ===")
current = table.current_snapshot()
if current:
    summary = current.summary
    total_recs  = int(summary.get('total-records', 0))
    total_files = int(summary.get('total-data-files', 0))
    total_size  = int(summary.get('total-files-size', 0))
    del_files   = int(summary.get('total-delete-files', 0))
    print(f"  Records:      {total_recs:>15,}")
    print(f"  Data files:   {total_files:>15,}")
    print(f"  Delete files: {del_files:>15,}")
    print(f"  Total size:   {total_size / 1_073_741_824:>14.2f} GiB")
    avg_size = total_size / total_files if total_files > 0 else 0
    print(f"  Avg file sz:  {avg_size / 1_048_576:>14.1f} MiB  {'⚠ small files!' if avg_size < 64*1024*1024 else '✓ healthy'}")

# ── 4. Time travel scan ─────────────────────────────────────
print("\n=== TIME TRAVEL READ (as of 24h ago) ===")
yesterday_ms = int(
    (datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(days=1)).timestamp() * 1000
)
# Pick the newest snapshot committed at or before that timestamp, then scan it
past = [s for s in table.metadata.snapshots if s.timestamp_ms <= yesterday_ms]
if past:
    historical = table.scan(snapshot_id=past[-1].snapshot_id).to_arrow()
    print(f"  Row count 24h ago: {len(historical):,}")
    print(f"  Schema: {historical.schema}")

Iceberg V1 vs V2 vs V3 — What Changed

Feature                        | V1 | V2                          | V3
Row-level deletes              | No | Yes (positional + equality) | Yes, plus deletion vectors
Row lineage                    | No | Sequence numbers only       | Yes
Nanosecond timestamps          | No | No                          | Yes (timestamptz_ns)
Variant type (semi-structured) | No | No                          | Yes
Geospatial types               | No | No                          | Yes (geometry / geography)
Default sort order in metadata | No | Yes                         | Yes
Multiple partition specs       | No | Yes                         | Yes
V3 adoption: As of 2024–2025, V3 is spec-complete but engine support is uneven. Spark 3.5+ and Trino 430+ have partial V3 support. Verify your engine supports the V3 features you need before specifying 'format-version' = '3' at table creation — a V2 engine will refuse to read a V3 table.
// 02 — Metadata Coordination

Catalog Deep Dive

The catalog is the single most critical architectural decision in an Iceberg deployment. It enforces the atomic metadata pointer swap that makes ACID writes possible. A misconfigured catalog leads to silent data corruption — two writers can both commit and one overwrites the other.

AWS Glue Data Catalog — Managed
  Concurrency control: Optimistic locking via the Glue API
  Engine support: Spark, Trino, Flink, Athena, EMR
  Branching/tagging: No
  Multi-cloud: AWS only
  Best for: AWS-native stacks, serverless Athena queries
  Gotcha: 10 MB metadata size limit per table object

Nessie — Git-for-Data
  Concurrency control: Optimistic + MVCC commits
  Engine support: Spark, Flink, Trino (via REST adapter)
  Branching/tagging: Yes — full Git-like branching of tables
  Multi-cloud: Yes — vendor neutral
  Best for: Multi-engine, experimentation, data-as-code
  Gotcha: Self-hosted; operational burden

Hive Metastore (HMS) — Legacy
  Concurrency control: DB-level locking (fragile)
  Engine support: Spark, Trino, Flink, Hive
  Branching/tagging: No
  Multi-cloud: Yes
  Best for: Migration from existing Hive deployments
  Gotcha: Concurrent writers can corrupt — must serialize

REST Catalog (Polaris / Unity OSS) — Standard
  Concurrency control: Optimistic, server-side enforcement
  Engine support: Any engine with a REST client (all major)
  Branching/tagging: Depends on implementation
  Multi-cloud: Yes — the future standard
  Best for: Greenfield multi-engine, vendor neutral
  Gotcha: Polaris OSS is young; Unity is Databricks-tied

Catalog Configuration — PyIceberg & Spark

python · catalog/pyiceberg_catalog.py
"""
PyIceberg catalog initialization patterns for all major catalog types.
pip install "pyiceberg[s3,glue,nessie,sql,pyarrow]"
"""
from pyiceberg.catalog import load_catalog

# ── 1. AWS Glue ─────────────────────────────────────────────
glue_catalog = load_catalog(
    "prod_glue",
    **{
        "type": "glue",
        "s3.region": "us-east-1",
        # Prefer IAM role — omit keys and rely on boto3 credential chain
        "s3.endpoint": "https://s3.us-east-1.amazonaws.com",
        "glue.region": "us-east-1",
    }
)

# ── 2. Nessie REST ──────────────────────────────────────────
nessie_catalog = load_catalog(
    "nessie",
    **{
        "type": "nessie",
        "uri": "http://nessie.data.internal:19120/api/v2",
        "ref": "main",                  # default branch
        "authentication.type": "BEARER",
        "authentication.token": "...",  # use Secrets Manager
        "s3.region": "us-east-1",
        "warehouse": "s3://my-lake/warehouse",
    }
)

# ── 3. REST Catalog (generic — Polaris / Unity OSS) ─────────
rest_catalog = load_catalog(
    "polaris",
    **{
        "type": "rest",
        "uri": "https://polaris.internal/api/catalog",
        "credential": "client_id:client_secret",
        "warehouse": "prod_warehouse",
        "scope": "PRINCIPAL_ROLE:analytics_rw",
    }
)

# ── 4. Local JDBC (dev / CI) ────────────────────────────────
local_catalog = load_catalog(
    "local",
    **{
        "type": "sql",
        "uri": "sqlite:////tmp/iceberg_dev.db",
        "warehouse": "/tmp/iceberg_warehouse",
    }
)

# ── Namespace & table creation (catalog-agnostic) ────────────
import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField, StringType, LongType, TimestamptzType,
    DoubleType, BooleanType, IntegerType
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, BucketTransform

catalog = glue_catalog  # or whichever you chose

# Create namespace if it doesn't exist
if ("analytics",) not in catalog.list_namespaces():
    catalog.create_namespace("analytics", properties={
        "owner": "data-engineering",
        "location": "s3://my-lake/analytics",
    })

# Define schema with explicit field IDs (important for evolution)
schema = Schema(
    NestedField(1,  "event_id",    StringType(),       required=True),
    NestedField(2,  "user_id",     LongType(),         required=True),
    NestedField(3,  "event_type",  StringType(),       required=True),
    NestedField(4,  "occurred_at", TimestamptzType(),  required=True),
    NestedField(5,  "revenue",     DoubleType(),        required=False),
    NestedField(6,  "country",     StringType(),        required=False),
    NestedField(7,  "is_bot",      BooleanType(),       required=False),
)

partition_spec = PartitionSpec(
    PartitionField(source_id=4, field_id=1000, transform=DayTransform(),    name="day"),
    PartitionField(source_id=2, field_id=1001, transform=BucketTransform(16), name="user_bucket"),
)

table = catalog.create_table(
    identifier="analytics.events",
    schema=schema,
    partition_spec=partition_spec,
    location="s3://my-lake/analytics/events",
    properties={
        "format-version": "2",
        "write.format.default": "parquet",
        "write.parquet.compression-codec": "zstd",
        "write.target-file-size-bytes": str(134_217_728),  # 128 MiB
        "write.delete.format.default": "parquet",
        "write.merge.mode": "merge-on-read",   # or copy-on-write
        "commit.retry.num-retries": "4",
        "commit.retry.min-wait-ms": "100",
        "commit.retry.max-wait-ms": "60000",
        "history.expire.max-snapshot-age-ms": str(7 * 24 * 3600 * 1000),
    }
)
// 03 — ACID, COW, MOR & Deletes

Writes, DML & Concurrency

Copy-on-Write vs. Merge-on-Read — The Full Breakdown

Copy-on-Write (COW)
Expensive Writes, Fast Reads

On UPDATE or DELETE, Iceberg reads the affected data files, rewrites them with the changes applied, and produces a new snapshot pointing at the new files. Old files are orphaned (retained until expiry).


When to use: OLAP tables where reads vastly outnumber writes. Reporting tables, dimension tables, star schema fact tables with infrequent corrections. Any table where read latency is the primary SLA.


Cost model: Each update amplifies write I/O. Updating 1 row in a 1 GiB file rewrites the full 1 GiB. For tables with high-frequency point updates, COW is financially ruinous at scale.

Merge-on-Read (MOR)
Cheap Writes, Reads Do More Work

On UPDATE or DELETE, Iceberg writes a delete file that records which rows to exclude (positional delete = row-file offset pairs; equality delete = key value matches). Data files are NOT rewritten.


When to use: CDC pipelines, GDPR delete workflows, frequent small updates to large tables. Streaming ingestion with corrections. Any pattern where writes are frequent and large file rewrites are unacceptable.


Cost model: Delete files accumulate. Reads must merge data files with all open delete files. Without compaction, read latency degrades. The delete-file-count metric is your MOR health indicator.
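The write-amplification gap between the two modes is easy to quantify. An illustrative back-of-envelope model — the file size, row size, and delete-entry size below are assumptions, not benchmarks:

```python
FILE_SIZE = 1 * 1024**3   # 1 GiB data files (assumed)
ROW_SIZE = 200            # ~200 bytes per row (assumed)
DELETE_ENTRY = 40         # rough positional-delete entry size (assumed)

def cow_write_bytes(files_touched: int) -> int:
    # COW rewrites every data file that contains an updated row
    return files_touched * FILE_SIZE

def mor_write_bytes(rows_updated: int) -> int:
    # MOR writes the new row versions plus small delete-file entries
    return rows_updated * (ROW_SIZE + DELETE_ENTRY)

# Updating 1,000 rows scattered across 100 one-GiB files:
print(f"COW: {cow_write_bytes(100) / 1024**3:.0f} GiB rewritten")  # 100 GiB
print(f"MOR: {mor_write_bytes(1_000) / 1024:.0f} KiB written")     # ~234 KiB
```

Roughly five orders of magnitude less write I/O for MOR in this scattered-update scenario — which is exactly the cost that reads (and compaction) pay back later.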

python · writes/spark_dml.py — COW and MOR patterns in Spark
"""
Spark DML with Iceberg — full patterns for INSERT, UPDATE, DELETE, MERGE.
Configure SparkSession with Iceberg extensions before running.
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
             "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.prod",
             "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.prod.type", "glue")
    .config("spark.sql.catalog.prod.warehouse", "s3://my-lake/warehouse")
    .config("spark.sql.defaultCatalog", "prod")
    .getOrCreate()
)

# ── 1. Append (fastest, always correct) ─────────────────────
new_events_df.writeTo("prod.analytics.events").append()

# ── 2. Overwrite by partition (idempotent batch) ─────────────
# overwritePartitions() dynamically replaces only the partitions
# present in the new data — safe for reruns
(
    new_events_df
    .writeTo("prod.analytics.events")
    .overwritePartitions()
)

# ── 3. MERGE INTO — the Swiss Army knife ────────────────────
# Register source as temp view
cdc_df.createOrReplaceTempView("cdc_source")

spark.sql("""
    MERGE INTO prod.analytics.events AS target
    USING (
        SELECT
            event_id, user_id, event_type, occurred_at,
            revenue, country, is_bot,
            _op_type   -- 'I' insert, 'U' update, 'D' delete
        FROM cdc_source
        WHERE _op_type IN ('I', 'U', 'D')
    ) AS source
    ON target.event_id = source.event_id
    WHEN MATCHED AND source._op_type = 'D' THEN DELETE
    WHEN MATCHED AND source._op_type = 'U' THEN UPDATE SET
        target.revenue    = source.revenue,
        target.country    = source.country,
        target.is_bot     = source.is_bot
    WHEN NOT MATCHED AND source._op_type != 'D' THEN INSERT *
""")

# ── 4. DELETE with filter ────────────────────────────────────
# GDPR right-to-erasure — MOR writes a delete file, fast operation
spark.sql("""
    DELETE FROM prod.analytics.events
    WHERE user_id IN (SELECT user_id FROM gdpr_deletion_requests WHERE processed_at IS NULL)
""")

# ── 5. UPDATE ────────────────────────────────────────────────
spark.sql("""
    UPDATE prod.analytics.events
    SET is_bot = TRUE
    WHERE user_id IN (SELECT user_id FROM bot_detection_list)
      AND occurred_at > CURRENT_TIMESTAMP - INTERVAL 30 DAYS
""")

# ── 6. INSERT OVERWRITE — replace a specific day ─────────────
# With hidden (transform) partitions, use dynamic overwrite: the
# partitions to replace are derived from the rows being written.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql("""
    INSERT OVERWRITE prod.analytics.events
    SELECT * FROM staging.events_corrected
    WHERE date(occurred_at) = '2024-01-15'
""")

# ── 7. Time travel reads ─────────────────────────────────────
# By snapshot ID
df_snap = spark.read.option("snapshot-id", "5782341283").table("prod.analytics.events")
# By timestamp
df_ts   = spark.read.option("as-of-timestamp", "1705276800000").table("prod.analytics.events")
# Incremental read — changes between two snapshots
df_incr = (
    spark.read
    .format("iceberg")
    .option("start-snapshot-id", "5780000000")
    .option("end-snapshot-id",   "5782341283")
    .load("prod.analytics.events")
)

# ── 8. Schema evolution (no file rewrites!) ──────────────────
spark.sql("ALTER TABLE prod.analytics.events ADD COLUMN session_id BIGINT")
spark.sql("ALTER TABLE prod.analytics.events RENAME COLUMN country TO country_code")
# Existing files return NULL for new columns — zero cost

# ── 9. Partition evolution (V2+) ─────────────────────────────
# Change partition without rewriting old data
spark.sql("""
    ALTER TABLE prod.analytics.events
    ADD PARTITION FIELD hour(occurred_at)  -- add hourly on top of daily
""")
# New data is partitioned by day+hour; old data stays day-only
# Query engine handles both specs transparently
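How a planner copes with two live specs can be sketched in miniature: every data file records the spec it was written under, and pruning is evaluated per file against its own partition tuple (a toy model, not engine code):

```python
# Files written before and after the spec change, tagged with their spec_id
files = [
    {"path": "f1.parquet", "spec_id": 0, "partition": {"day": "2024-01-14"}},
    {"path": "f2.parquet", "spec_id": 1, "partition": {"day": "2024-01-15", "hour": 3}},
    {"path": "f3.parquet", "spec_id": 1, "partition": {"day": "2024-01-16", "hour": 9}},
]

def prune_by_day(files, day):
    """Keep files whose partition tuple can match `day`.

    Works for both specs: the day field exists in each, and the extra
    hour field on spec 1 only makes pruning finer, never wrong.
    """
    return [f["path"] for f in files if f["partition"]["day"] == day]

print(prune_by_day(files, "2024-01-15"))   # ['f2.parquet']
```

The key property: old files are never rewritten to the new spec; the planner simply evaluates the filter against whichever partition tuple each file carries.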

Concurrency — Commit Conflicts & Retry Logic

python · writes/concurrent_writes.py
"""
Iceberg uses optimistic concurrency control (OCC):
  1. Writer reads current metadata
  2. Performs computation
  3. Attempts commit (catalog CAS: old_metadata → new_metadata)
  4. If another writer committed between steps 1 and 3 → CommitFailedException
  5. Retry with exponential backoff

Safe concurrent patterns:
  - Multiple appenders: always safe (different files)
  - Appender + overwriter: conflict if same partition
  - Two overwriters of same partition: one loses, must retry
"""
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import CommitFailedException
import pyarrow as pa
import time, random, logging

log = logging.getLogger(__name__)

def write_with_retry(
    catalog_name: str,
    table_id: str,
    data: pa.Table,
    overwrite: bool = False,
    max_retries: int = 5,
    base_delay: float = 0.5,
) -> None:
    """
    Write PyArrow table to Iceberg with OCC retry loop.
    For Spark, set spark.sql.iceberg.handle-timestamp-without-timezone=true
    and use native Spark retry config instead.
    """
    catalog = load_catalog(catalog_name)
    table   = catalog.load_table(table_id)

    for attempt in range(max_retries + 1):
        try:
            if overwrite:
                table.overwrite(data)
            else:
                table.append(data)
            log.info("Committed to %s on attempt %d", table_id, attempt + 1)
            return
        except CommitFailedException as exc:
            if attempt == max_retries:
                raise RuntimeError(f"Commit failed after {max_retries} retries") from exc
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            log.warning("Commit conflict on attempt %d. Retrying in %.2fs", attempt + 1, delay)
            # Reload table to get latest metadata before reattempting
            table = catalog.load_table(table_id)
            time.sleep(delay)
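The retry loop's semantics can be exercised without a live catalog by stubbing the commit. A self-contained sketch — FlakyTable and the local CommitFailedException are invented stand-ins for illustration:

```python
import random

class CommitFailedException(Exception):
    """Stand-in for pyiceberg.exceptions.CommitFailedException."""

class FlakyTable:
    """Fake table whose first N commits conflict, then succeed."""
    def __init__(self, conflicts: int):
        self.remaining = conflicts
        self.commits = 0

    def append(self, data):
        if self.remaining > 0:
            self.remaining -= 1
            raise CommitFailedException("concurrent update")
        self.commits += 1

def append_with_retry(table, data, max_retries=5, base_delay=0.0):
    """Same shape as write_with_retry above; returns attempts used."""
    for attempt in range(max_retries + 1):
        try:
            table.append(data)
            return attempt + 1
        except CommitFailedException:
            if attempt == max_retries:
                raise
            # exponential backoff with jitter (not slept on here: base_delay=0)
            _ = base_delay * (2 ** attempt) + random.uniform(0, 0.01)

attempts = append_with_retry(FlakyTable(conflicts=2), data=None)
print(attempts)   # 3 — two conflicts, then a successful commit
```

Note what the stub deliberately omits: the real loop must reload the table between attempts so the retry computes against the winner's metadata, not the stale snapshot it first read.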
// 04 — Query Planning & File Pruning

Read Performance & Optimization

Iceberg read performance comes from aggressive file elimination at the metadata level — before a single data file is opened. Understanding this planning pipeline explains every tuning lever.

1. Catalog: fetch metadata.json → get current snapshot → manifest list path
2. Filter manifest list by partition summary stats → prune irrelevant manifests
3. Read surviving manifests → filter by column-level min/max stats → prune data files
4. Read surviving data files → apply row-group filter (Parquet) → return results
⚠ If MOR: also join delete files → apply positional/equality deletes → return corrected rows
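Step 3's file elimination is just an interval-overlap check against per-file column stats from the manifests. A minimal sketch of the decision (plain Python, not the PyIceberg evaluator):

```python
def may_contain(file_stats, column, lo, hi):
    """True if the file's [min, max] for `column` overlaps the query range [lo, hi)."""
    fmin, fmax = file_stats[column]
    return fmax >= lo and fmin < hi

# Per-file min/max stats as Iceberg stores them in manifest entries
files = [
    {"path": "a.parquet", "stats": {"occurred_at": ("2024-01-01", "2024-01-10")}},
    {"path": "b.parquet", "stats": {"occurred_at": ("2024-01-09", "2024-01-20")}},
    {"path": "c.parquet", "stats": {"occurred_at": ("2024-02-01", "2024-02-05")}},
]

query_lo, query_hi = "2024-01-15", "2024-02-01"
survivors = [f["path"] for f in files
             if may_contain(f["stats"], "occurred_at", query_lo, query_hi)]
print(survivors)   # ['b.parquet'] — a and c are pruned without ever being opened
```

This is why sort-ordered or z-ordered files prune so much better: tight min/max ranges make the overlap test fail for most files.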
python · reads/performance_patterns.py
"""
Read performance patterns — PyIceberg + PyArrow + DuckDB
"""
import pyarrow as pa
import pyarrow.compute as pc
import duckdb
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import (
    EqualTo, GreaterThanOrEqual, LessThan, And, Or, Not, IsNull, NotNull
)

catalog = load_catalog("prod")
table = catalog.load_table("analytics.events")

# ── 1. Scan with partition + column pruning ──────────────────
# Iceberg eliminates files at manifest level — far fewer S3 requests
scan = table.scan(
    row_filter=And(
        GreaterThanOrEqual("occurred_at", "2024-01-01T00:00:00+00:00"),
        LessThan("occurred_at", "2024-02-01T00:00:00+00:00"),
        EqualTo("country_code", "US"),
        NotNull("revenue"),
    ),
    selected_fields=("user_id", "event_type", "revenue", "occurred_at"),  # column pruning
    limit=1_000_000,
)

# ── 2. Convert to Arrow — zero-copy when possible ────────────
arrow_table = scan.to_arrow()
print(f"Rows: {len(arrow_table):,}  Cols: {len(arrow_table.schema)}")

# ── 3. Query via DuckDB — best for ad-hoc analytics ─────────
# DuckDB can directly query Iceberg via the PyArrow table
conn = duckdb.connect()
conn.register("events", arrow_table)
result = conn.execute("""
    SELECT
        event_type,
        COUNT(*) AS event_count,
        SUM(revenue) AS total_revenue,
        AVG(revenue) AS avg_revenue
    FROM events
    WHERE event_type != 'page_view'
    GROUP BY event_type
    ORDER BY total_revenue DESC
""").arrow()

# ── 4. Plan inspection — see what files Iceberg will read ────
print("\n=== SCAN PLAN ===")
for task in scan.plan_files():
    print(f"  file={task.file.file_path.split('/')[-1]}"
          f"  size={task.file.file_size_in_bytes/1_048_576:.1f}MiB"
          f"  records={task.file.record_count:,}"
          f"  delete_files={len(task.delete_files)}")

# ── 5. Spark: tune scan parallelism ─────────────────────────
# SPARK — split sizing is set per read (or via the table properties
# read.split.target-size / read.split.open-file-cost / read.split.planning-lookback)
# df = (
#     spark.read
#     .option("split-size", str(128 * 1024 * 1024))   # 128 MiB splits
#     .option("file-open-cost", str(4 * 1024 * 1024))
#     .option("lookback", "10")
#     .table("prod.analytics.events")
#     .filter("occurred_at >= '2024-01-01'")          # pushed down automatically
# )

# ── 6. Streaming read from Iceberg (Spark Structured Streaming)
# df_stream = (
#     spark.readStream
#     .format("iceberg")
#     .option("stream-from-timestamp", "1704067200000")  # epoch ms
#     .option("streaming-max-files-per-trigger", 100)    # throttle
#     .load("prod.analytics.events")
# )
# df_stream.writeStream.format("console").start().awaitTermination()

Hidden Partitioning — Why It Matters

sql · hidden_partitioning.sql — Iceberg vs Hive partitioning
-- ── HIVE partitioning (brittle, user must know partition columns) ──
-- User MUST specify partition column in query, or full scan happens:
SELECT * FROM hive_events WHERE dt = '2024-01-15'  -- works
SELECT * FROM hive_events WHERE event_time > '2024-01-15'  -- FULL TABLE SCAN!

-- ── ICEBERG hidden partitioning ─────────────────────────────────
-- Table defined with PARTITIONED BY (days(occurred_at))
-- Iceberg computes partition value internally — user queries raw column:
SELECT * FROM prod.analytics.events
WHERE occurred_at >= TIMESTAMP '2024-01-15 00:00:00 UTC'  -- auto-pruned!
  AND occurred_at <  TIMESTAMP '2024-01-16 00:00:00 UTC'

-- Iceberg translates this to partition filter: day = 19737 (days since epoch)
-- Zero Hive-style partition management in user queries

-- ── Transform options ────────────────────────────────────────────
-- years(ts)      → partition by year
-- months(ts)     → partition by month
-- days(ts)       → partition by day
-- hours(ts)      → partition by hour
-- bucket(N, col) → hash bucket into N buckets (for high-cardinality joins)
-- truncate(W, col) → truncate string/int to W chars/digits

-- Multiple transforms in one spec (V2+):
-- PARTITIONED BY (days(occurred_at), bucket(16, user_id))
-- → good for time-range + user_id join patterns
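The transforms above are pure, deterministic functions of the column value — which is what lets every engine compute the same partition for the same row. The day and truncate transforms can be reproduced exactly (bucket uses murmur3 hashing and is omitted here):

```python
from datetime import date

def day_transform(d: date) -> int:
    """days(ts): days since the Unix epoch — the value stored in metadata."""
    return (d - date(1970, 1, 1)).days

def truncate_transform(width: int, s: str) -> str:
    """truncate(W, col) for strings: first W characters."""
    return s[:width]

print(day_transform(date(2024, 1, 15)))      # 19737 — matches the partition filter above
print(truncate_transform(3, "US-CA-94103"))  # 'US-'
```

Because the transform is order-preserving for days/months/hours/truncate, range predicates on the raw column translate directly into range predicates on the partition value — the mechanism behind the auto-pruning shown in the SQL above.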
// 05 — Compaction, Expiry & Cleanup

Table Maintenance — Non-Negotiable Operations

This is the #1 operational failure mode in production Iceberg deployments: teams build beautiful pipelines but neglect maintenance, then wonder why reads degrade after 3 months. Compaction and expiry must be scheduled jobs, not afterthoughts.
Rule-of-thumb targets:
  • 128 MiB — target data file size
  • <10 — max delete files per data file
  • 7 days — default snapshot retention
  • 0 — orphan files tolerated
  • daily — compaction frequency (MOR tables)
python · maintenance/compaction_job.py — Spark-based Iceberg maintenance
"""
Production maintenance job — runs via Airflow / cron.
Cover ALL four maintenance procedures — skip any one and you'll regret it.
"""
from pyspark.sql import SparkSession
import logging

log = logging.getLogger(__name__)

spark = SparkSession.builder.appName("iceberg-maintenance").getOrCreate()

def compact_data_files(table_id: str, strategy: str = "binpack"):
    """
    Procedure 1: rewriteDataFiles
    Merges small files into target-size files.
    Strategies: 'binpack' (default), 'sort', 'zorder'
    """
    log.info("[COMPACT] Starting %s for %s", strategy, table_id)

    if strategy == "binpack":
        result = spark.sql(f"""
            CALL prod.system.rewrite_data_files(
                table => '{table_id}',
                strategy => 'binpack',
                options => map(
                    'target-file-size-bytes', '134217728',   -- 128 MiB
                    'min-file-size-bytes',    '33554432',    -- 32 MiB: only compact files < this
                    'max-file-size-bytes',    '209715200',   -- 200 MiB: rewrite files > this
                    'min-input-files',        '5',           -- skip partition if < 5 small files
                    'rewrite-all',            'false',       -- don't rewrite healthy files
                    'partial-progress.enabled', 'true',      -- commit in batches, not all-or-nothing
                    'partial-progress.max-commits', '10',
                    'max-concurrent-file-group-rewrites', '100'
                )
            )
        """)

    elif strategy == "sort":
        # Z-order by commonly filtered columns for better min/max pruning
        result = spark.sql(f"""
            CALL prod.system.rewrite_data_files(
                table => '{table_id}',
                strategy => 'sort',
                sort_order => 'zorder(occurred_at, user_id)',
                options => map(
                    'target-file-size-bytes', '134217728',
                    'max-concurrent-file-group-rewrites', '50'
                )
            )
        """)

    row = result.first()
    log.info("[COMPACT] Done: %d files → %d files, %d bytes rewritten",
             row.rewritten_files_count, row.added_files_count, row.rewritten_bytes_count)
    return row

def compact_delete_files(table_id: str):
    """
    Procedure 2: rewritePositionDeleteFiles
    Merges many small positional delete files into fewer larger ones.
    Only relevant for MOR tables. Run before or after data file compaction.
    """
    log.info("[DELETE-COMPACT] Starting for %s", table_id)
    result = spark.sql(f"""
        CALL prod.system.rewrite_position_delete_files(
            table => '{table_id}',
            options => map(
                'target-file-size-bytes', '67108864',  -- 64 MiB for delete files
                'rewrite-all', 'false'
            )
        )
    """)
    log.info("[DELETE-COMPACT] Done: %s", result.first())

def expire_snapshots(table_id: str, retain_days: int = 7):
    """
    Procedure 3: expireSnapshots
    Removes old snapshot metadata and marks unreferenced data files
    for deletion. Does NOT delete files — removeOrphanFiles does that.
    """
    log.info("[EXPIRE] Expiring snapshots older than %d days for %s", retain_days, table_id)
    result = spark.sql(f"""
        CALL prod.system.expire_snapshots(
            table => '{table_id}',
            older_than => TIMESTAMPADD(DAY, -{retain_days}, CURRENT_TIMESTAMP),
            retain_last => 5,           -- always keep at least 5 snapshots
            stream_results => true      -- don't collect all results in driver memory
        )
    """)
    log.info("[EXPIRE] Done: %s", result.first())

def remove_orphan_files(table_id: str, older_than_days: int = 3):
    """
    Procedure 4: removeOrphanFiles
    Physically deletes files on storage that no metadata references.
    Keep older_than longer than your longest-running write job, or you
    can delete files an in-flight commit is about to reference.
    WARNING: set dry_run=true first to inspect what would be deleted!
    """
    log.info("[ORPHAN] Scanning for orphan files older than %dd in %s", older_than_days, table_id)
    result = spark.sql(f"""
        CALL prod.system.remove_orphan_files(
            table => '{table_id}',
            older_than => TIMESTAMPADD(DAY, -{older_than_days}, CURRENT_TIMESTAMP),
            dry_run => false  -- set to true in staging to preview
        )
    """)
    deleted = result.count()
    log.info("[ORPHAN] Deleted %d orphan files", deleted)

def run_full_maintenance(table_id: str, is_mor: bool = False):
    """
    Full maintenance pipeline — order matters!
    1. Compact deletes (if MOR) — must run before data compaction
    2. Compact data files — merge small files
    3. Expire snapshots — release old metadata references
    4. Remove orphan files — physically delete unreferenced files
    """
    if is_mor:
        compact_delete_files(table_id)
    compact_data_files(table_id, strategy="binpack")
    expire_snapshots(table_id, retain_days=7)
    remove_orphan_files(table_id, older_than_days=3)  # must exceed longest-running write job
    log.info("[MAINTENANCE] Complete for %s", table_id)
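
The pipeline above still needs a cadence per table. A minimal sketch of picking one from write volume; the thresholds and the `files_per_day` input are our own illustrative assumptions, not Iceberg defaults:

```python
def compaction_interval_hours(files_per_day: int, is_mor: bool) -> int:
    """Pick a maintenance cadence from daily file churn.

    Thresholds are illustrative starting points: heavy streaming tables
    need hourly compaction, near-static tables can wait a week. MOR
    tables get a tighter tier because delete files pile up on top of
    data files.
    """
    if files_per_day > 50_000:
        interval = 1          # hourly: high-volume streaming
    elif files_per_day > 5_000:
        interval = 6
    elif files_per_day > 500:
        interval = 24         # daily is the floor for active tables
    else:
        interval = 168        # weekly for near-static tables
    if is_mor and interval > 1:
        interval //= 2        # MOR: compact twice as often
    return interval
```

Feed the output into whatever scheduler triggers run_full_maintenance; the point is that cadence should be derived from churn, not hard-coded.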

Maintenance Health Monitoring

python — maintenance/health_check.py
"""
Iceberg table health metrics — run periodically and alert on thresholds.
All queries via PyIceberg metadata inspection (no Spark needed).
"""
from pyiceberg.catalog import load_catalog
from dataclasses import dataclass
from typing import Optional
import statistics

@dataclass
class TableHealthReport:
    table_id: str
    total_data_files: int
    total_delete_files: int
    avg_data_file_size_mb: float
    small_files_count: int       # files < 32 MiB
    snapshots_count: int
    oldest_snapshot_days: float
    delete_to_data_ratio: float  # red flag if > 0.5
    total_size_gb: float
    warnings: list[str]
    critical: list[str]

def inspect_table_health(catalog_name: str, table_id: str) -> TableHealthReport:
    import datetime

    catalog = load_catalog(catalog_name)
    table   = catalog.load_table(table_id)
    snap    = table.current_snapshot()
    warnings, critical = [], []

    if not snap:
        return TableHealthReport(table_id=table_id, total_data_files=0,
                                  total_delete_files=0, avg_data_file_size_mb=0,
                                  small_files_count=0, snapshots_count=0,
                                  oldest_snapshot_days=0, delete_to_data_ratio=0,
                                  total_size_gb=0, warnings=["Empty table"], critical=[])

    s = snap.summary
    total_data    = int(s.get('total-data-files', 0))
    total_deletes = int(s.get('total-delete-files', 0))
    total_size    = int(s.get('total-files-size', 0))
    avg_size      = (total_size / total_data / 1_048_576) if total_data > 0 else 0

    # Inspect individual manifest entries for file-level stats
    file_sizes, small_count = [], 0
    for manifest in snap.manifests(table.io):  # reuse the snapshot fetched above
        for entry in manifest.fetch_manifest_entry(table.io):
            sz = entry.data_file.file_size_in_bytes
            file_sizes.append(sz)
            if sz < 32 * 1_048_576:
                small_count += 1

    snap_count   = len(table.metadata.snapshots)
    oldest_ts    = min(sn.timestamp_ms for sn in table.metadata.snapshots)
    oldest_days  = (datetime.datetime.now().timestamp() * 1000 - oldest_ts) / (86400 * 1000)
    del_ratio    = total_deletes / total_data if total_data > 0 else 0

    # Evaluate health thresholds
    if avg_size < 32:    warnings.append(f"Avg file size {avg_size:.0f} MiB < 32 MiB — run compaction")
    if avg_size > 512:   warnings.append(f"Avg file size {avg_size:.0f} MiB > 512 MiB — may hurt parallelism")
    if del_ratio > 0.5:   critical.append(f"Delete/data file ratio {del_ratio:.2f} > 0.5 — MOR reads degraded")
    elif del_ratio > 0.2: warnings.append(f"Delete/data file ratio {del_ratio:.2f} — compact delete files")
    if snap_count > 100: warnings.append(f"{snap_count} snapshots — run expireSnapshots")
    if oldest_days > 14: critical.append(f"Oldest snapshot {oldest_days:.0f} days old — storage bloat risk")
    if small_count > total_data * 0.3:
        warnings.append(f"{small_count}/{total_data} files < 32 MiB — small file problem")

    return TableHealthReport(
        table_id=table_id, total_data_files=total_data, total_delete_files=total_deletes,
        avg_data_file_size_mb=avg_size, small_files_count=small_count,
        snapshots_count=snap_count, oldest_snapshot_days=oldest_days,
        delete_to_data_ratio=del_ratio, total_size_gb=total_size / 1_073_741_824,
        warnings=warnings, critical=critical,
    )
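
Wiring the report into alerting can be as simple as collapsing it to a paging severity. A sketch; the severity names and exit-code convention are ours, not part of PyIceberg:

```python
def report_severity(warnings: list[str], critical: list[str]) -> tuple[str, int]:
    """Collapse a health report into (severity, exit_code) for a cron
    wrapper: exit 2 pages on-call, exit 1 logs a warning, 0 is healthy."""
    if critical:
        return "CRITICAL", 2
    if warnings:
        return "WARN", 1
    return "OK", 0

# Usage with a report from inspect_table_health:
#   sev, code = report_severity(report.warnings, report.critical)
#   print(f"{report.table_id}: {sev}")
#   sys.exit(code)
```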
// 06 — Multi-Engine Configuration

Engine Setup & Compatibility

Multi-engine golden rule: every engine that writes must go through the same catalog. Direct S3 path access from a second engine bypasses the catalog's atomic swap and will cause data corruption. This is not theoretical — it happens in practice.
python — engines/spark_config.py — PySpark full Iceberg config
from pyspark.sql import SparkSession

# Iceberg JAR — check https://iceberg.apache.org/releases/ for latest
ICEBERG_VERSION = "1.6.1"
SPARK_VERSION   = "3.5"

spark = (
    SparkSession.builder
    .appName("iceberg-production")

    # ── Extensions ─────────────────────────────────────────────
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

    # ── Catalog config ─────────────────────────────────────────
    .config("spark.sql.catalog.prod",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.prod.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.prod.warehouse", "s3://my-lake/warehouse")
    .config("spark.sql.catalog.prod.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.prod.s3.region", "us-east-1")

    # ── S3 performance ─────────────────────────────────────────
    .config("spark.sql.catalog.prod.s3.multipart-enabled", "true")
    .config("spark.sql.catalog.prod.s3.multipart-size", "67108864")  # 64MiB
    .config("spark.sql.catalog.prod.s3.upload-thread-count", "10")

    # ── Write defaults ─────────────────────────────────────────
    .config("spark.sql.iceberg.check-nullability", "false")  # relax for migration
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    .config("spark.sql.defaultCatalog", "prod")

    # ── Vectorized reads (Parquet) ─────────────────────────────
    .config("spark.sql.parquet.enableVectorizedReader", "true")
    .config("spark.sql.parquet.filterPushdown", "true")

    # ── Packages (submit via spark-submit --packages instead) ──
    # f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION}"
    # f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}"

    .getOrCreate()
)
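
The commented coordinates above are easy to get wrong at submit time. A small convenience sketch (our own helper, not part of any Iceberg tooling) that assembles the `--packages` value from the pinned versions:

```python
def iceberg_packages(iceberg_version: str, spark_version: str,
                     scala_version: str = "2.12") -> str:
    """Build the spark-submit --packages value from pinned versions."""
    return ",".join([
        f"org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}",
        f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}",
    ])

# spark-submit --packages "$(python -c 'print(iceberg_packages("1.6.1", "3.5"))')" job.py
```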
sql — engines/trino_config.sql — Trino Iceberg connector
-- /etc/trino/catalog/iceberg.properties
connector.name=iceberg
hive.metastore=glue
hive.metastore.glue.region=us-east-1
iceberg.file-format=PARQUET
iceberg.compression-codec=ZSTD
iceberg.target-max-file-size=134217728
iceberg.unique-table-location=true
iceberg.dynamic-filtering.wait-timeout=30s

-- Native splits for better parallelism (Trino 430+):
iceberg.split-manager-threads=32
iceberg.max-partitions-per-scan=200

-- For Nessie REST catalog instead of Glue:
-- connector.name=iceberg
-- iceberg.catalog.type=rest
-- iceberg.rest-catalog.uri=http://nessie.data.internal:19120/iceberg
-- iceberg.rest-catalog.warehouse=prod_warehouse

-- Query examples in Trino:
SHOW SCHEMAS FROM iceberg;
SHOW TABLES FROM iceberg.analytics;

-- Time travel in Trino:
SELECT * FROM iceberg.analytics.events
FOR VERSION AS OF 5782341283;  -- by snapshot ID
SELECT * FROM iceberg.analytics.events
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-15 12:00:00 UTC';

-- Inspect table snapshots:
SELECT snapshot_id, committed_at, operation, summary
FROM iceberg.analytics."events$snapshots"
ORDER BY committed_at DESC LIMIT 20;

-- Inspect files for the current snapshot:
SELECT file_path, file_size_in_bytes, record_count, column_sizes
FROM iceberg.analytics."events$files"
ORDER BY file_size_in_bytes DESC LIMIT 50;
python — engines/flink_iceberg.py — Flink streaming sink
"""
Flink → Iceberg streaming sink. Flink is the preferred engine for
low-latency CDC ingestion into Iceberg tables. Writes are
transactionally committed at checkpoint intervals.
"""
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60_000)  # 60s checkpoints = 60s commit latency
env.get_checkpoint_config().set_min_pause_between_checkpoints(30_000)
env.get_checkpoint_config().set_checkpoint_timeout(120_000)

# Build the table environment ON TOP of the stream environment so the
# checkpoint settings above actually apply to the SQL pipeline.
t_env = StreamTableEnvironment.create(env)

# Configure Iceberg catalog in Flink SQL
t_env.execute_sql("""
    CREATE CATALOG iceberg_catalog WITH (
        'type'          = 'iceberg',
        'catalog-impl'  = 'org.apache.iceberg.aws.glue.GlueCatalog',
        'warehouse'     = 's3://my-lake/warehouse',
        'io-impl'       = 'org.apache.iceberg.aws.s3.S3FileIO',
        'client.region' = 'us-east-1'
    )
""")

t_env.execute_sql("USE CATALOG iceberg_catalog")
t_env.execute_sql("USE analytics")

# Kafka source → Iceberg sink via SQL (simplest pattern)
t_env.execute_sql("""
    CREATE TEMPORARY TABLE kafka_events (
        event_id     STRING,
        user_id      BIGINT,
        event_type   STRING,
        occurred_at  TIMESTAMP(6) WITH LOCAL TIME ZONE,
        revenue      DOUBLE,
        country_code STRING,
        proc_time    AS PROCTIME()
    ) WITH (
        'connector'         = 'kafka',
        'topic'             = 'prod.events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'scan.startup.mode' = 'latest-offset',
        'format'            = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
""")

# Insert into Iceberg — commits at each Flink checkpoint
t_env.execute_sql("""
    INSERT INTO iceberg_catalog.analytics.events
    SELECT
        event_id, user_id, event_type,
        occurred_at, revenue, country_code,
        FALSE AS is_bot   -- default; updated via batch job
    FROM kafka_events
""")
// 07 — Risks, Trade-offs & Gotchas

Operational Reality & Known Risks

Migration from Hive to Iceberg

python — ops/hive_migration.py — in-place migration without data copy
"""
Migrate existing Hive table to Iceberg format IN-PLACE — no data copy.
This registers the existing Parquet files under Iceberg metadata.
Critical: the source table must be external (not managed) Hive table.
"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hive-migration").getOrCreate()  # use the full config from the engines section

# ── Option A: SHADOW MIGRATION (safest — new Iceberg table, copy data)
# No downtime, but requires double storage during transition
spark.sql("""
    CREATE TABLE prod.analytics.events_iceberg
    USING iceberg
    AS SELECT * FROM hive.analytics.events
""")
# Validate, then atomic rename via catalog API

# ── Option B: IN-PLACE via MIGRATE PROCEDURE (Spark only, no data copy)
# Registers existing Parquet files under Iceberg metadata
# WARNING: Reads continue to work during migration, writes MUST stop
spark.sql("""
    CALL prod.system.migrate(
        table => 'hive.analytics.events',
        properties => map(
            'format-version', '2',
            'write.format.default', 'parquet',
            'write.parquet.compression-codec', 'zstd'
        )
    )
""")

# ── Post-migration validation (Option A: source vs shadow table) ──────
hive_count    = spark.sql("SELECT COUNT(*) FROM hive.analytics.events").first()[0]
iceberg_count = spark.sql("SELECT COUNT(*) FROM prod.analytics.events_iceberg").first()[0]
assert hive_count == iceberg_count, (
    f"Row count mismatch! Hive={hive_count}, Iceberg={iceberg_count}"
)
print(f"Migration validated: {iceberg_count:,} rows confirmed")

# ── Schema snapshot before/after for diff ───────────────────
from pyiceberg.catalog import load_catalog
catalog = load_catalog("prod")
table   = catalog.load_table("analytics.events")
print("Format version:", table.metadata.format_version)
print("Partition spec:", table.spec())

Known Gotchas & Anti-Patterns

Anti-Pattern
Too Many Small Files

Streaming ingestion writes one file per checkpoint per partition. With 100 partitions and 60s checkpoints, you generate 100 files/minute. Without compaction: 144,000 files/day. S3 LIST operations become the bottleneck, not read I/O. Fix: compact daily minimum, hourly for high-volume tables.
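
The arithmetic above generalizes into a back-of-envelope estimator. Illustrative only: real file counts also depend on writer parallelism, fan-out, and partition skew:

```python
def daily_file_estimate(partitions: int, checkpoint_seconds: int) -> int:
    """Worst-case files/day for a streaming writer that emits one file
    per partition per checkpoint (ignores empty partitions and fan-out)."""
    checkpoints_per_day = 86_400 // checkpoint_seconds
    return partitions * checkpoints_per_day

# 100 partitions, 60s checkpoints -> 144,000 files/day, matching the text.
```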

Anti-Pattern
High-Cardinality Partition Column

Partitioning by user_id (millions of unique values) creates millions of partition directories. Metadata operations (manifest scans, ListObjects) become catastrophically slow. Fix: use bucket(N, user_id) transform. N = 16–256 depending on scale.
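
Applying the fix in Spark SQL might look like the following; the table name and bucket count are illustrative:

```sql
-- Replace the high-cardinality identity partition with a bucket transform.
ALTER TABLE prod.analytics.events DROP PARTITION FIELD user_id;
ALTER TABLE prod.analytics.events ADD PARTITION FIELD bucket(64, user_id);
-- Partition evolution: existing files keep the old spec, new writes use
-- the new one. Rewrite old files for uniform pruning:
--   CALL prod.system.rewrite_data_files(table => 'analytics.events')
```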

Gotcha
HMS Concurrent Writers

Hive Metastore uses database-level locking for Iceberg commits. Two concurrent Spark jobs writing the same table may deadlock or corrupt metadata. Fix: migrate to Glue, Nessie, or REST catalog. These use OCC (optimistic locking) and handle concurrency correctly.

Gotcha
expireSnapshots Before removeOrphanFiles

Run expireSnapshots first, then removeOrphanFiles with an older_than buffer longer than your longest-running write job. A writer that has staged files but not yet committed them to a snapshot owns files no metadata references, and removeOrphanFiles will delete them mid-write. In practice, use a buffer of at least 24 hours; 3 days is a safer default.
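
A small guard (our own helper, not an Iceberg API) can enforce the buffer before calling remove_orphan_files:

```python
def safe_orphan_cutoff_days(longest_write_hours: float,
                            min_buffer_days: float = 1.0) -> float:
    """Return an older_than value (in days) that cannot race an in-flight
    writer: at least min_buffer_days, and at least 2x the longest
    observed write job so staged-but-uncommitted files survive."""
    needed = (longest_write_hours * 2) / 24
    return max(min_buffer_days, needed)

# A 30-hour backfill pushes the cutoff to 2.5 days, not the 1-day minimum.
```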

Gotcha
V2 Table Opened by V1 Engine

A V1 engine (old Spark Iceberg connector) cannot read a V2 table with equality deletes — it will either error or silently return wrong results (rows that should be deleted are returned). Pin connector versions per table spec version and enforce via CI checks on connector upgrades.
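
The CI check can be a pure lookup. The version map below is a hypothetical placeholder, not real release data; confirm minimums against the Iceberg release notes for the connectors you actually run:

```python
# HYPOTHETICAL minimum connector versions per Iceberg spec version.
# Replace with values verified against your connectors' release notes.
MIN_CONNECTOR_FOR_SPEC = {
    # (engine, spec_version) -> minimum connector version tuple
    ("spark", 2): (0, 13, 0),
    ("spark", 3): (1, 4, 0),
    ("trino", 2): (393,),
}

def connector_ok(engine: str, spec_version: int,
                 connector_version: tuple) -> bool:
    """Fail CI when an engine's connector predates the table's spec version.
    Unknown engine/spec combinations fail closed."""
    required = MIN_CONNECTOR_FOR_SPEC.get((engine, spec_version))
    if required is None:
        return False
    return connector_version >= required
```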

Anti-Pattern
No Compaction on MOR Tables

After 30 days of CDC without compaction, a MOR table may carry 10 delete files per data file, so every read merges 11 files instead of 1. Read latency increases 5–20× and storage roughly doubles (delete files are not small). Without an alert on the delete-to-data file ratio, read SLAs degrade silently.
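
The read cost is easy to quantify with a toy model. It assumes every delete file applies to every data file in the scan, the worst case described above:

```python
def mor_read_amplification(delete_files_per_data_file: float) -> float:
    """Files merged per data file on a MOR scan: the data file itself
    plus its applicable delete files (worst-case assumption)."""
    return 1 + delete_files_per_data_file

# 10 delete files per data file -> 11 files merged per read, as above.
```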

Production Runbook Checklist

Recommended stack for greenfield on AWS: Glue Data Catalog + S3 + Spark (EMR) for write/compaction + Trino (Athena v3 or self-hosted) for interactive query + PyIceberg for metadata inspection. Add Flink if you need streaming ingestion. This combination has the most mature production deployments and widest community support.