# Every Kafka message has exactly this structure: ProducerRecord( topic = 'orders', # destination topic partition = None, # None = hash(key) % num_partitions key = b'customer-uuid', # bytes — routing + ordering unit value = b'{"id":...}', # bytes — your payload headers = [ # metadata KV pairs ('X-Source', b'checkout-svc'), ('X-Trace-ID', b'abc-123-def'), ('Content-Type',b'application/json'), ], timestamp = None # None = broker-assigned (CreateTime) ) # KEY INSIGHT: Key determines partition assignment. # Same key → same partition → guaranteed ordering for that key. # No key → round-robin across all partitions. # Null key after sticky partitioning (batching) completes.
# server.properties — combined broker+controller node (KRaft mode)
process.roles=broker,controller    # this node does both
node.id=1
# All controller-eligible voters: id@host:port
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER

# Generate cluster UUID (once, before first start):
#   kafka-storage.sh format --config server.properties \
#     --cluster-id $(kafka-storage.sh random-uuid)

# What KRaft eliminates:
# ✓ ZooKeeper cluster (3-5 extra VMs)
# ✓ ZK latency on metadata changes
# ✓ Split-brain between ZK and Kafka state
# ✓ 2-phase shutdown/startup coordination
# New limit: 1M+ partitions (ZK limit was ~200K)
Use acks=all together with min.insync.replicas ≥ 2. Combined with an idempotent producer this gives exactly-once delivery within a partition — use it for everything that matters. acks=all is meaningless without min.insync.replicas=2 on the topic: with RF=3 and min.insync.replicas=1 (the default), a single in-sync broker constitutes a "full" ISR, so acks=all behaves like acks=1. Set min.insync.replicas at the broker AND topic level.
# High-throughput durable producer (confluent-kafka / librdkafka).
from confluent_kafka import Producer

p = Producer({
    # --- Durability ---
    'acks': 'all',
    'enable.idempotence': True,        # requires acks=all + retries>0

    # --- Throughput vs latency tradeoff ---
    'linger.ms': 20,                   # wait 20ms to batch more msgs
    'batch.size': 1048576,             # 1MB max batch
    # BUG FIX: 'buffer.memory' is a Java-client property; librdkafka
    # rejects unknown properties at Producer() creation.  The librdkafka
    # equivalent producer-buffer bound is queue.buffering.max.kbytes.
    'queue.buffering.max.kbytes': 65536,   # 64MB producer buffer

    # --- Compression (huge throughput win) ---
    'compression.type': 'zstd',        # best ratio; lz4 for low latency

    # --- Retry behavior ---
    'retries': 2147483647,             # max retries (idempotent = safe)
    'retry.backoff.ms': 100,
    'delivery.timeout.ms': 120000,     # 2min max for any one message
    'request.timeout.ms': 30000,

    # --- Ordering guarantee within partition ---
    'max.in.flight.requests.per.connection': 5,
    # With idempotence: safe up to 5 in-flight.
    # Without idempotence: must be 1 to prevent reordering on retry.

    'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
})

def on_delivery(err, msg):
    """Delivery report callback — invoked from p.poll()/p.flush()."""
    if err:
        log_error(err)
    else:
        metrics.increment('produced')

# Async produce with delivery callback.
p.produce(
    topic = 'orders',
    key = order.customer_id.encode(),
    value = order.to_json().encode(),
    headers = {'source': b'checkout'},
    callback = on_delivery
)
p.poll(0)   # trigger callbacks without blocking

