Should You Adopt Iceberg?
Iceberg is a table format specification, not an engine or a database. It defines the layout of files on object storage and how multiple compute engines can safely read and write them. Adopting it is an infrastructure commitment with genuine operational weight — answer these questions honestly first.
Pre-Adoption Decision Framework
Adopt Iceberg if...
- Multiple engines need the same tables (Spark + Trino + Flink)
- You need ACID writes with snapshot isolation
- You need time travel / reproducible queries
- Schema evolution without ETL rewrites is critical
- Tables exceed 1 TB and file pruning matters
- CDC / row-level updates / deletes are required
- Partition evolution without breaking existing queries
- Regulatory need to audit or roll back table state
Maybe don't if...
- Single-engine workload (Spark only → Delta might be simpler)
- Purely streaming, no batch reads
- Tables < 100 GB — plain Parquet is faster to operate
- No dedicated data engineers to run compaction
- Your cloud platform is already Delta-native (Databricks)
- Existing Hive tables work and you have no pain
- Append-only logs with no time travel need
Clarify before starting
- Row-level updates / deletes — or append-only analytics?
- Which engines need write access vs. read-only?
- Do you need nanosecond timestamps or geospatial types? (V3)
- Catalog: managed (Glue) or self-hosted (Nessie)?
- Existing Hive tables to migrate or greenfield?
- Who owns compaction scheduling?
- Storage backend: S3 / GCS / ADLS?
- SLA on stale metadata / snapshot expiry?
Storage Backend Comparison
| Backend | Strong Consistency? | Atomic Rename? | Cost Profile | Notes |
|---|---|---|---|---|
| AWS S3 | Yes (since 2020) | No — use DynamoDB lock | $0.023/GB + req | Most mature. Use S3 DynamoDB locking for catalog safety with Hive Metastore. |
| GCS | Yes | Yes (multi-obj) | $0.020/GB + ops | Best atomic semantics of the major clouds. BigLake catalog integrates natively. |
| Azure ADLS Gen2 | Yes | Yes (HNS) | ~$0.018/GB + tx | Hierarchical namespace required for atomic directory ops. Enable HNS from the start. |
| MinIO (on-prem) | Yes | Partial | Infra cost | S3-compatible. Works well; needs proper replication setup for durability. |
Key Decisions at a Glance
File format. Default: Parquet — best columnar read performance for analytics. Choose ORC if you are primarily Hive-native and need its transactional efficiency; Avro suits streaming write-heavy workloads (row-oriented, fast appends) but is rarely used for analytical queries. Iceberg V3 adds a Variant type for semi-structured JSON embedded in Parquet.
Catalog. Greenfield on AWS: Glue Data Catalog. Multi-cloud or Git-like branching: Nessie. Vendor-neutral / multi-engine: a REST catalog (Polaris, Unity Catalog OSS). Existing Hadoop: Hive Metastore. Catalog choice determines multi-engine safety — never share a table across engines without a catalog enforcing the metadata pointer.
Write mode. Copy-on-Write (COW) rewrites data files on each update — expensive writes, fast reads; use it for OLAP where reads dominate. Merge-on-Read (MOR) writes delete files plus new data files — cheap writes, but reads must merge; use it for CDC and high-frequency updates. You can mix modes per table and change them later.
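The mode is set per operation through table properties. A minimal Spark SQL sketch — the table name is illustrative; `write.delete.mode`, `write.update.mode`, and `write.merge.mode` are the standard Iceberg property names:

```sql
-- Switch an existing table to merge-on-read for all row-level operations
ALTER TABLE prod.analytics.events SET TBLPROPERTIES (
  'write.delete.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'write.merge.mode'  = 'merge-on-read'
);
```

Setting `copy-on-write` for the same keys reverses the choice; the change applies to future writes only.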
Compaction. Small files accumulate fast with streaming writes or MOR, and without compaction reads degrade. You must schedule rewriteDataFiles (bin-packing), expireSnapshots, and removeOrphanFiles — via a Spark job (most flexible), a Hive scheduled query, or managed services (AWS EMR Serverless, Tabular). This is non-negotiable operational work.
The Three-Layer Architecture
Iceberg is a specification of files on object storage. Every "table" is a set of JSON/Avro metadata files plus the actual data files. Understanding this layering explains every behavioral characteristic of the format.
The three layers, top to bottom:

1. Catalog — maps table_name → current metadata.json path, and manages atomic swaps of that pointer.
2. Metadata — metadata.json files, manifest lists, and manifests describing snapshots and data files.
3. Data — the actual Parquet/ORC/Avro files on object storage.

Snapshot Mechanics — What Happens on Every Write

1. The writer writes new data file(s).
2. A new manifest lists those data files.
3. A new manifest list references the manifests.
4. A new metadata.json adds the snapshot.
5. The catalog atomically swaps the pointer: old ptr → new ptr.
6. Readers now see the new snapshot.
Old data files and old metadata are never deleted immediately — they remain valid until expireSnapshots runs. This is the source of time travel and also the source of storage bloat if expiry is not scheduled.
""" Inspect a live Iceberg table's metadata without modifying it. Uses PyIceberg — the Python library for the Iceberg spec. pip install pyiceberg[s3,glue] """ from pyiceberg.catalog.glue import GlueCatalog from pyiceberg.catalog import load_catalog import json, datetime # ── Connect to catalog ────────────────────────────────────── catalog = GlueCatalog( name="prod", **{ "type": "glue", "s3.region": "us-east-1", "s3.access-key-id": "...", # prefer instance role / env "s3.secret-access-key": "...", } ) table = catalog.load_table("analytics.events") # ── 1. Schema and Partition Spec ──────────────────────────── print("=== SCHEMA ===") for field in table.schema().fields: print(f" [{field.field_id:>3}] {field.name:<30} {str(field.field_type):<20} required={field.required}") print("\n=== PARTITION SPEC ===") for pf in table.spec().fields: print(f" {pf.name} = {pf.transform}({pf.source_id})") # ── 2. Snapshot History ───────────────────────────────────── print("\n=== LAST 10 SNAPSHOTS ===") snapshots = table.metadata.snapshots[-10:] for snap in snapshots: ts = datetime.datetime.fromtimestamp(snap.timestamp_ms / 1000, tz=datetime.timezone.utc) print( f" snapshot_id={snap.snapshot_id} " f"ts={ts.isoformat()} " f"op={snap.summary.get('operation','?'):<12} " f"added_files={snap.summary.get('added-data-files','0'):<6} " f"deleted_files={snap.summary.get('deleted-data-files','0')}" ) # ── 3. 
Current manifest statistics ───────────────────────── print("\n=== MANIFEST STATS (current snapshot) ===") current = table.current_snapshot() if current: summary = current.summary total_recs = int(summary.get('total-records', 0)) total_files = int(summary.get('total-data-files', 0)) total_size = int(summary.get('total-files-size', 0)) del_files = int(summary.get('total-delete-files', 0)) print(f" Records: {total_recs:>15,}") print(f" Data files: {total_files:>15,}") print(f" Delete files: {del_files:>15,}") print(f" Total size: {total_size / 1_073_741_824:>14.2f} GiB") avg_size = total_size / total_files if total_files > 0 else 0 print(f" Avg file sz: {avg_size / 1_048_576:>14.1f} MiB {'⚠ small files!' if avg_size < 64*1024*1024 else '✓ healthy'}") # ── 4. Time travel scan ───────────────────────────────────── print("\n=== TIME TRAVEL READ (as of 24h ago) ===") import pyarrow as pa yesterday_ms = int((datetime.datetime.now() - datetime.timedelta(days=1)).timestamp() * 1000) historical = table.scan(snapshot_id=None, as_of_timestamp=yesterday_ms).to_arrow() print(f" Row count 24h ago: {len(historical):,}") print(f" Schema: {historical.schema}")
Iceberg V1 vs V2 vs V3 — What Changed
| Feature | V1 | V2 | V3 |
|---|---|---|---|
| Row-level deletes | No | Yes (positional + equality) | Yes + deletion vectors |
| Row lineage | No | No (sequence numbers only) | Yes |
| Nanosecond timestamps | No | No | Yes (timestamptz_ns) |
| Variant type (semi-structured) | No | No | Yes |
| Geospatial types | No | No | geometry / geography |
| Default sort order in metadata | No | Yes | Yes |
| Multiple partition specs | No | Yes | Yes |
Set 'format-version' = '3' at table creation to opt in — a V2-only engine will refuse to read a V3 table.
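As a sketch in Spark SQL DDL (table and column names illustrative), the opt-in looks like this — only do it once every reader engine in your fleet supports V3:

```sql
CREATE TABLE prod.analytics.events_v3 (
  event_id    STRING,
  occurred_at TIMESTAMP
)
USING iceberg
TBLPROPERTIES ('format-version' = '3');
```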
Catalog Deep Dive
The catalog is the single most critical architectural decision in an Iceberg deployment. It enforces the atomic metadata pointer swap that makes ACID writes possible. A misconfigured catalog leads to silent data corruption — two writers can both commit and one overwrites the other.
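The commit primitive behind this is a compare-and-swap on the metadata pointer. A toy Python model of that CAS — `ToyCatalog` is purely illustrative; real catalogs (Glue, Nessie, REST) implement this server-side:

```python
import threading

class ToyCatalog:
    """Toy model of the catalog's compare-and-swap commit.
    Illustration only — not the PyIceberg API."""

    def __init__(self, initial_pointer: str):
        self._pointer = initial_pointer
        self._lock = threading.Lock()

    def commit(self, expected: str, new: str) -> bool:
        # Atomic CAS: swap only if nobody committed since we read the pointer
        with self._lock:
            if self._pointer != expected:
                return False   # another writer won; caller must retry
            self._pointer = new
            return True

catalog = ToyCatalog("s3://lake/t/metadata/v1.json")
# Two writers both read v1; only the first CAS succeeds
assert catalog.commit("s3://lake/t/metadata/v1.json", "s3://lake/t/metadata/v2.json")
assert not catalog.commit("s3://lake/t/metadata/v1.json", "s3://lake/t/metadata/v2b.json")
```

Without this check (e.g. a plain file copy on S3 as the "catalog"), the second writer would silently overwrite the first — exactly the corruption described above.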
Catalog Configuration — PyIceberg & Spark
""" PyIceberg catalog initialization patterns for all major catalog types. pip install "pyiceberg[s3,glue,nessie,sql,pyarrow]" """ from pyiceberg.catalog import load_catalog # ── 1. AWS Glue ───────────────────────────────────────────── glue_catalog = load_catalog( "prod_glue", **{ "type": "glue", "s3.region": "us-east-1", # Prefer IAM role — omit keys and rely on boto3 credential chain "s3.endpoint": "https://s3.us-east-1.amazonaws.com", "glue.region": "us-east-1", } ) # ── 2. Nessie REST ────────────────────────────────────────── nessie_catalog = load_catalog( "nessie", **{ "type": "nessie", "uri": "http://nessie.data.internal:19120/api/v2", "ref": "main", # default branch "authentication.type": "BEARER", "authentication.token": "...", # use Secrets Manager "s3.region": "us-east-1", "warehouse": "s3://my-lake/warehouse", } ) # ── 3. REST Catalog (generic — Polaris / Unity OSS) ───────── rest_catalog = load_catalog( "polaris", **{ "type": "rest", "uri": "https://polaris.internal/api/catalog", "credential": "client_id:client_secret", "warehouse": "prod_warehouse", "scope": "PRINCIPAL_ROLE:analytics_rw", } ) # ── 4. 
Local JDBC (dev / CI) ──────────────────────────────── local_catalog = load_catalog( "local", **{ "type": "sql", "uri": "sqlite:////tmp/iceberg_dev.db", "warehouse": "/tmp/iceberg_warehouse", } ) # ── Namespace & table creation (catalog-agnostic) ──────────── import pyarrow as pa from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, LongType, TimestamptzType, DoubleType, BooleanType, IntegerType ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform, BucketTransform catalog = glue_catalog # or whichever you chose # Create namespace if it doesn't exist if ("analytics",) not in catalog.list_namespaces(): catalog.create_namespace("analytics", properties={ "owner": "data-engineering", "location": "s3://my-lake/analytics", }) # Define schema with explicit field IDs (important for evolution) schema = Schema( NestedField(1, "event_id", StringType(), required=True), NestedField(2, "user_id", LongType(), required=True), NestedField(3, "event_type", StringType(), required=True), NestedField(4, "occurred_at", TimestamptzType(), required=True), NestedField(5, "revenue", DoubleType(), required=False), NestedField(6, "country", StringType(), required=False), NestedField(7, "is_bot", BooleanType(), required=False), ) partition_spec = PartitionSpec( PartitionField(source_id=4, field_id=1000, transform=DayTransform(), name="day"), PartitionField(source_id=2, field_id=1001, transform=BucketTransform(16), name="user_bucket"), ) table = catalog.create_table( identifier="analytics.events", schema=schema, partition_spec=partition_spec, location="s3://my-lake/analytics/events", properties={ "format-version": "2", "write.format.default": "parquet", "write.parquet.compression-codec": "zstd", "write.target-file-size-bytes": str(134_217_728), # 128 MiB "write.delete.format.default": "parquet", "write.merge.mode": "merge-on-read", # or copy-on-write "commit.retry.num-retries": "4", 
"commit.retry.min-wait-ms": "100", "commit.retry.max-wait-ms": "60000", "history.expire.max-snapshot-age-ms": str(7 * 24 * 3600 * 1000), } )
Writes, DML & Concurrency
Copy-on-Write vs. Merge-on-Read — The Full Breakdown
On UPDATE or DELETE, Iceberg reads the affected data files, rewrites them with the changes applied, and produces a new snapshot pointing at the new files. The old files are no longer referenced by the new snapshot but stay reachable through earlier snapshots until expiry.
When to use: OLAP tables where reads vastly outnumber writes. Reporting tables, dimension tables, star schema fact tables with infrequent corrections. Any table where read latency is the primary SLA.
Cost model: Each update amplifies write I/O. Updating 1 row in a 1 GiB file rewrites the full 1 GiB. For tables with high-frequency point updates, COW is financially ruinous at scale.
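The amplification is easy to quantify. An illustrative back-of-the-envelope calculation (all numbers hypothetical):

```python
# COW write amplification when point updates are scattered across large files
file_size_gib = 1.0
rows_per_file = 10_000_000
updated_rows = 1_000                # one updated row in each of 1,000 files
files_touched = 1_000

rewritten_gib = files_touched * file_size_gib             # COW rewrites whole files
useful_mib = updated_rows * (file_size_gib * 1024 / rows_per_file)
amplification = (rewritten_gib * 1024) / useful_mib

print(f"{rewritten_gib:.0f} GiB rewritten for ~{useful_mib:.2f} MiB of changed rows "
      f"(~{amplification:,.0f}x write amplification)")
```

The same update under MOR would write roughly one small delete file per touched data file — kilobytes, not terabytes.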
On UPDATE or DELETE, Iceberg writes a delete file that records which rows to exclude (positional delete = row-file offset pairs; equality delete = key value matches). Data files are NOT rewritten.
When to use: CDC pipelines, GDPR delete workflows, frequent small updates to large tables. Streaming ingestion with corrections. Any pattern where writes are frequent and large file rewrites are unacceptable.
Cost model: Delete files accumulate. Reads must merge data files with all open delete files. Without compaction, read latency degrades. The delete-file-count metric is your MOR health indicator.
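One way to watch that indicator is via the snapshot summaries that Iceberg exposes as Spark metadata tables (a sketch — the table name is illustrative; the `snapshots` suffix table and its `summary` map come from Iceberg's Spark integration):

```sql
SELECT
  snapshot_id,
  committed_at,
  summary['total-delete-files'] AS delete_files,
  summary['total-data-files']   AS data_files
FROM prod.analytics.events.snapshots
ORDER BY committed_at DESC
LIMIT 10;
```

If `delete_files / data_files` trends above ~0.5, schedule delete-file compaction before read latency suffers.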
""" Spark DML with Iceberg — full patterns for INSERT, UPDATE, DELETE, MERGE. Configure SparkSession with Iceberg extensions before running. """ from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.types import * spark = ( SparkSession.builder .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.prod.type", "glue") .config("spark.sql.catalog.prod.warehouse", "s3://my-lake/warehouse") .config("spark.sql.defaultCatalog", "prod") .getOrCreate() ) # ── 1. Append (fastest, always correct) ───────────────────── new_events_df.writeTo("prod.analytics.events").append() # ── 2. Overwrite by partition (idempotent batch) ───────────── # Replaces only partitions present in the new data — safe for reruns ( new_events_df .writeTo("prod.analytics.events") .option("partitionOverwriteMode", "dynamic") .overwritePartitions() ) # ── 3. MERGE INTO — the Swiss Army knife ──────────────────── # Register source as temp view cdc_df.createOrReplaceTempView("cdc_source") spark.sql(""" MERGE INTO prod.analytics.events AS target USING ( SELECT event_id, user_id, event_type, occurred_at, revenue, country, is_bot, _op_type -- 'I' insert, 'U' update, 'D' delete FROM cdc_source WHERE _op_type IN ('I', 'U', 'D') ) AS source ON target.event_id = source.event_id WHEN MATCHED AND source._op_type = 'D' THEN DELETE WHEN MATCHED AND source._op_type = 'U' THEN UPDATE SET target.revenue = source.revenue, target.country = source.country, target.is_bot = source.is_bot WHEN NOT MATCHED AND source._op_type != 'D' THEN INSERT * """) # ── 4. DELETE with filter ──────────────────────────────────── # GDPR right-to-erasure — MOR writes a delete file, fast operation spark.sql(""" DELETE FROM prod.analytics.events WHERE user_id IN (SELECT user_id FROM gdpr_deletion_requests WHERE processed_at IS NULL) """) # ── 5. 
UPDATE ──────────────────────────────────────────────── spark.sql(""" UPDATE prod.analytics.events SET is_bot = TRUE WHERE user_id IN (SELECT user_id FROM bot_detection_list) AND occurred_at > CURRENT_TIMESTAMP - INTERVAL 30 DAYS """) # ── 6. INSERT OVERWRITE — replace a specific partition ─────── spark.sql(""" INSERT OVERWRITE prod.analytics.events PARTITION (day = '2024-01-15') SELECT * FROM staging.events_corrected WHERE date(occurred_at) = '2024-01-15' """) # ── 7. Time travel reads ───────────────────────────────────── # By snapshot ID df_snap = spark.read.option("snapshot-id", "5782341283").table("prod.analytics.events") # By timestamp df_ts = spark.read.option("as-of-timestamp", "1705276800000").table("prod.analytics.events") # Incremental read — changes between two snapshots df_incr = ( spark.read .format("iceberg") .option("start-snapshot-id", "5780000000") .option("end-snapshot-id", "5782341283") .load("prod.analytics.events") ) # ── 8. Schema evolution (no file rewrites!) ────────────────── spark.sql("ALTER TABLE prod.analytics.events ADD COLUMN session_id BIGINT") spark.sql("ALTER TABLE prod.analytics.events RENAME COLUMN country TO country_code") # Existing files return NULL for new columns — zero cost # ── 9. Partition evolution (V2+) ───────────────────────────── # Change partition without rewriting old data spark.sql(""" ALTER TABLE prod.analytics.events ADD PARTITION FIELD hour(occurred_at) -- add hourly on top of daily """) # New data is partitioned by day+hour; old data stays day-only # Query engine handles both specs transparently
Concurrency — Commit Conflicts & Retry Logic
""" Iceberg uses optimistic concurrency control (OCC): 1. Writer reads current metadata 2. Performs computation 3. Attempts commit (catalog CAS: old_metadata → new_metadata) 4. If another writer committed between steps 1 and 3 → CommitFailedException 5. Retry with exponential backoff Safe concurrent patterns: - Multiple appenders: always safe (different files) - Appender + overwriter: conflict if same partition - Two overwriters of same partition: one loses, must retry """ from pyiceberg.catalog import load_catalog from pyiceberg.exceptions import CommitFailedException import pyarrow as pa import time, random, logging log = logging.getLogger(__name__) def write_with_retry( catalog_name: str, table_id: str, data: pa.Table, overwrite: bool = False, max_retries: int = 5, base_delay: float = 0.5, ) -> None: """ Write PyArrow table to Iceberg with OCC retry loop. For Spark, set spark.sql.iceberg.handle-timestamp-without-timezone=true and use native Spark retry config instead. """ catalog = load_catalog(catalog_name) table = catalog.load_table(table_id) for attempt in range(max_retries + 1): try: if overwrite: table.overwrite(data) else: table.append(data) log.info("Committed to %s on attempt %d", table_id, attempt + 1) return except CommitFailedException as exc: if attempt == max_retries: raise RuntimeError(f"Commit failed after {max_retries} retries") from exc delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5) log.warning("Commit conflict on attempt %d. Retrying in %.2fs", attempt + 1, delay) # Reload table to get latest metadata before reattempting table = catalog.load_table(table_id) time.sleep(delay)
Read Performance & Optimization
Iceberg read performance comes from aggressive file elimination at the metadata level — before a single data file is opened. Understanding this planning pipeline explains every tuning lever.
Planning pipeline: metadata.json → current snapshot → manifest list → …

```python
"""
Read performance patterns — PyIceberg + PyArrow + DuckDB
"""
import duckdb
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import (
    EqualTo, GreaterThanOrEqual, LessThan, And, NotNull
)

catalog = load_catalog("prod")
table = catalog.load_table("analytics.events")

# ── 1. Scan with partition + column pruning ─────────────────
# Iceberg eliminates files at the manifest level — far fewer S3 requests
scan = table.scan(
    row_filter=And(
        GreaterThanOrEqual("occurred_at", "2024-01-01T00:00:00+00:00"),
        LessThan("occurred_at", "2024-02-01T00:00:00+00:00"),
        EqualTo("country_code", "US"),
        NotNull("revenue"),
    ),
    selected_fields=("user_id", "event_type", "revenue", "occurred_at"),  # column pruning
    limit=1_000_000,
)

# ── 2. Convert to Arrow — zero-copy when possible ───────────
arrow_table = scan.to_arrow()
print(f"Rows: {len(arrow_table):,}  Cols: {len(arrow_table.schema)}")

# ── 3. Query via DuckDB — best for ad-hoc analytics ─────────
# DuckDB can query the PyArrow table directly
conn = duckdb.connect()
conn.register("events", arrow_table)
result = conn.execute("""
    SELECT event_type,
           COUNT(*)     AS event_count,
           SUM(revenue) AS total_revenue,
           AVG(revenue) AS avg_revenue
    FROM events
    WHERE event_type != 'page_view'
    GROUP BY event_type
    ORDER BY total_revenue DESC
""").arrow()

# ── 4. Plan inspection — see what files Iceberg will read ───
print("\n=== SCAN PLAN ===")
for task in scan.plan_files():
    print(f"  file={task.file.file_path.split('/')[-1]}"
          f" size={task.file.file_size_in_bytes / 1_048_576:.1f}MiB"
          f" records={task.file.record_count:,}"
          f" delete_files={len(task.delete_files)}")

# ── 5. Spark: tune scan parallelism ─────────────────────────
# spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
# spark.conf.set("spark.sql.iceberg.split-size", str(128 * 1024 * 1024))  # 128 MiB
# spark.conf.set("spark.sql.iceberg.split-open-file-cost", str(4 * 1024 * 1024))
# spark.conf.set("spark.sql.iceberg.split-lookback", "10")
# df = (spark.table("prod.analytics.events")
#       .filter("occurred_at >= '2024-01-01'"))  # filter pushdown is automatic

# ── 6. Streaming read (Spark Structured Streaming) ──────────
# df_stream = (
#     spark.readStream
#     .format("iceberg")
#     .option("stream-from-timestamp", "1704067200000")  # epoch ms
#     .option("streaming-max-files-per-trigger", 100)    # throttle
#     .load("prod.analytics.events")
# )
# df_stream.writeStream.format("console").start().awaitTermination()
```
Hidden Partitioning — Why It Matters
```sql
-- ── HIVE partitioning (brittle, user must know partition columns) ──
-- User MUST specify partition column in query, or full scan happens:
SELECT * FROM hive_events WHERE dt = '2024-01-15'          -- works
SELECT * FROM hive_events WHERE event_time > '2024-01-15'  -- FULL TABLE SCAN!

-- ── ICEBERG hidden partitioning ─────────────────────────────────
-- Table defined with PARTITIONED BY (days(occurred_at))
-- Iceberg computes partition values internally — users query the raw column:
SELECT * FROM prod.analytics.events
WHERE occurred_at >= TIMESTAMP '2024-01-15 00:00:00 UTC'   -- auto-pruned!
  AND occurred_at <  TIMESTAMP '2024-01-16 00:00:00 UTC'
-- Iceberg translates this to partition filter: day = 19737 (days since epoch)
-- Zero Hive-style partition management in user queries

-- ── Transform options ────────────────────────────────────────────
-- years(ts)        → partition by year
-- months(ts)       → partition by month
-- days(ts)         → partition by day
-- hours(ts)        → partition by hour
-- bucket(N, col)   → hash bucket into N buckets (for high-cardinality joins)
-- truncate(W, col) → truncate strings to width W; integers down to the
--                    nearest multiple of W
-- Multiple transforms in one spec (V2+):
--   PARTITIONED BY (days(occurred_at), bucket(16, user_id))
--   → good for time-range + user_id join patterns
```
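The day-transform arithmetic is easy to verify in pure Python — the partition value is simply whole days since the Unix epoch:

```python
import datetime

# The day transform stores partition values as days since 1970-01-01,
# so a timestamp predicate becomes an integer comparison on the partition value
epoch = datetime.date(1970, 1, 1)
day_value = (datetime.date(2024, 1, 15) - epoch).days
print(day_value)  # 19737
```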
Table Maintenance — Non-Negotiable Operations
""" Production maintenance job — runs via Airflow / cron. Cover ALL four maintenance procedures — skip any one and you'll regret it. """ from pyspark.sql import SparkSession import logging, argparse log = logging.getLogger(__name__) spark = SparkSession.builder.appName("iceberg-maintenance").getOrCreate() def compact_data_files(table_id: str, strategy: str = "binpack"): """ Procedure 1: rewriteDataFiles Merges small files into target-size files. Strategies: 'binpack' (default), 'sort', 'zorder' """ log.info("[COMPACT] Starting %s for %s", strategy, table_id) if strategy == "binpack": result = spark.sql(f""" CALL prod.system.rewrite_data_files( table => '{table_id}', strategy => 'binpack', options => map( 'target-file-size-bytes', '134217728', -- 128 MiB 'min-file-size-bytes', '33554432', -- 32 MiB: only compact files < this 'max-file-size-bytes', '214748364', -- 200 MiB: only compact files > this 'min-input-files', '5', -- skip partition if < 5 small files 'rewrite-all', 'false', -- don't rewrite healthy files 'partial-progress.enabled', 'true', -- commit in batches, not all-or-nothing 'partial-progress.max-commits', '10', 'max-concurrent-file-group-rewrites', '100' ) ) """) elif strategy == "sort": # Sort by commonly filtered columns for better min/max pruning result = spark.sql(f""" CALL prod.system.rewrite_data_files( table => '{table_id}', strategy => 'sort', sort_order => 'zorder(occurred_at, user_id)', options => map( 'target-file-size-bytes', '134217728', 'max-concurrent-file-group-rewrites', '50' ) ) """) row = result.first() log.info("[COMPACT] Done: %d files → %d files, %d bytes rewritten", row.rewritten_files_count, row.added_files_count, row.rewritten_bytes_count) return row def compact_delete_files(table_id: str): """ Procedure 2: rewritePositionDeleteFiles Merges many small positional delete files into fewer larger ones. Only relevant for MOR tables. Run before or after data file compaction. 
""" log.info("[DELETE-COMPACT] Starting for %s", table_id) result = spark.sql(f""" CALL prod.system.rewrite_position_delete_files( table => '{table_id}', options => map( 'target-file-size-bytes', '67108864', -- 64 MiB for delete files 'rewrite-all', 'false' ) ) """) log.info("[DELETE-COMPACT] Done: %s", result.first()) def expire_snapshots(table_id: str, retain_days: int = 7): """ Procedure 3: expireSnapshots Removes old snapshot metadata and marks unreferenced data files for deletion. Does NOT delete files — removeOrphanFiles does that. """ log.info("[EXPIRE] Expiring snapshots older than %d days for %s", retain_days, table_id) result = spark.sql(f""" CALL prod.system.expire_snapshots( table => '{table_id}', older_than => TIMESTAMPADD(DAY, -{retain_days}, CURRENT_TIMESTAMP), retain_last => 5, -- always keep at least 5 snapshots stream_results => true -- don't collect all results in driver memory ) """) log.info("[EXPIRE] Done: %s", result.first()) def remove_orphan_files(table_id: str, older_than_days: int = 3): """ Procedure 4: removeOrphanFiles Physically deletes files on storage that are not referenced by any current metadata. Safe buffer: always older_than >= snapshot retention. WARNING: set dry_run=true first to inspect what would be deleted! """ log.info("[ORPHAN] Scanning for orphan files older than %dd in %s", older_than_days, table_id) result = spark.sql(f""" CALL prod.system.remove_orphan_files( table => '{table_id}', older_than => TIMESTAMPADD(DAY, -{older_than_days}, CURRENT_TIMESTAMP), dry_run => false -- set to true in staging to preview ) """) deleted = result.count() log.info("[ORPHAN] Deleted %d orphan files", deleted) def run_full_maintenance(table_id: str, is_mor: bool = False): """ Full maintenance pipeline — order matters! 1. Compact deletes (if MOR) — must run before data compaction 2. Compact data files — merge small files 3. Expire snapshots — release old metadata references 4. 
Remove orphan files — physically delete unreferenced files """ if is_mor: compact_delete_files(table_id) compact_data_files(table_id, strategy="binpack") expire_snapshots(table_id, retain_days=7) remove_orphan_files(table_id, older_than_days=3) # must be > retain window log.info("[MAINTENANCE] Complete for %s", table_id)
Maintenance Health Monitoring
""" Iceberg table health metrics — run periodically and alert on thresholds. All queries via PyIceberg metadata inspection (no Spark needed). """ from pyiceberg.catalog import load_catalog from dataclasses import dataclass from typing import Optional import statistics @dataclass class TableHealthReport: table_id: str total_data_files: int total_delete_files: int avg_data_file_size_mb: float small_files_count: int # files < 32 MiB snapshots_count: int oldest_snapshot_days: float delete_to_data_ratio: float # red flag if > 0.5 total_size_gb: float warnings: list[str] critical: list[str] def inspect_table_health(catalog_name: str, table_id: str) -> TableHealthReport: import datetime catalog = load_catalog(catalog_name) table = catalog.load_table(table_id) snap = table.current_snapshot() warnings, critical = [], [] if not snap: return TableHealthReport(table_id=table_id, total_data_files=0, total_delete_files=0, avg_data_file_size_mb=0, small_files_count=0, snapshots_count=0, oldest_snapshot_days=0, delete_to_data_ratio=0, total_size_gb=0, warnings=["Empty table"], critical=[]) s = snap.summary total_data = int(s.get('total-data-files', 0)) total_deletes = int(s.get('total-delete-files', 0)) total_size = int(s.get('total-files-size', 0)) avg_size = (total_size / total_data / 1_048_576) if total_data > 0 else 0 # Inspect individual manifest entries for file-level stats file_sizes, small_count = [], 0 for manifest in table.current_snapshot().manifests(table.io): for entry in manifest.fetch_manifest_entry(table.io): sz = entry.data_file.file_size_in_bytes file_sizes.append(sz) if sz < 32 * 1_048_576: small_count += 1 snap_count = len(table.metadata.snapshots) oldest_ts = min(s.timestamp_ms for s in table.metadata.snapshots) oldest_days = (datetime.datetime.now().timestamp() * 1000 - oldest_ts) / (86400 * 1000) del_ratio = total_deletes / total_data if total_data > 0 else 0 # Evaluate health thresholds if avg_size < 32: warnings.append(f"Avg file size {avg_size:.0f} MiB < 
32 MiB — run compaction") if avg_size > 512: warnings.append(f"Avg file size {avg_size:.0f} MiB > 512 MiB — may hurt parallelism") if del_ratio > 0.5: critical.append(f"Delete/data file ratio {del_ratio:.2f} > 0.5 — MOR reads degraded") if del_ratio > 0.2: warnings.append(f"Delete/data file ratio {del_ratio:.2f} — compact delete files") if snap_count > 100: warnings.append(f"{snap_count} snapshots — run expireSnapshots") if oldest_days > 14: critical.append(f"Oldest snapshot {oldest_days:.0f} days old — storage bloat risk") if small_count > total_data * 0.3: warnings.append(f"{small_count}/{total_data} files < 32 MiB — small file problem") return TableHealthReport( table_id=table_id, total_data_files=total_data, total_delete_files=total_deletes, avg_data_file_size_mb=avg_size, small_files_count=small_count, snapshots_count=snap_count, oldest_snapshot_days=oldest_days, delete_to_data_ratio=del_ratio, total_size_gb=total_size / 1_073_741_824, warnings=warnings, critical=critical, )
Engine Setup & Compatibility
```python
from pyspark.sql import SparkSession

# Iceberg JAR — check https://iceberg.apache.org/releases/ for the latest
ICEBERG_VERSION = "1.6.1"
SPARK_VERSION = "3.5"

spark = (
    SparkSession.builder
    .appName("iceberg-production")
    # ── Extensions ──────────────────────────────────────────
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # ── Catalog config ──────────────────────────────────────
    .config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.prod.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.prod.warehouse", "s3://my-lake/warehouse")
    .config("spark.sql.catalog.prod.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.prod.s3.region", "us-east-1")
    # ── S3 performance ──────────────────────────────────────
    .config("spark.sql.catalog.prod.s3.multipart-enabled", "true")
    .config("spark.sql.catalog.prod.s3.multipart-size", "67108864")  # 64 MiB
    .config("spark.sql.catalog.prod.s3.upload-thread-count", "10")
    # ── Write defaults ──────────────────────────────────────
    .config("spark.sql.iceberg.check-nullability", "false")  # relax for migration
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    .config("spark.sql.defaultCatalog", "prod")
    # ── Vectorized reads (Parquet) ──────────────────────────
    .config("spark.sql.parquet.enableVectorizedReader", "true")
    .config("spark.sql.parquet.filterPushdown", "true")
    # ── Packages (submit via spark-submit --packages instead) ──
    # f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION}"
    # f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}"
    .getOrCreate()
)
```
```properties
# /etc/trino/catalog/iceberg.properties
connector.name=iceberg
hive.metastore=glue
hive.metastore.glue.region=us-east-1
iceberg.file-format=PARQUET
iceberg.compression-codec=ZSTD
iceberg.target-max-file-size=134217728
iceberg.unique-table-location=true
iceberg.dynamic-filtering.wait-timeout=30s

# Native splits for better parallelism (Trino 430+):
iceberg.split-manager-threads=32
iceberg.max-partitions-per-scan=200

# For a Nessie REST catalog instead of Glue:
# connector.name=iceberg
# iceberg.catalog.type=rest
# iceberg.rest-catalog.uri=http://nessie.data.internal:19120/iceberg
# iceberg.rest-catalog.warehouse=prod_warehouse
```

```sql
-- Query examples in Trino:
SHOW SCHEMAS FROM iceberg;
SHOW TABLES FROM iceberg.analytics;

-- Time travel in Trino:
SELECT * FROM iceberg.analytics.events
FOR VERSION AS OF 5782341283;  -- by snapshot ID
SELECT * FROM iceberg.analytics.events
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-15 12:00:00 UTC';

-- Inspect table snapshots:
SELECT snapshot_id, committed_at, operation, summary
FROM iceberg.analytics."events$snapshots"
ORDER BY committed_at DESC
LIMIT 20;

-- Inspect files for the current snapshot:
SELECT file_path, file_size_in_bytes, record_count, column_sizes
FROM iceberg.analytics."events$files"
ORDER BY file_size_in_bytes DESC
LIMIT 50;
```
""" Flink → Iceberg streaming sink. Flink is the preferred engine for low-latency CDC ingestion into Iceberg tables. Writes are transactionally committed at checkpoint intervals. """ from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import EnvironmentSettings, TableEnvironment env = StreamExecutionEnvironment.get_execution_environment() env.enable_checkpointing(60_000) # 60s checkpoints = 60s commit latency env.get_checkpoint_config().set_min_pause_between_checkpoints(30_000) env.get_checkpoint_config().set_checkpoint_timeout(120_000) t_env = TableEnvironment.create(EnvironmentSettings.new_instance().build()) # Configure Iceberg catalog in Flink SQL t_env.execute_sql(""" CREATE CATALOG iceberg_catalog WITH ( 'type' = 'iceberg', 'catalog-type' = 'glue', 'warehouse' = 's3://my-lake/warehouse', 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', 'glue.region' = 'us-east-1' ) """) t_env.execute_sql("USE CATALOG iceberg_catalog") t_env.execute_sql("USE analytics") # Kafka source → Iceberg sink via SQL (simplest pattern) t_env.execute_sql(""" CREATE TEMPORARY TABLE kafka_events ( event_id STRING, user_id BIGINT, event_type STRING, occurred_at TIMESTAMP(6) WITH LOCAL TIME ZONE, revenue DOUBLE, country_code STRING, proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'prod.events', 'properties.bootstrap.servers' = 'kafka:9092', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """) # Insert into Iceberg — commits at each Flink checkpoint t_env.execute_sql(""" INSERT INTO iceberg_catalog.analytics.events SELECT event_id, user_id, event_type, occurred_at, revenue, country_code, FALSE AS is_bot -- default; updated via batch job FROM kafka_events """)
Operational Reality & Known Risks
Migration from Hive to Iceberg
""" Migrate existing Hive table to Iceberg format IN-PLACE — no data copy. This registers the existing Parquet files under Iceberg metadata. Critical: the source table must be external (not managed) Hive table. """ from pyspark.sql import SparkSession spark = SparkSession.builder # ... (full config from engines tab).getOrCreate() # ── Option A: SHADOW MIGRATION (safest — new Iceberg table, copy data) # No downtime, but requires double storage during transition spark.sql(""" CREATE TABLE prod.analytics.events_iceberg USING iceberg AS SELECT * FROM hive.analytics.events """) # Validate, then atomic rename via catalog API # ── Option B: IN-PLACE via MIGRATE PROCEDURE (Spark only, no data copy) # Registers existing Parquet files under Iceberg metadata # WARNING: Reads continue to work during migration, writes MUST stop spark.sql(""" CALL prod.system.migrate( table => 'hive.analytics.events', properties => map( 'format-version', '2', 'write.format.default', 'parquet', 'write.parquet.compression-codec', 'zstd' ) ) """) # ── Post-migration validation ──────────────────────────────── hive_count = spark.sql("SELECT COUNT(*) FROM hive.analytics.events").first()[0] iceberg_count = spark.sql("SELECT COUNT(*) FROM prod.analytics.events").first()[0] assert hive_count == iceberg_count, ( f"Row count mismatch! Hive={hive_count}, Iceberg={iceberg_count}" ) print(f"Migration validated: {iceberg_count:,} rows confirmed") # ── Schema snapshot before/after for diff ─────────────────── from pyiceberg.catalog import load_catalog catalog = load_catalog("prod") table = catalog.load_table("analytics.events") print("Format version:", table.metadata.format_version) print("Partition spec:", table.spec())
Known Gotchas & Anti-Patterns
Streaming ingestion writes one file per checkpoint per partition. With 100 partitions and 60s checkpoints, you generate 100 files/minute. Without compaction: 144,000 files/day. S3 LIST operations become the bottleneck, not read I/O. Fix: compact daily minimum, hourly for high-volume tables.
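The file-growth arithmetic above is worth making explicit. A minimal sketch (the function name and thresholds are illustrative, not an Iceberg API):

```python
def files_per_day(partitions: int, checkpoint_interval_s: int) -> int:
    """One file per partition per checkpoint: partitions * checkpoints/day."""
    checkpoints_per_day = 86_400 // checkpoint_interval_s
    return partitions * checkpoints_per_day

# 100 partitions, 60 s checkpoints -> 144,000 files/day, as in the text.
assert files_per_day(100, 60) == 144_000

# Hourly compaction caps the uncompacted backlog at one hour's worth:
assert files_per_day(100, 60) // 24 == 6_000
```

In Spark, the fix is the `rewrite_data_files` procedure; scheduling it hourly keeps the live file count near the compacted steady state rather than the raw ingest rate.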
Partitioning by user_id (millions of unique values) creates millions of partition directories. Metadata operations (manifest scans, ListObjects) become catastrophically slow. Fix: use bucket(N, user_id) transform. N = 16–256 depending on scale.
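The shape of the bucket transform can be sketched in a few lines. Iceberg's real transform hashes the value's serialized form with a 32-bit Murmur3 hash; this sketch substitutes stdlib `zlib.crc32` so it stays dependency-free, which changes the bucket assignments but not the idea:

```python
import zlib

def bucket(n: int, value: int) -> int:
    """Illustrative bucket(N, value): hash, then mod N.
    Real Iceberg uses 32-bit Murmur3; crc32 is a stand-in here."""
    h = zlib.crc32(value.to_bytes(8, "little", signed=True))
    return h % n

# A hundred thousand distinct user_ids collapse into at most 16
# partition directories instead of 100,000:
buckets = {bucket(16, user_id) for user_id in range(100_000)}
assert all(0 <= b < 16 for b in buckets)
assert len(buckets) <= 16
```

The same value always lands in the same bucket, so point lookups on `user_id` still prune to a single partition.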
Hive Metastore uses database-level locking for Iceberg commits. Two concurrent Spark jobs writing the same table may deadlock or corrupt metadata. Fix: migrate to Glue, Nessie, or REST catalog. These use OCC (optimistic locking) and handle concurrency correctly.
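The commit protocol those catalogs use can be sketched as a compare-and-swap loop. This is a toy in-memory model (the class and function names are invented for illustration): the only atomic step is swapping the metadata pointer, so no table-wide lock is ever held and concurrent writers retry instead of deadlocking.

```python
import threading

class OptimisticCatalog:
    """Toy catalog: the pointer swap is the only atomic operation,
    mirroring how Glue/Nessie/REST catalogs commit."""
    def __init__(self):
        self._lock = threading.Lock()  # stands in for the catalog's CAS primitive
        self.metadata_version = 0

    def compare_and_swap(self, expected: int, new: int) -> bool:
        with self._lock:
            if self.metadata_version != expected:
                return False           # another writer committed first
            self.metadata_version = new
            return True

def commit(catalog: OptimisticCatalog, retries: int = 4) -> int:
    for _ in range(retries):
        base = catalog.metadata_version                  # 1. read current pointer
        new_version = base + 1                           # 2. write new metadata
        if catalog.compare_and_swap(base, new_version):  # 3. atomic swap
            return new_version
        # conflict: re-read the base and rebase the commit, then retry
    raise RuntimeError("commit failed after retries")
```

A losing writer re-reads the new base and retries, so both concurrent commits eventually succeed; under Hive Metastore's lock-based protocol, the same race can deadlock instead.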
Run removeOrphanFiles with an older_than buffer at least as large as the longest possible in-flight write. Files written by a job that has not yet committed its snapshot are not referenced in any metadata, so removeOrphanFiles will treat them as orphans and delete them out from under the writer. In practice the buffer should be at least 24 hours.
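The safety condition reduces to a timestamp comparison. A minimal sketch (the function name is ours, not an Iceberg API):

```python
from datetime import datetime, timedelta, timezone

def safe_to_delete(file_written_at: datetime, now: datetime,
                   buffer: timedelta = timedelta(hours=24)) -> bool:
    """A file may be treated as an orphan only if it is older than the
    buffer; anything newer may belong to an in-flight, uncommitted write."""
    return now - file_written_at > buffer

now = datetime(2024, 1, 16, 12, 0, tzinfo=timezone.utc)
# Written 25 hours ago: outside the buffer, safe to delete.
assert safe_to_delete(datetime(2024, 1, 15, 11, 0, tzinfo=timezone.utc), now)
# Written 1 hour ago: may be mid-write, must be kept.
assert not safe_to_delete(datetime(2024, 1, 16, 11, 0, tzinfo=timezone.utc), now)
```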
A V1 engine (old Spark Iceberg connector) cannot read a V2 table with equality deletes — it will either error or silently return wrong results (rows that should be deleted are returned). Pin connector versions per table spec version and enforce via CI checks on connector upgrades.
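One way to enforce the pin is a CI gate that fails a connector upgrade before it ships. This is a hypothetical sketch: the capability map, version strings, and function are our convention, not published Iceberg metadata.

```python
# Illustrative capability matrix: which table format version each
# pinned connector build can read correctly.
CONNECTOR_CAPS = {
    ("spark-iceberg", "0.13.x"): {"max_format_version": 1},
    ("spark-iceberg", "1.4.x"):  {"max_format_version": 2},
}

def check_compat(connector: str, version: str, table_format_version: int) -> None:
    """Raise if the connector cannot read the table's format version."""
    caps = CONNECTOR_CAPS[(connector, version)]
    if table_format_version > caps["max_format_version"]:
        raise RuntimeError(
            f"{connector} {version} cannot read format-version "
            f"{table_format_version}: risk of silent wrong results"
        )

check_compat("spark-iceberg", "1.4.x", 2)   # passes: V2-capable connector
```

Running this against every (connector, table) pair in CI turns the "silently returns deleted rows" failure mode into a build break.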
After 30 days of CDC without compaction, a MOR table may have 10 delete files per data file. Every read must merge 11 files instead of 1. Read latency increases 5–20×. Storage doubles (delete files are not small). Without an SLA on delete file ratio, production SLAs will be missed silently.
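The read-amplification arithmetic is simple enough to state directly (function name is illustrative):

```python
def files_merged_per_read(delete_files_per_data_file: int) -> int:
    """On a merge-on-read table, every read of a data file must also
    merge all of that file's delete files."""
    return 1 + delete_files_per_data_file

# 30 days of uncompacted CDC, per the text: 10 delete files per data file,
# so each read merges 11 files instead of 1.
assert files_merged_per_read(10) == 11
```

Compaction rewrites the data files with the deletes applied, resetting the count back to 1 file per read.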
Production Runbook Checklist
- Catalog first. Choose and deploy catalog before creating any tables. Changing catalog type later is painful and requires migration.
- Spec format-version = 2 for all new tables. V1 cannot have row-level deletes — you'll need to recreate the table to add CDC support later.
- Set `write.target-file-size-bytes = 134217728` (128 MiB) explicitly. The default varies by engine — don't rely on it.
- Configure `commit.retry.num-retries = 4` at minimum. Transient S3/catalog errors are common; no retries means unnecessary failures.
- Schedule compaction as a recurring job within 24h of first write. Do not wait for performance to degrade to set up maintenance.
- Monitor `total-delete-files` per table. Alert when the delete-to-data-file ratio exceeds 0.2.
- Monitor average data file size. Alert if the average drops below 32 MiB (small-file problem) or exceeds 512 MiB (parallelism problem).
- Set a snapshot retention policy before the table is written to: `history.expire.max-snapshot-age-ms` in table properties.
- Test cross-engine reads in CI before promoting to production. Spark-written Parquet + Trino reads + delete file handling all have subtle compatibility edge cases.
- Tag tables with owner, SLA, and compaction strategy metadata. Iceberg table properties are free-form — use them for operational metadata.
- Never use `removeOrphanFiles` on a table with active writers without a 24h buffer. In-flight writes are not yet in metadata and will be treated as orphans.
- Geospatial and Variant types require V3 — verify engine support before enabling. Trino and Spark have incomplete V3 support as of 2025.
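Several checklist items reduce to table-property checks, which can run as a pre-production gate. A sketch under stated assumptions: the property keys are real Iceberg table properties, but the `runbook_violations` function, the `owner` tag, and the exact thresholds are our conventions from the list above.

```python
def runbook_violations(props: dict) -> list:
    """Return a list of runbook violations for a table's properties."""
    issues = []
    if props.get("format-version") != "2":
        issues.append("format-version must be 2")
    if props.get("write.target-file-size-bytes") != "134217728":
        issues.append("set write.target-file-size-bytes explicitly (128 MiB)")
    if int(props.get("commit.retry.num-retries", "0")) < 4:
        issues.append("commit.retry.num-retries below 4")
    if "history.expire.max-snapshot-age-ms" not in props:
        issues.append("no snapshot retention policy")
    if "owner" not in props:  # operational metadata tag, our convention
        issues.append("no owner tag")
    return issues

ok = {
    "format-version": "2",
    "write.target-file-size-bytes": "134217728",
    "commit.retry.num-retries": "4",
    "history.expire.max-snapshot-age-ms": str(7 * 24 * 3600 * 1000),
    "owner": "data-platform",
}
assert runbook_violations(ok) == []
```

Wiring this into CI (properties are readable via any catalog client) blocks a table from taking production writes until every item on the list is satisfied.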