Architecture
THE DISTRIBUTED LOG
Kafka is not a message queue. It is an immutable, append-only, distributed commit log. Every event written is retained for the configured retention window (or indefinitely, with log compaction), and can be replayed and reprocessed by any number of consumers independently. This one design decision makes it fundamentally different from RabbitMQ, SQS, and every traditional broker.
At a glance: 10M+ msgs/sec/broker · consumer replay · O(1) read complexity · replicated partitioned log (RPF) · KRaft: no ZooKeeper (3.x+) · EOS: exactly-once

Anatomy of a Topic

TOPIC: orders — 3 partitions, RF=2, retention=7d
[Diagram: per-partition offset logs]
  Partition 0: leader on Broker 1, offsets 0–5 (LEO:6); ISR replica on Broker 2, fully caught up
  Partition 1: leader on Broker 2, offsets 0–3 (LEO:4); ISR replica on Broker 3, fully caught up
  Partition 2: leader on Broker 3, offsets 0–6 (LEO:7)
  Legend: Leader · ISR replica · HW = High Watermark · LEO = Log End Offset
High Watermark — The Visibility Contract
Consumers can only read up to the High Watermark (HW) — the last offset replicated to ALL in-sync replicas (ISR). A message written to the leader but not yet replicated is invisible to consumers. This is not a bug — it's the consistency guarantee that prevents reading data that might be lost on leader failover.
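The rule above can be captured in a toy Python model (an illustration of the semantics, not any client API): the HW is the minimum LEO across the ISR, and only offsets below it are consumer-visible.

```python
# Toy model of the High Watermark rule (illustration only, not a client API).
# Each replica's LEO (log end offset) = the next offset it would write.
# HW = min(LEO over all in-sync replicas); consumers read offsets < HW.

def high_watermark(isr_leos):
    """HW is bounded by the slowest in-sync replica."""
    return min(isr_leos)

def visible_offsets(leader_log, isr_leos):
    hw = high_watermark(isr_leos)
    return leader_log[:hw]   # offsets 0 .. HW-1 are consumer-visible

leader = ["m0", "m1", "m2", "m3", "m4", "m5"]    # leader LEO = 6
# A follower has replicated only offsets 0-3 (its LEO = 4), so the
# consumer sees only the first four messages:
print(visible_offsets(leader, isr_leos=[6, 4]))  # ['m0', 'm1', 'm2', 'm3']
```

If the leader fails now, offsets 4-5 may be lost on failover; since no consumer ever saw them, no consumer observes data disappearing.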

The Full Cluster Topology

[Diagram: producer app writes to Broker 1 (controller); Brokers 2, 3 … N replicate metadata via KRaft; Consumers A, B, C poll. Consumers in the same group split the partitions between them.]

Message Anatomy

ProducerRecord fields (java/python)
# Every Kafka message has exactly this structure:
ProducerRecord(
  topic     = 'orders',         # destination topic
  partition = None,              # None = hash(key) % num_partitions
  key       = b'customer-uuid',  # bytes — routing + ordering unit
  value     = b'{"id":...}',     # bytes — your payload
  headers   = [                   # metadata KV pairs
    ('X-Source',    b'checkout-svc'),
    ('X-Trace-ID',  b'abc-123-def'),
    ('Content-Type',b'application/json'),
  ],
  timestamp = None               # None = broker-assigned (CreateTime)
)

# KEY INSIGHT: Key determines partition assignment.
# Same key → same partition → guaranteed ordering for that key.
# No key → sticky partitioner (Kafka 2.4+): stick to one partition until
# the batch fills or lingers, then pick a new one. Older clients: round-robin.
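The key→partition rule can be sketched in a few lines. A minimal illustration: real clients hash keys with murmur2; md5 stands in here purely as a deterministic hash, and `partition_for` is a hypothetical helper, not a client API.

```python
# Illustrative key -> partition mapping. Real clients use murmur2;
# md5 is a stand-in here only because it is deterministic across runs.
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    h = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return h % num_partitions

# Same key always lands on the same partition -> per-key ordering:
assert partition_for(b"customer-42", 12) == partition_for(b"customer-42", 12)

# Note: changing num_partitions remaps keys, which is why partition
# count should be sized up front rather than grown later.
```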

KRaft — Kafka Without ZooKeeper

KRaft mode — production-ready since Kafka 3.3 (properties)
# server.properties — combined broker+controller node
process.roles=broker,controller         # this node does both
node.id=1
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER

# Generate cluster UUID (once, before first start):
# kafka-storage.sh format --config server.properties \
#   --cluster-id $(kafka-storage.sh random-uuid)

# What KRaft eliminates:
# ✓ ZooKeeper cluster (3-5 extra VMs)
# ✓ ZK latency on metadata changes
# ✓ Split-brain between ZK and Kafka state
# ✓ 2-phase shutdown/startup coordination
# New limit: 1M+ partitions (ZK limit was ~200K)
Write Path
PRODUCERS — WRITE GUARANTEE MATRIX
The producer is deceptively complex. Every config choice is a tradeoff between throughput, latency, durability, and ordering. Most production outages trace back to misunderstood producer settings.

The Three Delivery Guarantees

acks=0
Fire and forget. Producer sends, doesn't wait for acknowledgement. Fastest throughput, zero durability guarantee. In-flight data is silently lost on any broker failure. Use only for: metrics, telemetry, logs where loss is acceptable.
AT-MOST-ONCE
acks=1
Leader acknowledges. Message written to leader log, ack sent. If leader dies before replication, message is lost. Default in many libraries — a dangerous default for financial data.
RISKY
acks=all
All ISR replicas acknowledge. Zero data loss assuming min.insync.replicas ≥ 2. Combined with idempotent producer: exactly-once within a partition. Use for everything that matters.
DURABLE
min.insync.replicas — The Hidden Config That Matters
acks=all is meaningless without min.insync.replicas=2 on the topic. With RF=3 and min.insync.replicas=1 (the default), the ISR can shrink to the leader alone and acks=all behaves like acks=1. Set this at the broker AND topic level.
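The interaction in this note can be captured in a toy acceptance check (a sketch of broker behavior, not an API; `write_accepted` is a hypothetical name):

```python
# Toy model of the acks=all + min.insync.replicas interaction.
def write_accepted(isr_size: int, min_insync: int, acks: str) -> bool:
    if acks != "all":
        return True                   # acks=0/1 never consult min.insync
    return isr_size >= min_insync     # else: NotEnoughReplicas error

# RF=3, one broker down -> ISR=2: writes still accepted
assert write_accepted(isr_size=2, min_insync=2, acks="all") is True
# Two brokers down -> ISR=1: acks=all writes rejected (durability wins)
assert write_accepted(isr_size=1, min_insync=2, acks="all") is False
# The dangerous default: min.insync=1 silently degrades acks=all to acks=1
assert write_accepted(isr_size=1, min_insync=1, acks="all") is True
```

The third case is the trap: the write is acknowledged with only the leader alive, so a leader loss can still lose acknowledged data.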

Batching & Throughput Tuning

High-throughput producer config (python)
from confluent_kafka import Producer

p = Producer({
  # Durability
  'acks':                    'all',
  'enable.idempotence':      True,    # requires acks=all + retries>0

  # Throughput vs Latency tradeoff
  'linger.ms':              20,      # wait 20ms to batch more msgs
  'batch.size':             1048576, # 1MB batch (default 16KB)
  'queue.buffering.max.kbytes': 65536, # 64MB buffer (librdkafka's analog of Java's buffer.memory)

  # Compression (huge throughput win)
  'compression.type':       'zstd',  # best ratio; lz4 for low latency

  # Retry behavior
  'retries':                2147483647, # max retries (idempotent = safe)
  'retry.backoff.ms':       100,
  'delivery.timeout.ms':    120000, # 2min max for any one message
  'request.timeout.ms':     30000,

  # Ordering guarantee within partition
  'max.in.flight.requests.per.connection': 5,
  # With idempotence: safe up to 5 in-flight.
  # Without idempotence: must be 1 to prevent reordering on retry.

  'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
})

# Async produce with delivery callback
def on_delivery(err, msg):
  if err: log_error(err)
  else: metrics.increment('produced')

p.produce(
  topic    = 'orders',
  key      = order.customer_id.encode(),
  value    = order.to_json().encode(),
  headers  = {'source': b'checkout'},
  callback = on_delivery
)
p.poll(0)   # trigger callbacks without blocking

# Flush before shutdown — critical!
p.flush(timeout=30)  # wait up to 30s for all outstanding messages

Partitioning Strategy

Key selection — the most important decision (python)
"""
KEY SELECTION RULES:
  1. Same key → same partition → ordering guaranteed
  2. No key → round-robin (or sticky batch)
  3. High-cardinality keys = even distribution
  4. Low-cardinality keys = hot partition risk
"""

# ✅ GOOD: customer_id as key
#    Orders per customer are ordered
#    Millions of customers → even distribution
p.produce(topic='orders', key=order.customer_id.encode(), value=v)

# ✅ GOOD: composite key for finer ordering
key = f"{customer_id}:{order_id}".encode()

# ❌ BAD: country_code as key
#    Only ~200 unique values → 200 partitions max
#    "US" gets 40% of traffic → hot partition

# ❌ BAD: timestamp as key
#    Key changes every millisecond → effectively random placement
#    and no per-entity ordering guarantee you actually need

# ❌ BAD: null key for ordered events
#    Round-robin destroys per-entity ordering

─────────────────────────────────────────────────
# Custom partitioner (Python) — this signature matches kafka-python's
# `partitioner` option; confluent-kafka does not accept Python callbacks.
# Useful for: geographic routing, tenant isolation
import zlib

def geography_partitioner(key_bytes, all_partitions, available):
    region_map = {b'us': 0, b'eu': 1, b'ap': 2}
    region = key_bytes[:2]
    if region in region_map:
        idx = region_map[region] % len(all_partitions)
        return all_partitions[idx]
    # default: stable hash — Python's built-in hash() is salted per
    # process and would scatter the same key across restarts
    return all_partitions[zlib.crc32(key_bytes) % len(all_partitions)]
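A quick way to see the low-cardinality risk from the rules above: simulate a traffic mix where one country code dominates. A toy measurement, with md5 standing in for the client's hash:

```python
# Why low-cardinality keys create hot partitions (toy measurement).
import hashlib
from collections import Counter

def pick(key: str, n: int = 12) -> int:
    # deterministic stand-in for the client's key hash
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big") % n

# Simulate 10k events: 40% keyed "US", the rest spread over 199 other codes
events = ["US"] * 4000 + [f"C{i % 199}" for i in range(6000)]
load = Counter(pick(k) for k in events)
hottest = max(load.values())

# Every "US" event lands on ONE partition, so that partition carries at
# least 40% of all traffic no matter how many partitions exist:
assert hottest >= 4000
```

Adding partitions cannot fix this; only a higher-cardinality key (or a composite key) spreads the load.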

Transactional Producer

Atomic multi-topic writes (python)
# Transactional API: write to multiple topics atomically.
# Either ALL messages commit or NONE do.
# Essential for: read-process-write pipelines (stream processing).

p = Producer({
    'bootstrap.servers':    'broker:9092',
    'transactional.id':     'checkout-producer-1',  # unique per instance
    'enable.idempotence':   True,
})
p.init_transactions()

try:
    p.begin_transaction()

    # Write to multiple topics in one atomic batch:
    p.produce('orders',         key=k, value=order_msg)
    p.produce('inventory',      key=k, value=reserve_msg)
    p.produce('notifications',   key=k, value=email_msg)

    # Atomically commit consumer offsets WITH the transaction:
    p.send_offsets_to_transaction(
        consumer.position(partitions),
        consumer.consumer_group_metadata()
    )

    p.commit_transaction()

except KafkaException as e:
    p.abort_transaction()   # all three writes rolled back
    raise
Read Path
CONSUMERS — THE POLL LOOP
The consumer poll loop is the most misunderstood piece of the Kafka API. Get it wrong and you get rebalance storms, offset commit races, and ghost consumers that hold partitions without processing them.

The Correct Poll Loop

Production consumer — full pattern (python)
from confluent_kafka import Consumer, KafkaError, KafkaException
import signal, sys

c = Consumer({
    'bootstrap.servers':       'broker1:9092',
    'group.id':                'order-processor-v3',
    'auto.offset.reset':       'earliest',   # 'latest' for new groups

    # CRITICAL: disable auto-commit
    'enable.auto.commit':      False,
    # Auto-commit commits on a timer, NOT after processing.
    # You can commit before processing → message loss on crash.
    # Always commit manually after confirmed processing.

    # Session & heartbeat
    'session.timeout.ms':      45000,   # 45s — time before assumed dead
    'heartbeat.interval.ms':   3000,    # 1/3 of session.timeout
    'max.poll.interval.ms':    300000,  # 5min — max time between polls

    # Fetch tuning
    'fetch.min.bytes':         1024,    # wait for 1KB before returning
    'fetch.max.wait.ms':       500,     # max wait for fetch.min.bytes
    'max.partition.fetch.bytes':1048576, # 1MB per partition per fetch
    'max.poll.records':        500,     # max messages per poll()
})

running = True
signal.signal(signal.SIGTERM, lambda *_: globals().update(running=False))

c.subscribe(['orders'], on_assign=on_assign, on_revoke=on_revoke)

try:
    while running:
        msgs = c.consume(num_messages=500, timeout=1.0)

        if not msgs: continue

        for msg in msgs:
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue  # end of partition, not an error
                raise KafkaException(msg.error())

            process(msg)  # ← your business logic

        # Commit AFTER processing the batch — at-least-once
        c.commit(asynchronous=False)  # sync commit = no gaps

finally:
    c.close()  # triggers graceful rebalance, commits offsets
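Why commit-after-processing matters can be shown with a toy crash simulation (`run` is a hypothetical model, not consumer API): committing before processing turns a crash into message loss, while committing after turns it into a duplicate — exactly the at-least-once contract.

```python
# Toy crash simulation: commit-before vs commit-after processing.
def run(messages, crash_at, commit_first):
    processed, committed = [], 0
    for i, m in enumerate(messages):
        if commit_first:
            committed = i + 1            # auto-commit style: offset saved early
        if i == crash_at:
            return processed, committed  # crash before this message is processed
        processed.append(m)
        if not commit_first:
            committed = i + 1            # manual commit AFTER processing
    return processed, committed

msgs = ["a", "b", "c"]
# Commit-first + crash: offset 2 committed but "b" never processed -> LOST
done, off = run(msgs, crash_at=1, commit_first=True)
assert (done, off) == (["a"], 2)   # restart resumes at "c", skipping "b"
# Commit-after + crash: "b" uncommitted -> reprocessed on restart (duplicate, not loss)
done, off = run(msgs, crash_at=1, commit_first=False)
assert (done, off) == (["a"], 1)   # restart resumes at "b"
```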

Consumer Groups & Rebalancing

REBALANCE STORM — The #1 Production Problem
A rebalance pauses ALL consumers in a group while partitions are redistributed. Causes: slow processing (exceeds max.poll.interval.ms), GC pauses, DB queries in the hot path, or too many consumers for too few partitions. Symptoms: lag spikes, duplicate processing, offset commit failures.
Rebalance callbacks + static membership (python)
from confluent_kafka import Consumer

# Static group membership — eliminates rebalance on restart
c = Consumer({
    'group.id':               'order-processor',
    'group.instance.id':      'processor-us-east-1a',  # STATIC
    'session.timeout.ms':     300000,  # 5min — safe rolling restarts
    # Static member holds its partitions for up to session.timeout
    # after disconnect. Rolling deploys trigger zero rebalances.
})

def on_assign(consumer, partitions):
    # Called when partitions are assigned — good place to:
    # - Seek to a custom offset
    # - Reset a per-partition state machine
    # - Load per-partition warm cache from DB
    for p in partitions:
        log.info(f"Assigned partition {p.partition}")
    consumer.assign(partitions)

def on_revoke(consumer, partitions):
    # Called BEFORE partitions are taken away.
    # MUST commit all pending offsets here — your last chance.
    consumer.commit(asynchronous=False)
    for p in partitions:
        flush_partition_state(p.partition)

─────────────────────────────────────────────────
# Partition assignment strategy (a CONSUMER config, not a broker setting)
# 'partition.assignment.strategy': 'cooperative-sticky' (Kafka 2.4+)
# cooperative-sticky: incremental rebalance — only moves
# partitions that need to move. Other partitions keep consuming.
# vs range/roundrobin: STOP-THE-WORLD — all partitions revoked.

# Always set cooperative-sticky in new deployments:
'partition.assignment.strategy': 'cooperative-sticky'
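The eager-vs-cooperative difference can be quantified with a toy assignment model (not the real assignor code; function names are illustrative): when a third consumer joins a six-partition group, round-robin redistribution moves four partitions, while a sticky strategy moves only the two it must.

```python
# Toy comparison: partitions moved by eager vs sticky reassignment.
from collections import Counter

members = ["c1", "c2", "c3"]               # c3 has just joined the group
old = {0: "c1", 1: "c1", 2: "c1", 3: "c2", 4: "c2", 5: "c2"}

def eager_assign(parts, members):
    # stop-the-world: revoke everything, redistribute round-robin
    return {p: members[i % len(members)] for i, p in enumerate(parts)}

def sticky_assign(old, members):
    # keep current owners; move only the surplus to under-quota members
    new, quota = dict(old), len(old) // len(members)
    counts = Counter(new.values())
    needy = [m for m in members if counts.get(m, 0) < quota]
    for p in sorted(new):
        owner = new[p]
        if needy and counts[owner] > quota:
            target = needy[0]
            new[p] = target
            counts[owner] -= 1
            counts[target] += 1
            if counts[target] >= quota:
                needy.pop(0)
    return new

moved_eager  = sum(old[p] != a for p, a in eager_assign(sorted(old), members).items())
moved_sticky = sum(old[p] != a for p, a in sticky_assign(old, members).items())
assert (moved_sticky, moved_eager) == (2, 4)   # sticky moves only what it must
```

The un-moved partitions are exactly the ones that keep consuming through an incremental rebalance.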

Consumer Lag — Reading the Signal

Lag monitoring + seek patterns (python)
from confluent_kafka import Consumer, TopicPartition

c = Consumer({'group.id': 'monitor', 'bootstrap.servers': '...'})

# Get consumer group lag for a topic:
md = c.list_topics('orders')
partitions = [
    TopicPartition('orders', p) for p in md.topics['orders'].partitions
]
committed  = c.committed(partitions, timeout=10)

for tp in committed:
    _, hw = c.get_watermark_offsets(tp, timeout=10)  # returns (low, high)
    lag = hw - (tp.offset if tp.offset >= 0 else 0)  # offset < 0 = nothing committed
    print(f"Partition {tp.partition}: lag={lag}")

# Seek to a specific offset (replay from a point in the log).
# seek() is only valid on a partition currently assigned to this consumer:
c.seek(TopicPartition('orders', partition=0, offset=12345))

# Seek to timestamp (replay from a specific time):
ts_ms = 1704067200000   # 2024-01-01 00:00:00 UTC in ms
offsets_for_ts = c.offsets_for_times([TopicPartition('orders', 0, ts_ms)])
c.seek(offsets_for_ts[0])
Stream Processing
KAFKA STREAMS
Kafka Streams is a library (not a cluster) that runs inside your Java/JVM application. No separate processing cluster. It reads from Kafka, transforms, and writes back to Kafka — with stateful windowed aggregations backed by RocksDB.

Topology Building Blocks

Kafka Streams DSL (java)
StreamsBuilder builder = new StreamsBuilder();

// KStream: unbounded stream of events
KStream<String, Order> orders =
    builder.stream("orders", Consumed.with(
        Serdes.String(), orderSerde
    ));

// Filter + branch (branch() is deprecated since 2.8; new code uses split())
KStream<String, Order>[] branches = orders
    .filter((k, v) -> v.getAmount() > 0)
    .branch(
        (k, v) -> v.getStatus().equals("COMPLETED"),
        (k, v) -> v.getStatus().equals("REFUNDED"),
        (k, v) -> true  // catch-all
    );
KStream completed = branches[0];
KStream refunded  = branches[1];

// Stateful: aggregate revenue per customer per hour
KTable<Windowed<String>, Double> hourlyRevenue = completed
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(
        Duration.ofHours(1)
    ))
    .aggregate(
        () -> 0.0,
        (key, order, agg) -> agg + order.getAmount(),
        Materialized.as("hourly-revenue-store")
    );

// Write result back to Kafka
hourlyRevenue
    .toStream()
    .to("hourly-revenue");

// Start topology
KafkaStreams app = new KafkaStreams(
    builder.build(), streamsConfig
);
app.start();

KTable vs KStream vs GlobalKTable

Abstraction  | Represents                                  | Use when
KStream      | Infinite stream of events (each = new fact) | Clickstream, logs, transactions
KTable       | Changelog stream (latest value per key)     | User profiles, account balances
GlobalKTable | KTable replicated to ALL instances          | Small lookup tables (countries, SKUs)
Stream-Table join + windowing (java)
// KStream-KTable join: enrich each order with customer profile
KTable<String, Customer> customers =
    builder.table("customers");   // latest value per customer_id

KStream<String, EnrichedOrder> enriched =
    orders.join(
        customers,
        (order, customer) -> new EnrichedOrder(order, customer)
    );
// Note: both must be co-partitioned (same # of partitions, same key)

─────────────────────────────────────────────────
// WINDOWING TYPES:
// Tumbling:  fixed-size, non-overlapping
// Hopping:   fixed-size, overlapping (advances by a hop < window size)
// Session:   activity-gap based (closes after N ms of inactivity)
// Sliding:   every event creates a window ending at that event

// Session window — user activity sessions
orders.groupByKey()
    .windowedBy(SessionWindows
        .ofInactivityGapWithNoGrace(Duration.ofMinutes(30))
    )
    .count();  // events per session per user

// Interactive query — read state store from ANY instance:
ReadOnlyKeyValueStore<String, Long> store =
    streams.store(StoreQueryParameters.fromNameAndType(
        "hourly-revenue-store",
        QueryableStoreTypes.keyValueStore()
    ));
Long revenue = store.get("customer-123");
Grace Period — Handling Late-Arriving Events
By default, windowed aggregations reject records that arrive after the window closes. Use .ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(5)) to accept late arrivals up to 5 minutes late. Critical for Kafka→S3→Kafka pipelines where end-to-end latency varies.
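The tumbling-window + grace semantics can be modeled in plain Python (a sketch of the semantics, not the Streams API; names are illustrative): a record is dropped only when stream time has passed window end plus grace.

```python
# Toy model of tumbling-window aggregation with a grace period.
WINDOW = 3600_000   # 1h window, in ms
GRACE  = 300_000    # 5min grace

def window_start(ts_ms: int) -> int:
    return ts_ms - (ts_ms % WINDOW)

def aggregate(events, stream_time_ms):
    """events: (key, amount, event_ts_ms). Late beyond grace -> dropped."""
    out = {}
    for key, amount, ts in events:
        w = window_start(ts)
        if stream_time_ms > w + WINDOW + GRACE:
            continue                      # window closed: record dropped
        out[(key, w)] = out.get((key, w), 0.0) + amount
    return out

events = [("alice", 10.0, 1_000), ("alice", 5.0, 2_000),
          ("alice", 7.0, 1_000)]          # last one arrives "late"
# Stream time still inside window + grace -> all three aggregate:
assert aggregate(events, stream_time_ms=3_600_500) == {("alice", 0): 22.0}
# Stream time past window end + grace -> the whole window is closed:
assert aggregate(events, stream_time_ms=4_000_000) == {}
```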
Data Integration
KAFKA CONNECT
Kafka Connect is a distributed, fault-tolerant integration framework. Source connectors pull data in. Sink connectors push it out. Single Message Transforms (SMTs) reshape records inline — no code required for 80% of use cases.

Debezium — CDC from Any Database

CDC — The Most Important Source Connector
Debezium reads the database's binary replication log (WAL in Postgres, binlog in MySQL). Every INSERT, UPDATE, and DELETE becomes a Kafka event with before/after state. Minimal impact on the source database, sub-second latency, and full history — no polling, no triggers, no app changes.
Debezium PostgreSQL connector config (json)
{
  "name": "postgres-orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name":       "pgoutput",         // no plugin needed in PG14+
    "database.hostname": "prod-db.internal",
    "database.port":     "5432",
    "database.user":     "debezium_ro",
    "database.password": "${file:/secrets/db.properties:password}",
    "database.dbname":   "ecommerce",
    "database.server.name": "ecommerce-prod",  // topic prefix

    // Topic: ecommerce-prod.public.orders
    "table.include.list": "public.orders,public.customers,public.inventory",
    "column.exclude.list": "public.customers.ssn,public.customers.ccnum",

    // Snapshot mode: initial | never | when_needed
    "snapshot.mode":      "initial",     // full table scan on first start

    // Outbox pattern — for transactional outbox
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
  }
}

S3 Sink — The Lakehouse Landing Zone

S3 Sink connector — Parquet output (json)
{
  "name": "s3-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":        "8",               // parallelism
    "topics":           "orders,inventory",

    "s3.region":        "us-east-1",
    "s3.bucket.name":   "my-data-lake",
    "s3.part.size":     "67108864",       // 64MB part uploads

    // Hive-compatible partitioning: year=2024/month=01/day=15/
    "storage.class":    "io.confluent.connect.s3.storage.S3Storage",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format":      "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale":           "en-US",
    "timezone":         "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field":  "event_ts",

    // Format: Parquet with schema evolution
    "format.class":     "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec":    "zstd",
    "schema.compatibility": "FULL_TRANSITIVE",

    // Flush every 10min OR every 50k records, whichever comes first
    "rotate.interval.ms":    "600000",
    "flush.size":            "50000"
  }
}

Single Message Transforms (SMTs)

SMT chains — no code required (json)
"transforms": "route,flatten,mask,addfield,timestamp",

// 1. Promote a value field into the record key (drives partitioning)
"transforms.route.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.route.fields": "region",

// 2. Flatten nested struct → flat key=value
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_",

// 3. Mask PII fields with a fixed value
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "ssn,credit_card,email",
"transforms.mask.replacement": "[REDACTED]",

// 4. Add a static metadata field
"transforms.addfield.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addfield.static.field": "_source",
"transforms.addfield.static.value": "postgres-prod",

// 5. Convert timestamp format
"transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp.field": "created_at",
"transforms.timestamp.target.type": "unix"
Schema Management
SCHEMA REGISTRY
Without schema management, Kafka topics become a shared mutable blob format — producers break consumers silently. The Schema Registry enforces data contracts, enables safe schema evolution, and shrinks message size by storing schemas out-of-band.

Avro — The Production Choice

Avro schema + Python producer (python)
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Schema defined once in the registry
ORDER_SCHEMA_STR = """
{
  "type": "record",
  "name": "Order",
  "namespace": "com.myco.ecommerce",
  "fields": [
    {"name": "order_id",     "type": "string"},
    {"name": "customer_id",  "type": "string"},
    {"name": "amount_cents", "type": "long"},
    {"name": "status",       "type": {"type": "enum",
      "name": "OrderStatus",
      "symbols": ["PENDING","PROCESSING","COMPLETED","CANCELLED"]}},
    {"name": "created_at",   "type": {"type": "long",
      "logicalType": "timestamp-millis"}},
    {"name": "metadata",     "type": ["null", {
      "type": "record", "name": "Meta",
      "fields": [
        {"name": "ip",  "type": "string"},
        {"name": "ua",  "type": ["null","string"], "default": null}
      ]
    }], "default": null}
  ]
}
"""

sr_client = SchemaRegistryClient({'url': 'https://sr.internal:8081'})
avro_serializer = AvroSerializer(sr_client, ORDER_SCHEMA_STR,
    conf={'auto.register.schemas': True}
)

# Wire format: [0x00][4-byte schema ID][avro bytes]
# Schema ID is looked up in registry by consumer.
# Payload is 5–50× smaller than JSON for the same data.

p = Producer({'bootstrap.servers': 'broker:9092'})
p.produce(
    topic='orders',
    key=order['customer_id'].encode(),
    value=avro_serializer(order, SerializationContext('orders', MessageField.VALUE))
)

Compatibility Modes — Schema Evolution Rules

BACKWARD
New schema can read old data. Add fields only if they have defaults; remove any field. Upgrade consumers first. Default production choice.
SAFE
FORWARD
Old schema can read new data. Add any field; remove only fields that have defaults. Upgrade producers first; safe for rolling producer deploys.
OK
FULL
Both backward AND forward. Add/remove fields only with defaults. Most restrictive, safest for long-lived topics.
STRICT
NONE
No compatibility checks. Any change allowed. Do not use in production. Only acceptable for dev/test topics.
DANGER
*_TRANSITIVE
Like above but checks against ALL previous versions, not just the last. Use when consumers might be multiple versions behind.
SAFEST
What's allowed per mode — Avro (avro)
// ALLOWED with BACKWARD compatibility:
// ✅ Add field with default:  {"name":"region","type":"string","default":"US"}
// ✅ Remove any field (the new reader skips it when decoding old data)
// ❌ Add field WITHOUT a default (new reader can't fill it from old records)
// ❌ Change field type (int → string; promotions like int → long are fine)
// ❌ Rename field without an alias (it's a delete + add)

// SAFE RENAME PATTERN: use aliases
{
  "name": "customer_identifier",   // new name
  "aliases": ["customer_id"],      // old name — Avro maps it
  "type": "string"
}

// Protobuf advantages over Avro for evolution:
// - Field numbers never change (even if names do)
// - Adding any field is always backward compatible
// - Required → optional is always safe
// - Better for gRPC-native stacks
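The "add only with defaults" rule comes from how a reader resolves old records: missing fields are filled from defaults at decode time. A toy model of that resolution (not Avro's actual decoder):

```python
# Toy model of Avro-style read resolution for a BACKWARD-compatible reader.
NEW_SCHEMA = [                    # (field, default); None = no default
    ("order_id", None),
    ("amount_cents", None),
    ("region", "US"),             # added later WITH a default -> backward-safe
]

def read(record: dict, schema):
    out = {}
    for field, default in schema:
        if field in record:
            out[field] = record[field]
        elif default is not None:
            out[field] = default  # old record lacks the field -> use default
        else:
            raise ValueError(f"field {field!r} missing and has no default")
    return out

old_record = {"order_id": "o-1", "amount_cents": 999}   # written pre-"region"
assert read(old_record, NEW_SCHEMA)["region"] == "US"
```

Had "region" been added without a default, `read` would raise on every old record — which is exactly what the registry's BACKWARD check prevents at registration time.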
Delivery Semantics
EXACTLY-ONCE SEMANTICS
Exactly-once is one of the hardest problems in distributed systems. Kafka solves it end-to-end via three interlocking mechanisms: idempotent producers, transactions, and transactional consumers. Most teams don't need it — but when you do, here's exactly how it works.

The Three Layers of EOS

Layer 1: Idempotent Producer
PID + sequence number → broker deduplicates retries per partition
↓ enable.idempotence=true
Layer 2: Producer Transactions
transactional.id → atomic multi-partition, multi-topic writes
↓ isolation.level=read_committed
Layer 3: Transactional Consumer
reads only committed offsets; abort markers invisible
Idempotent producer mechanics (python)
"""
Idempotent Producer Internals:
  1. Broker assigns Producer ID (PID) on first connect
  2. Producer tags each message: (PID, partition, sequence_num)
  3. Broker tracks latest sequence per (PID, partition)
  4. Duplicate: seq ≤ last seen → broker ACKs but does NOT write
  5. Out-of-order: gap in sequence → broker rejects (OutOfOrderSequenceException)

This prevents duplicate writes on retry — the #1 cause of
"at-least-once" overproduction on network errors.
"""

p = Producer({
    'enable.idempotence':      True,
    'acks':                    'all',     # required for idempotence
    'retries':                 2147483647, # max — idempotent = safe to retry
    'max.in.flight.requests.per.connection': 5,  # up to 5 with idempotence
})
# Idempotence scope: per partition.
# Does NOT prevent duplicates across partitions without transactions.
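The broker-side mechanics in the docstring can be modeled directly (a toy `PartitionLog`, not broker code): dedup is a per-(PID, partition) sequence check.

```python
# Toy model of broker-side idempotent-producer dedup by (PID, seq).
class PartitionLog:
    def __init__(self):
        self.log = []
        self.last_seq = {}               # pid -> last sequence written

    def append(self, pid, seq, msg):
        last = self.last_seq.get(pid, -1)
        if seq <= last:
            return "DUPLICATE_ACKED"     # retry of an already-written batch
        if seq != last + 1:
            return "OUT_OF_ORDER"        # gap -> OutOfOrderSequenceException
        self.log.append(msg)
        self.last_seq[pid] = seq
        return "WRITTEN"

p = PartitionLog()
assert p.append(pid=7, seq=0, msg="a") == "WRITTEN"
assert p.append(pid=7, seq=0, msg="a") == "DUPLICATE_ACKED"  # retried send
assert p.append(pid=7, seq=2, msg="c") == "OUT_OF_ORDER"     # seq 1 missing
assert p.log == ["a"]                    # no duplicate made it into the log
```

Note the duplicate is still ACKed — the producer's retry succeeds from its point of view, but no second copy lands in the log.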

EOS Read-Process-Write Pattern

Full EOS pipeline (python)
"""
EOS Read-Process-Write:
  Consumer reads from topic A
  → processes record
  → writes result to topic B
  → commits input offset
  All four in one atomic transaction.
"""

consumer = Consumer({
    'group.id':              'eos-processor',
    'enable.auto.commit':    False,
    'isolation.level':       'read_committed',  # CRITICAL
    # read_committed: consumer skips aborted transactions
    # and messages beyond the last stable offset (LSO).
    # WITHOUT this: consumer reads partial/aborted transactions.
})

producer = Producer({
    'transactional.id':      'eos-proc-instance-1',
    'enable.idempotence':    True,
})
producer.init_transactions()
consumer.subscribe(['raw-orders'])

while True:
    msgs = consumer.consume(100, timeout=1.0)
    if not msgs: continue

    producer.begin_transaction()
    try:
        for msg in msgs:
            result = transform(msg)
            producer.produce('processed-orders', value=result)

        # Atomically commit offsets WITH the output write
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata()
        )
        producer.commit_transaction()

    except Exception:
        producer.abort_transaction()
        # Rewind to the last committed offsets: confluent-kafka has no
        # seek_to_committed(), so seek each assigned partition explicitly
        for tp in consumer.committed(consumer.assignment()):
            if tp.offset >= 0:
                consumer.seek(tp)

"""
Why this is exactly-once:
  - Output write + offset commit = one atomic transaction
  - If process crashes mid-transaction: abort, re-seek, reprocess
  - If broker fails after commit: idempotent producer handles retry
  - Consumer with read_committed: never sees the aborted writes
"""
Do You Actually Need EOS?
EOS adds ~20-30% throughput overhead. For most pipelines: idempotent consumer logic (upserts, deduplication by event_id) + at-least-once delivery = "good enough" EOS at a fraction of the cost. True EOS is essential for: financial transactions, inventory deductions, anything where double-processing causes irreversible harm.
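The "good enough" alternative this note describes — at-least-once delivery plus dedup by event_id — is a few lines of consumer logic. A sketch (in a real system `seen_ids` would be a persistent store, e.g. a DB table with a unique key):

```python
# Idempotent consumer: at-least-once delivery + dedup by event_id.
def process_batch(events, seen_ids, apply):
    for e in events:
        if e["event_id"] in seen_ids:
            continue                     # duplicate redelivery -> skip
        apply(e)                         # side effect runs exactly once
        seen_ids.add(e["event_id"])

balance = {"total": 0}
seen = set()
batch = [{"event_id": "e1", "amount": 100},
         {"event_id": "e2", "amount": 50}]

apply = lambda e: balance.__setitem__("total", balance["total"] + e["amount"])
process_batch(batch, seen, apply)
process_batch(batch, seen, apply)   # redelivery after a crash or rebalance
assert balance["total"] == 150      # effectively exactly-once
```

The remaining gap vs true EOS: `apply` and the `seen_ids` write must themselves be atomic (one DB transaction), or a crash between them reintroduces a duplicate.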
Operations & Tuning
OPS — PRODUCTION SIGNAL
Kafka operations is where junior engineers become senior engineers. Partitioning decisions, retention math, broker JVM tuning, and the three metrics that tell you everything about cluster health.

Partition Sizing — The Critical Decision

You Cannot Decrease Partition Count
Kafka only allows increasing partitions, never decreasing. Adding partitions changes the key-to-partition mapping for new writes, so per-key ordering breaks across the boundary and keyed consumers may start seeing different data. Design partition count at topic creation with 2× headroom for future scale.
Partition count formula (python)
"""
Partition count rules of thumb:

1. Throughput-based:
   partitions = max(write_MB_s / per_partition_write, read_MB_s / per_partition_read)
   Per partition: ~10MB/s write, ~50MB/s read (modern NVMe broker)

2. Consumer parallelism:
   Max parallelism = partition count
   10 partitions → max 10 active consumers in a group

3. Leadership load:
   Each partition = 1 leader election on broker failure
   Each partition leader = ~1-5ms coordination overhead
   Recommendation: < 4000 partitions per broker (KRaft: much higher)

4. Replication factor (RF):
   RF=1 → no fault tolerance (dev only)
   RF=2 → data survives 1 broker failure, but acks=all stalls if min.insync=2
   RF=3 → standard production — data survives 2 failures, stays writable through 1

EXAMPLE: 100MB/s peak throughput, 4 consumers, RF=3
   By throughput: 100/10 = 10 partitions minimum
   By parallelism: 4 consumers → 4 partitions minimum
   Add 2x headroom: 20 partitions
   Final: 24 (kept a multiple of the consumer count, with extra headroom)
"""

# Create topic with admin client
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'broker:9092'})
admin.create_topics([
    NewTopic(
        topic              = 'orders',
        num_partitions     = 24,
        replication_factor = 3,
        config             = {
            'retention.ms':       '604800000',  # 7 days
            'retention.bytes':    '53687091200', # 50GB per partition
            'compression.type':   'zstd',
            'min.insync.replicas':'2',
            'cleanup.policy':     'delete',      # or 'compact' for KTable
            'message.timestamp.type': 'CreateTime',
        }
    )
])
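The sizing rules above, wrapped in a function. Assumptions: 10 MB/s per partition for writes, 2x headroom, and rounding up to a multiple of the consumer count; `partition_count` is a hypothetical helper, and the worked example yields 20 by this arithmetic (the doc then rounds further up to 24 for extra future headroom).

```python
# Partition sizing rules as a function (toy calculator, not a Kafka API).
import math

def partition_count(peak_mb_s, per_partition_mb_s=10,
                    consumers=1, headroom=2.0):
    by_throughput = math.ceil(peak_mb_s / per_partition_mb_s)
    base = max(by_throughput, consumers)           # parallelism floor
    with_headroom = math.ceil(base * headroom)
    # round UP to a multiple of the consumer count for even assignment:
    return math.ceil(with_headroom / consumers) * consumers

# Doc example: 100 MB/s peak, 4 consumers -> 10 by throughput, x2 headroom
assert partition_count(100, consumers=4) == 20
# Parallelism-bound case: 50 MB/s, 3 consumers -> 5 -> x2 -> 10 -> 12
assert partition_count(50, consumers=3) == 12
```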

Broker Tuning — The JVM & OS Config

server.properties — production broker (properties)
## NETWORK & THREADS
num.network.threads=8              # socket threads: 2x CPU cores
num.io.threads=16                  # disk I/O threads: 4x CPU cores
num.replica.fetchers=4             # replication fetch parallelism
socket.send.buffer.bytes=1048576   # 1MB socket buffer
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600 # 100MB max request

## LOG STORAGE
log.dirs=/data/kafka-logs,/data2/kafka-logs  # JBOD across SSDs — Kafka spreads partitions across dirs
log.segment.bytes=1073741824       # 1GB segments
log.retention.hours=168            # 7 days
log.flush.interval.messages=10000  # fsync every 10K messages
log.flush.interval.ms=1000        # or every 1s — whichever first

## REPLICATION
default.replication.factor=3
min.insync.replicas=2
replica.lag.time.max.ms=30000     # remove from ISR after 30s lag

## JVM (kafka-server-start.sh or KAFKA_HEAP_OPTS):
## KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
## -XX:+UseG1GC -XX:MaxGCPauseMillis=20
## -XX:InitiatingHeapOccupancyPercent=35
## -XX:+ExplicitGCInvokesConcurrent

## OS: sudo sysctl -w vm.swappiness=1
##     sudo sysctl -w net.core.rmem_max=134217728
##     sudo sysctl -w vm.max_map_count=262144
##     Set /proc/sys/fs/file-max to 1000000

The Three Metrics That Tell Everything

Primary signals:
  Consumer lag — rising lag = consumers falling behind producers
  Under-replicated partitions — target: 0
  Active controller count — must be exactly 1
Supporting signals: disk throughput (e.g. 450 MB/s), ISR shrinks/sec (target: 0)
Consumer lag alert (python)
# Alert when any partition lag > threshold
from confluent_kafka import Consumer, TopicPartition

def check_consumer_lag(group_id, topic, threshold=10000):
    # Use the monitored group's id — committed() reads THAT group's offsets
    c = Consumer({'bootstrap.servers': 'broker:9092', 'group.id': group_id})
    tps = [TopicPartition(topic, p) for p in
           c.list_topics(topic).topics[topic].partitions]
    committed = c.committed(tps, timeout=5)
    alerts = []
    for tp in committed:
        _, hw = c.get_watermark_offsets(tp, timeout=5)
        lag = hw - max(tp.offset, 0)
        if lag > threshold:
            alerts.append({'partition': tp.partition, 'lag': lag})
    c.close()
    return alerts