# Flush before shutdown — critical!
p.flush(timeout=30)   # wait up to 30s for all outstanding messages
""" KEY SELECTION RULES: 1. Same key → same partition → ordering guaranteed 2. No key → round-robin (or sticky batch) 3. High-cardinality keys = even distribution 4. Low-cardinality keys = hot partition risk """ # ✅ GOOD: customer_id as key # Orders per customer are ordered # Millions of customers → even distribution p.produce(topic='orders', key=order.customer_id.encode(), value=v) # ✅ GOOD: composite key for finer ordering key = f"{customer_id}:{order_id}".encode() # ❌ BAD: country_code as key # Only ~200 unique values → 200 partitions max # "US" gets 40% of traffic → hot partition # ❌ BAD: timestamp as key # New partition every millisecond if hash collides # No ordering guarantee you actually need # ❌ BAD: null key for ordered events # Round-robin destroys per-entity ordering ───────────────────────────────────────────────── # Custom partitioner (Python) # Useful for: geographic routing, tenant isolation def geography_partitioner(key_bytes, all_partitions, available): region_map = {b'us': 0, b'eu': 1, b'ap': 2} region = key_bytes[:2] if region in region_map: idx = region_map[region] % len(all_partitions) return all_partitions[idx] # default: hash return all_partitions[hash(key_bytes) % len(all_partitions)]
# Transactional API: write to multiple topics atomically. # Either ALL messages commit or NONE do. # Essential for: read-process-write pipelines (stream processing). p = Producer({ 'bootstrap.servers': 'broker:9092', 'transactional.id': 'checkout-producer-1', # unique per instance 'enable.idempotence': True, }) p.init_transactions() try: p.begin_transaction() # Write to multiple topics in one atomic batch: p.produce('orders', key=k, value=order_msg) p.produce('inventory', key=k, value=reserve_msg) p.produce('notifications', key=k, value=email_msg) # Atomically commit consumer offsets WITH the transaction: p.send_offsets_to_transaction( consumer.position(partitions), consumer.consumer_group_metadata() ) p.commit_transaction() except KafkaException as e: p.abort_transaction() # all three writes rolled back raise
# Manual-commit consumer loop with graceful shutdown (at-least-once).
# BUG FIX: KafkaException was raised below but never imported.
from confluent_kafka import Consumer, KafkaError, KafkaException
import signal, sys

c = Consumer({
    'bootstrap.servers': 'broker1:9092',
    'group.id': 'order-processor-v3',
    'auto.offset.reset': 'earliest',   # 'latest' for new groups

    # CRITICAL: disable auto-commit.
    'enable.auto.commit': False,
    # Auto-commit commits on a timer, NOT after processing.
    # You can commit before processing → message loss on crash.
    # Always commit manually after confirmed processing.

    # Session & heartbeat
    'session.timeout.ms': 45000,       # 45s — time before assumed dead
    'heartbeat.interval.ms': 3000,     # well under session.timeout
    # (rule of thumb: heartbeat ≤ 1/3 of session.timeout; 3s of 45s here)
    'max.poll.interval.ms': 300000,    # 5min — max time between polls

    # Fetch tuning
    'fetch.min.bytes': 1024,           # wait for 1KB before returning
    # BUG FIX: librdkafka's property is fetch.wait.max.ms — the Java
    # consumer's 'fetch.max.wait.ms' is rejected as unknown.
    'fetch.wait.max.ms': 500,          # max wait for fetch.min.bytes
    'max.partition.fetch.bytes': 1048576,  # 1MB per partition per fetch
    # BUG FIX: 'max.poll.records' is a Java-consumer property and is
    # rejected by librdkafka; batch size is set via consume() below.
})

running = True
# SIGTERM from the orchestrator → finish the current batch, then exit.
signal.signal(signal.SIGTERM, lambda *_: globals().update(running=False))

c.subscribe(['orders'], on_assign=on_assign, on_revoke=on_revoke)

try:
    while running:
        msgs = c.consume(num_messages=500, timeout=1.0)
        if not msgs:
            continue
        for msg in msgs:
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue   # end of partition, not an error
                raise KafkaException(msg.error())
            process(msg)       # ← your business logic
        # Commit AFTER processing the batch — at-least-once.
        c.commit(asynchronous=False)   # sync commit = no gaps
finally:
    # Leaves the group cleanly, triggering a graceful rebalance.
    # (With auto-commit disabled, offsets were committed manually above.)
    c.close()
Typical causes: processing that exceeds max.poll.interval.ms, GC pauses, DB queries in the hot path, or too many consumers for too few partitions. Symptoms: lag spikes, duplicate processing, offset commit failures.
from confluent_kafka import Consumer # Static group membership — eliminates rebalance on restart c = Consumer({ 'group.id': 'order-processor', 'group.instance.id': 'processor-us-east-1a', # STATIC 'session.timeout.ms': 300000, # 5min — safe rolling restarts # Static member holds its partitions for up to session.timeout # after disconnect. Rolling deploys trigger zero rebalances. }) def on_assign(consumer, partitions): # Called when partitions are assigned — good place to: # - Seek to a custom offset # - Reset a per-partition state machine # - Load per-partition warm cache from DB for p in partitions: log.info(f"Assigned partition {p.partition}") consumer.assign(partitions) def on_revoke(consumer, partitions): # Called BEFORE partitions are taken away. # MUST commit all pending offsets here — your last chance. consumer.commit(asynchronous=False) for p in partitions: flush_partition_state(p.partition) ───────────────────────────────────────────────── # Partition assignment strategies (broker-side) # partition.assignment.strategy=cooperative-sticky (Kafka 2.4+) # cooperative-sticky: incremental rebalance — only moves # partitions that need to move. Other partitions keep consuming. # vs range/roundrobin: STOP-THE-WORLD — all partitions revoked. # Always set cooperative-sticky in new deployments: 'partition.assignment.strategy': 'cooperative-sticky'
# Lag inspection and replay (seek) examples.
from confluent_kafka import Consumer, TopicPartition

c = Consumer({'group.id': 'monitor', 'bootstrap.servers': '...'})

# --- Consumer group lag for a topic ---------------------------------
md = c.list_topics('orders')
partitions = [
    TopicPartition('orders', p)
    for p in md.topics['orders'].partitions   # dict: partition id → metadata
]
committed = c.committed(partitions)
# (Removed dead binding `high_water = c.get_watermark_offsets` — it
# stored the bound method and was never used.)

for tp in committed:
    # High watermark = offset of the next record to be appended.
    _, hw = c.get_watermark_offsets(tp)
    # tp.offset < 0 means "no committed offset yet" — count full lag.
    lag = hw - (tp.offset if tp.offset >= 0 else 0)
    print(f"Partition {tp.partition}: lag={lag}")

# --- Seek to a specific offset (replay from a point) ----------------
# NOTE: seek() only works on partitions currently assigned to this
# consumer (via subscribe()'s assignment or an explicit assign()).
c.seek(TopicPartition('orders', partition=0, offset=12345))

# --- Seek to a timestamp (replay from a specific time) --------------
ts_ms = 1704067200000   # 2024-01-01 00:00:00 UTC in ms
offsets_for_ts = c.offsets_for_times([TopicPartition('orders', 0, ts_ms)])
c.seek(offsets_for_ts[0])
// Kafka Streams topology: filter + branch a stream, then a windowed
// aggregation materialized into a queryable state store.
StreamsBuilder builder = new StreamsBuilder();

// KStream: unbounded stream of events
KStream<String, Order> orders = builder.stream("orders",
    Consumed.with(
        Serdes.String(),
        orderSerde
    ));

// Filter + branch
// NOTE(review): branch() is deprecated in newer Streams versions in
// favor of split()/Branched — kept as written here.
KStream<String, Order>[] branches = orders
    .filter((k, v) -> v.getAmount() > 0)
    .branch(
        (k, v) -> v.getStatus().equals("COMPLETED"),
        (k, v) -> v.getStatus().equals("REFUNDED"),
        (k, v) -> true   // catch-all
    );
KStream completed = branches[0];
KStream refunded = branches[1];

// Stateful: aggregate revenue per customer per hour
KTable<Windowed<String>, Double> hourlyRevenue = completed
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(
        Duration.ofHours(1)
    ))
    .aggregate(
        () -> 0.0,                                     // initializer
        (key, order, agg) -> agg + order.getAmount(),  // adder
        Materialized.as("hourly-revenue-store")        // queryable store
    );

// Write result back to Kafka
hourlyRevenue
    .toStream()
    .to("hourly-revenue");

// Start topology
KafkaStreams app = new KafkaStreams(
    builder.build(),
    streamsConfig
);
app.start();
| Abstraction | Represents | Use When |
|---|---|---|
| KStream | Infinite stream of events (each = new fact) | Clickstream, logs, transactions |
| KTable | Changelog stream (latest value per key) | User profiles, account balances |
| GlobalKTable | KTable replicated to ALL instances | Small lookup tables (countries, SKUs) |
// KStream-KTable join: enrich each order with customer profile KTable<String, Customer> customers = builder.table("customers"); // latest value per customer_id KStream<String, EnrichedOrder> enriched = orders.join( customers, (order, customer) -> new EnrichedOrder(order, customer) ); // Note: both must be co-partitioned (same # of partitions, same key) ───────────────────────────────────────────────── // WINDOWING TYPES: // Tumbling: fixed-size, non-overlapping // Hopping: fixed-size, overlapping (window can advance by hop) // Session: activity-gap based (closes after N ms of inactivity) // Sliding: every event creates a window ending at that event // Session window — user activity sessions orders.groupByKey() .windowedBy(SessionWindows .ofInactivityGapWithNoGrace(Duration.ofMinutes(30)) ) .count(); // events per session per user // Interactive query — read state store from ANY instance: ReadOnlyKeyValueStore<String, Long> store = streams.store(StoreQueryParameters.fromNameAndType( "hourly-revenue-store", QueryableStoreTypes.keyValueStore() )); Long revenue = store.get("customer-123");
Use .ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(5)) to accept records that arrive up to 5 minutes late. Critical for Kafka→S3→Kafka pipelines where end-to-end latency varies.
{
  "name": "postgres-orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",   // built into PostgreSQL 10+ — no extension to install
    "database.hostname": "prod-db.internal",
    "database.port": "5432",
    "database.user": "debezium_ro",
    "database.password": "${file:/secrets/db.properties:password}",
    "database.dbname": "ecommerce",
    // Topic prefix → topics like ecommerce-prod.public.orders
    // NOTE(review): deprecated in Debezium 2.x — renamed to "topic.prefix".
    "database.server.name": "ecommerce-prod",
    "table.include.list": "public.orders,public.customers,public.inventory",
    "column.exclude.list": "public.customers.ssn,public.customers.ccnum",
    // Snapshot mode: initial | never | when_needed
    "snapshot.mode": "initial",   // full table scan on first start
    // ("snapshot.isolation.mode" removed — that property belongs to the
    //  SQL Server connector; the Postgres connector runs its snapshot in
    //  a consistent transaction automatically.)
    // Outbox pattern — for transactional outbox
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
  }
}
{
  "name": "s3-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "8", // parallelism (effective max = total input partitions)
    "topics": "orders,inventory",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-data-lake",
    "s3.part.size": "67108864", // 64MB multipart-upload part size (NOT a flush trigger)
    // Hive-compatible partitioning: year=2024/month=01/day=15/
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField", // partition by a field in the record…
    "timestamp.field": "event_ts",        // …namely event_ts (event time, not ingest time)
    // Format: Parquet with schema evolution
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "zstd",
    "schema.compatibility": "FULL_TRANSITIVE",
    // Rotate a file every 10min OR every 50k records — whichever first.
    "rotate.interval.ms": "600000",
    "flush.size": "50000"
  }
}
"transforms": "route,flatten,mask,addfield,timestamp", // 1. Route by field value → different topics "transforms.route.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.route.fields": "region", // 2. Flatten nested struct → flat key=value "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value", "transforms.flatten.delimiter": "_", // 3. Mask PII fields with a fixed value "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value", "transforms.mask.fields": "ssn,credit_card,email", "transforms.mask.replacement": "[REDACTED]", // 4. Add a static metadata field "transforms.addfield.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.addfield.static.field": "_source", "transforms.addfield.static.value": "postgres-prod", // 5. Convert timestamp format "transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.timestamp.field": "created_at", "transforms.timestamp.target.type": "unix"
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
# BUG FIX: SerializationContext and MessageField are used below but
# were never imported.
from confluent_kafka.serialization import SerializationContext, MessageField

# Schema defined once in the registry.
ORDER_SCHEMA_STR = """
{
  "type": "record",
  "name": "Order",
  "namespace": "com.myco.ecommerce",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount_cents", "type": "long"},
    {"name": "status", "type": {"type": "enum", "name": "OrderStatus",
        "symbols": ["PENDING","PROCESSING","COMPLETED","CANCELLED"]}},
    {"name": "created_at", "type": {"type": "long",
        "logicalType": "timestamp-millis"}},
    {"name": "metadata", "type": ["null", {
        "type": "record", "name": "Meta", "fields": [
          {"name": "ip", "type": "string"},
          {"name": "ua", "type": ["null","string"], "default": null}
        ]}], "default": null}
  ]
}
"""

sr_client = SchemaRegistryClient({'url': 'https://sr.internal:8081'})
avro_serializer = AvroSerializer(
    sr_client,
    ORDER_SCHEMA_STR,
    conf={'auto.register.schemas': True},
)

# Wire format: [0x00 magic byte][4-byte schema ID][avro bytes]
# The consumer resolves the schema ID against the registry to decode.
# Binary Avro payloads are typically far smaller than equivalent JSON.

p = Producer({'bootstrap.servers': 'broker:9092'})
p.produce(
    topic='orders',
    key=order['customer_id'].encode(),
    value=avro_serializer(
        order,
        SerializationContext('orders', MessageField.VALUE),
    ),
)
// BACKWARD compatibility = consumers on the NEW schema can read OLD data:
// ✅ Add field with default: {"name":"region","type":"string","default":"US"}
// ✅ Delete any field (the new reader simply ignores it)
// ❌ Add field with NO default (new reader can't fill it from old data)
// ❌ Change field type (int → string)
// ❌ Rename field (it's a delete + add)
// (Deleting a field without a default breaks FORWARD compatibility —
//  old consumers reading new data — not BACKWARD.)
// SAFE RENAME PATTERN: use aliases
{
"name": "customer_identifier", // new name
"aliases": ["customer_id"], // old name — Avro maps it
"type": "string"
}
// Protobuf advantages over Avro for evolution:
// - Field numbers never change (even if names do)
// - Adding any field is always backward compatible
// - Required → optional is always safe
// - Better for gRPC-native stacks
""" Idempotent Producer Internals: 1. Broker assigns Producer ID (PID) on first connect 2. Producer tags each message: (PID, partition, sequence_num) 3. Broker tracks latest sequence per (PID, partition) 4. Duplicate: seq ≤ last seen → broker ACKs but does NOT write 5. Out-of-order: seq > expected → broker rejects (ProducerFencedException) This prevents duplicate writes on retry — the #1 cause of "at-least-once" overproduction on network errors. """ p = Producer({ 'enable.idempotence': True, 'acks': 'all', # required for idempotence 'retries': 2147483647, # max — idempotent = safe to retry 'max.in.flight.requests.per.connection': 5, # up to 5 with idempotence }) # Idempotence scope: per partition. # Does NOT prevent duplicates across partitions without transactions.
""" EOS Read-Process-Write: Consumer reads from topic A → processes record → writes result to topic B → commits input offset All four in one atomic transaction. """ consumer = Consumer({ 'group.id': 'eos-processor', 'enable.auto.commit': False, 'isolation.level': 'read_committed', # CRITICAL # read_committed: consumer skips aborted transactions # and messages beyond the last stable offset (LSO). # WITHOUT this: consumer reads partial/aborted transactions. }) producer = Producer({ 'transactional.id': 'eos-proc-instance-1', 'enable.idempotence': True, }) producer.init_transactions() consumer.subscribe(['raw-orders']) while True: msgs = consumer.consume(100, timeout=1.0) if not msgs: continue producer.begin_transaction() try: for msg in msgs: result = transform(msg) producer.produce('processed-orders', value=result) # Atomically commit offsets WITH the output write producer.send_offsets_to_transaction( consumer.position(consumer.assignment()), consumer.consumer_group_metadata() ) producer.commit_transaction() except Exception as e: producer.abort_transaction() # Re-seek to last committed position consumer.seek_to_committed() """ Why this is exactly-once: - Output write + offset commit = one atomic transaction - If process crashes mid-transaction: abort, re-seek, reprocess - If broker fails after commit: idempotent producer handles retry - Consumer with read_committed: never sees the aborted writes """
""" Partition count rules of thumb: 1. Throughput-based: partitions = max(producer_throughput, consumer_throughput) / per_partition_throughput Per partition: ~10MB/s write, ~50MB/s read (modern NVMe broker) 2. Consumer parallelism: Max parallelism = partition count 10 partitions → max 10 active consumers in a group 3. Leadership load: Each partition = 1 leader election on broker failure Each partition leader = ~1-5ms coordination overhead Recommendation: < 4000 partitions per broker (KRaft: much higher) 4. Replication factor (RF): RF=1 → no fault tolerance (dev only) RF=2 → can survive 1 broker failure (min for production) RF=3 → standard production — survives 2 simultaneous failures EXAMPLE: 100MB/s peak throughput, 4 consumers, RF=3 By throughput: 100/10 = 10 partitions minimum By parallelism: 4 consumers → 4 partitions minimum Add 2x headroom: 20 partitions Final: 24 (round to multiple of consumer count = 4) """ # Create topic with admin client from confluent_kafka.admin import AdminClient, NewTopic admin = AdminClient({'bootstrap.servers': 'broker:9092'}) admin.create_topics([ NewTopic( topic = 'orders', num_partitions = 24, replication_factor = 3, config = { 'retention.ms': '604800000', # 7 days 'retention.bytes': '53687091200', # 50GB per partition 'compression.type': 'zstd', 'min.insync.replicas':'2', 'cleanup.policy': 'delete', # or 'compact' for KTable 'message.timestamp.type': 'CreateTime', } ) ])
## NETWORK & THREADS
num.network.threads=8                 # socket threads: ~2x CPU cores
num.io.threads=16                     # disk I/O threads: ~4x CPU cores
num.replica.fetchers=4                # replication fetch parallelism
socket.send.buffer.bytes=1048576      # 1MB socket buffer
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600    # 100MB max request

## LOG STORAGE
# Multiple log.dirs = JBOD: Kafka spreads partitions across the disks
# itself (listing several dirs is an alternative to RAID-0, not RAID-0).
log.dirs=/data/kafka-logs,/data2/kafka-logs
log.segment.bytes=1073741824          # 1GB segments
log.retention.hours=168               # 7 days
# NOTE(review): explicit flush intervals force fsync; Kafka's default is
# to rely on the OS page cache + replication for durability — confirm
# that trading throughput for fsync here is intentional.
log.flush.interval.messages=10000     # fsync every 10K messages
log.flush.interval.ms=1000            # or every 1s — whichever first

## REPLICATION
default.replication.factor=3
min.insync.replicas=2
replica.lag.time.max.ms=30000         # remove from ISR after 30s lag

## JVM (kafka-server-start.sh or KAFKA_HEAP_OPTS):
##   KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
##   -XX:+UseG1GC -XX:MaxGCPauseMillis=20
##   -XX:InitiatingHeapOccupancyPercent=35
##   -XX:+ExplicitGCInvokesConcurrent

## OS:
##   sudo sysctl -w vm.swappiness=1
##   sudo sysctl -w net.core.rmem_max=134217728
##   sudo sysctl -w vm.max_map_count=262144
##   Set /proc/sys/fs/file-max to 1000000
# Alert when any partition lag > threshold.
from confluent_kafka import Consumer, TopicPartition

def check_consumer_lag(group_id, topic, threshold=10000):
    """Return alerts for partitions of `topic` whose lag for consumer
    group `group_id` exceeds `threshold`.

    Lag = high watermark − committed offset; a partition with no
    committed offset counts its full high watermark as lag.
    Returns a list of {'partition': int, 'lag': int} dicts.
    """
    # BUG FIX: the original hardcoded 'group.id': 'lag-monitor' and
    # ignored the group_id parameter — committed() reads offsets for
    # THIS consumer's group, so it measured the wrong group's lag.
    c = Consumer({'bootstrap.servers': 'broker:9092',
                  'group.id': group_id})
    try:
        tps = [TopicPartition(topic, p)
               for p in c.list_topics(topic).topics[topic].partitions]
        committed = c.committed(tps, timeout=5)
        alerts = []
        for tp in committed:
            _, hw = c.get_watermark_offsets(tp, timeout=5)
            lag = hw - max(tp.offset, 0)   # offset < 0 = never committed
            if lag > threshold:
                alerts.append({'partition': tp.partition, 'lag': lag})
        return alerts
    finally:
        # Close even on error — the original leaked the consumer if a
        # broker call raised.
        c.close()