v1.19 / v2.0
Runtime Architecture
THE STREAMING ENGINE
Flink is a distributed stateful computation engine. Where Kafka moves data between systems, Flink computes on that data in motion — with stateful operators, event-time semantics, and fault-tolerant exactly-once processing baked all the way down to the execution model.
JM+TM: Job + Task Mgr
Slots: Unit of parallelism
Async: Checkpointing
RocksDB: State Backend
ms: End-to-end Latency
Unified: Batch + Stream API

Cluster Topology

Runtime components
CLIENT — submits JobGraph via REST / CLI / SDK
↓ JobGraph
JOB MANAGER — Dispatcher + ResourceManager + JobMaster
▸ Schedules tasks ▸ Coordinates checkpoints ▸ Recovery on failure
↓ Deploy subtasks
TASKMANAGER 1 Slot 1 | Slot 2 | Slot 3 JVM process → threads
TASKMANAGER 2 Slot 1 | Slot 2 | Slot 3 Shares JVM heap + off-heap
TASKMANAGER N Slot 1 | Slot 2 | Slot 3 RocksDB for keyed state
↓ Persist snapshots
CHECKPOINT STORAGE (HDFS / S3 / GCS): durable location for state snapshots
Slot: The Unit of Parallelism. Each TaskManager is a JVM process with N slots. A slot is a fixed share of the TaskManager's managed memory; CPU is not isolated between slots. An operator with parallelism 4 runs as 4 subtasks, each of which needs a slot. Slot sharing (the default) lets subtasks of different operators in the same job share a slot, so one slot can hold an entire pipeline and a job needs only as many slots as its highest operator parallelism.
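
The slot arithmetic above can be sketched in a few lines. This is a simplified model, assuming a single slot sharing group and ignoring operator chaining; the operator names and parallelism values are hypothetical.

```python
def slots_required(operator_parallelism: dict, slot_sharing: bool = True) -> int:
    """Number of task slots a job needs under a simplified model.

    With slot sharing, one slot can host one subtask of every operator,
    so the job needs as many slots as its highest parallelism.
    Without it, every subtask needs its own slot.
    """
    if slot_sharing:
        return max(operator_parallelism.values())
    return sum(operator_parallelism.values())


# Hypothetical job: operator name -> parallelism
job = {"source": 3, "map": 3, "window": 4, "sink": 2}

print(slots_required(job))                       # 4
print(slots_required(job, slot_sharing=False))   # 12
```

With three slots per TaskManager, as in the diagram, the shared case fits on two TaskManagers, while the unshared case needs four.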

Execution Model: Dataflow Graph

A Flink program compiles to a Logical Dataflow Graph (operators + streams), then to a Physical Execution Graph (parallel subtasks). Network buffers connect subtasks — data flows through them in a push model, not a pull-loop like Kafka consumers.

Logical → Physical execution (concept)
/* LOGICAL GRAPH (your code):
   Source ──→ Map ──→ KeyBy ──→ Window ──→ Sink

   PHYSICAL GRAPH (parallelism=3, what actually runs):
   Source[0] ──→ Map[0] ──→\
   Source[1] ──→ Map[1] ──→ KeyBy/shuffle ──→ Window[0] ──→ Sink[0]
   Source[2] ──→ Map[2] ──→/               ──→ Window[1] ──→ Sink[1]
                                            ──→ Window[2] ──→ Sink[2]

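   NETWORK SHUFFLE: KeyBy routes each record to exactly one
   downstream subtask. The key is hashed into a key group
   (hash(key) % maxParallelism) and each subtask owns a range of key groups.
   Any redistribution (keyBy, rebalance, broadcast) crosses
   thread/network boundaries; forward connections do not.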

   FORWARD CHANNELS: Source→Map (same parallelism) use direct
   buffer handoff — zero serialization, zero network copy.
*/

// Configure execution mode:
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(4);          // global default
env.disableOperatorChaining();   // force separate tasks (debugging)
env.setBufferTimeout(100);      // flush network buffers every 100ms
                                 // lower = latency, higher = throughput

// Per-operator parallelism override:
stream
  .map(fn).setParallelism(8)   // this operator uses 8 subtasks
  .keyBy(key)                   // shuffle happens here
  .window(...)
  .sum("amount").setParallelism(4);
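
To make the keyBy routing concrete, here is a minimal sketch of how a key can be mapped to a subtask through key groups. It assumes a max parallelism of 128 and uses `zlib.crc32` purely as a deterministic stand-in for the hash Flink applies internally; the function names are illustrative, not Flink API.

```python
import zlib


def key_group(key: str, max_parallelism: int = 128) -> int:
    """Map a key to a key group. crc32 is a stand-in hash (assumption)."""
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def subtask_for_key_group(group: int, parallelism: int, max_parallelism: int = 128) -> int:
    """Each subtask owns a contiguous range of key groups."""
    return group * parallelism // max_parallelism


def subtask_for_key(key: str, parallelism: int, max_parallelism: int = 128) -> int:
    return subtask_for_key_group(key_group(key, max_parallelism), parallelism, max_parallelism)


for customer in ["c-1", "c-2", "c-3"]:
    print(customer, "-> subtask", subtask_for_key(customer, parallelism=3))
```

Because the key group depends only on the key and the max parallelism, rescaling a job moves whole key groups between subtasks rather than rehashing every key.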

Deployment Modes

Mode | JobManager | Use Case
Application | Runs inside cluster, 1 JM per job | Production, strong isolation
Per-Job | Created on submit, destroyed on exit | YARN only; deprecated since 1.15 in favor of Application mode
Session | Long-lived, shared by many jobs | Dev, low-latency deploys
Local | Embedded in main() process | Testing, IDE debugging
Core Processing API
DATASTREAM API
The DataStream API is Flink's low-level streaming primitive. It gives you full control over state, timers, watermarks, and side outputs. Table API and SQL programs are translated into the same runtime stream operators, and PyFlink exposes these APIs to Python.
Full streaming job: Java DSL (java)
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);  // required: the EXACTLY_ONCE sink commits on checkpoints

// ── SOURCE ──────────────────────────────────────
KafkaSource<Order> source = KafkaSource.<Order>builder()
    .setBootstrapServers("broker:9092")
    .setTopics("orders")
    .setGroupId("flink-order-processor")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new OrderDeserializer())
    .build();

DataStream<Order> orders = env.fromSource(
    source,
    WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((e, ts) -> e.getEventTs()),
    "Kafka Orders Source"
);

// ── TRANSFORMATIONS ─────────────────────────────
DataStream<RevenueMetric> revenue = orders
    .filter(o -> o.getStatus() == Status.COMPLETED)   // parallel filter
    .map(o -> new EnrichedOrder(o, enrich(o)))            // stateless map
    .keyBy(EnrichedOrder::getCustomerId)                   // hash shuffle
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))  // 1-hour window (Time is deprecated in 1.19)
    .aggregate(
        new RevenueAggregator(),   // incremental: O(1) state per window
        new WindowResultMapper()    // called once when window fires
    );

// ── SINK ─────────────────────────────────────────
KafkaSink<RevenueMetric> sink = KafkaSink.<RevenueMetric>builder()
    .setBootstrapServers("broker:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("hourly-revenue")
        .setValueSerializationSchema(new RevenueSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 2PC
    .setTransactionalIdPrefix("revenue-sink")
    .build();

revenue.sinkTo(sink);
env.execute("Hourly Revenue Aggregator");
KafkaSource
assigns event timestamps + watermarks
filter
stateless, executes in same task chain
map (enrich)
operator chaining — zero serialization
↓ hash shuffle (network)
keyBy(customerId)
same key always reaches same subtask
TumblingEventTime(1h)
window buffers until watermark passes window end
↓ fires on watermark
aggregate (incremental)
AggregateFunction accumulates state per-window
KafkaSink (EOS)
2-phase commit with checkpoint barrier
Operator Chaining: The Performance Secret. Flink fuses consecutive operators that are connected by forward channels and share the same parallelism (filter→map→flatMap) into a single task thread. No serialization, no network, no buffer: just method calls in a tight loop. This is often the single largest source of latency reduction vs naive Kafka Streams topologies.
PyFlink DataStream API (python)
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import (
    KafkaSource, KafkaOffsetsInitializer, KafkaRecordSerializationSchema, KafkaSink
)
from pyflink.common import WatermarkStrategy, Duration, Types
from pyflink.common.serialization import SimpleStringSchema

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(60_000)  # checkpoint every 60s

# Add Kafka connector JARs (needed for PyFlink)
env.add_jars("file:///opt/flink/lib/flink-connector-kafka.jar")

# ── SOURCE ──────────────────────────────────────
source = (
    KafkaSource.builder()
    .set_bootstrap_servers("broker:9092")
    .set_topics("orders")
    .set_group_id("flink-py-processor")
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

orders = env.from_source(
    source,
    WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_idleness(Duration.of_minutes(1)),
    "Kafka Source"
)

# ── TRANSFORMATIONS ─────────────────────────────
import json

def parse_order(raw_json: str) -> tuple:
    d = json.loads(raw_json)
    return (d['customer_id'], float(d['amount']))  # (customer_id, amount)

def is_valid(record) -> bool:
    return record[1] > 0

parsed = (
    orders
    .map(parse_order, output_type=Types.TUPLE([Types.STRING(), Types.FLOAT()]))
    .filter(is_valid)
    .key_by(lambda x: x[0])  # key by customer_id
)

# ── STATEFUL REDUCE (sum per customer, unbounded) ──
result = parsed.reduce(lambda a, b: (a[0], a[1] + b[1]))

result.print()
env.execute("PyFlink Order Processor")
PyFlink Architecture: JVM Bridge. PyFlink is not pure Python. It is a Python wrapper around the Java Flink runtime. In the default process mode, Python operators use Apache Beam's portability framework: your Python functions run in a separate worker process connected to the JVM via gRPC. Every record that crosses this boundary is serialized, so Python UDFs carry noticeable per-record overhead compared with Java operators; the exact cost depends on record size and batching. For heavy Python workloads, prefer Table API / Flink SQL, which keep computation in the JVM.
PyFlink Table API, recommended for Python (python)
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit

settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

# Define source with DDL (connector resolved at runtime)
t_env.execute_sql("""
  CREATE TABLE orders (
    order_id     STRING,
    customer_id  STRING,
    amount       DOUBLE,
    event_ts     TIMESTAMP(3),
    WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
  ) WITH (
    'connector' = 'kafka',
    'topic'     = 'orders',
    'properties.bootstrap.servers' = 'broker:9092',
    'format'    = 'json'
  )
""")

# Table API: compiled to Java operators, zero Python overhead
orders_table = t_env.from_path("orders")
result = (
    orders_table
    .filter(col("amount") > 0)
    .group_by(col("customer_id"))
    .select(
        col("customer_id"),
        col("amount").sum.alias("total_spend")
    )
)
result.execute().print()
ProcessFunction: full low-level control (java)
/**
 * ProcessFunction: the most powerful Flink primitive.
 * Gives you:
 *   - Access to keyed state (per-key values, lists, maps)
 *   - Event-time and processing-time timers
 *   - Side outputs (route to different streams)
 *   - Full context: timestamp, key, current watermark
 */
public class SessionDetector
        extends KeyedProcessFunction<String, Event, Session> {

    // Declared state — Flink manages lifecycle across checkpoints
    private ValueState<Long>       sessionStart;
    private ValueState<Long>       lastEventTs;
    private ValueState<Integer>    eventCount;
    private ValueState<Long>       timerTs;     // pending timer

    private static final long GAP = 30 * 60_000; // 30min inactivity

    @Override
    public void open(OpenContext ctx) {
        sessionStart = getRuntimeContext().getState(
            new ValueStateDescriptor<>("session-start", Long.class));
        lastEventTs  = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-event",   Long.class));
        eventCount   = getRuntimeContext().getState(
            new ValueStateDescriptor<>("event-count",  Integer.class));
        timerTs      = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer-ts",     Long.class));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Session> out) {
        Long start = sessionStart.value();

        if (start == null) {
            sessionStart.update(e.ts);   // start of new session
            eventCount.update(0);
        }
        eventCount.update(eventCount.value() + 1);
        lastEventTs.update(e.ts);

        // Cancel old timer, register new one (GAP after this event's timestamp)
        Long oldTimer = timerTs.value();
        if (oldTimer != null)
            ctx.timerService().deleteEventTimeTimer(oldTimer);

        long newTimer = e.ts + GAP;
        ctx.timerService().registerEventTimeTimer(newTimer);
        timerTs.update(newTimer);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<Session> out) {
        // Timer fires → 30min inactivity → emit session
        out.collect(new Session(
            ctx.getCurrentKey(),
            sessionStart.value(), lastEventTs.value(),
            eventCount.value()
        ));
        // Clear state — this key starts fresh
        sessionStart.clear(); lastEventTs.clear();
        eventCount.clear();   timerTs.clear();
    }
}
Event-Time Timers: The Superpower. Timers fire based on the watermark, not the system clock. This means: if you replay 6 months of historical data, timers fire at the correct business times, not 6 months later. Your session detection logic works identically on live data and historical replay. Kafka Streams offers stream-time punctuation (PunctuationType.STREAM_TIME), but it has no watermarks and no per-key timers, so equivalent logic takes more manual work there.
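
The following sketch simulates the timer behaviour described above without any Flink dependency. It assumes a zero out-of-orderness bound (the watermark simply follows the highest timestamp seen) and timestamps in seconds; `detect_sessions` and its data layout are illustrative, not Flink API.

```python
GAP = 30 * 60  # inactivity gap, in seconds of event time


def detect_sessions(events):
    """events: (key, event_ts) pairs in arrival order.

    Returns emitted sessions as (key, start_ts, last_ts, count).
    Timers fire when the watermark reaches them, never on wall-clock time.
    """
    state = {}                     # key -> session fields plus pending timer
    sessions = []
    watermark = float("-inf")

    def fire_due_timers():
        due = sorted((s["timer"], k) for k, s in state.items() if s["timer"] <= watermark)
        for _, k in due:
            s = state.pop(k)       # clear state: this key starts fresh
            sessions.append((k, s["start"], s["last"], s["count"]))

    for key, ts in events:
        s = state.setdefault(key, {"start": ts, "last": ts, "count": 0, "timer": None})
        s["count"] += 1
        s["last"] = max(s["last"], ts)
        s["timer"] = s["last"] + GAP        # replaces the pending timer
        watermark = max(watermark, ts)      # the watermark follows the element
        fire_due_timers()

    watermark = float("inf")                # end of input flushes remaining timers
    fire_due_timers()
    return sessions


replayed = [("a", 0), ("a", 600), ("b", 3000), ("a", 5000)]
print(detect_sessions(replayed))
# [('a', 0, 600, 2), ('b', 3000, 3000, 1), ('a', 5000, 5000, 1)]
```

The result depends only on the timestamps carried by the events, which is why a replay months later produces the same sessions.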

RichFunction — Async External Lookups

AsyncFunction: non-blocking enrichment (java)
// Never use blocking DB calls in processElement()!
// They stall the task thread and every operator chained to it.
// Use AsyncFunction for external enrichment.
public class CustomerEnricher
        extends RichAsyncFunction<Order, EnrichedOrder> {

    private transient AsyncHttpClient client;

    @Override
    public void open(OpenContext ctx) {
        client = Dsl.asyncHttpClient();   // non-blocking HTTP
    }

    @Override
    public void close() throws Exception {
        if (client != null) client.close();   // release connections
    }

    @Override
    public void asyncInvoke(Order order, ResultFuture<EnrichedOrder> rf) {
        // Placeholder path from the example; a real client needs an absolute URL.
        client.prepareGet("/customers/" + order.customerId)
            .execute(new AsyncCompletionHandler<Response>() {
                @Override
                public Response onCompleted(Response resp) {
                    rf.complete(Collections.singletonList(
                        new EnrichedOrder(order, parseCustomer(resp))
                    ));
                    return resp;
                }

                @Override
                public void onThrowable(Throwable t) {
                    // Fail the record now instead of waiting for the timeout
                    rf.completeExceptionally(t);
                }
            });
    }
}

// Wire it into the pipeline:
DataStream<EnrichedOrder> enriched =
    AsyncDataStream.unorderedWait(
        orders,
        new CustomerEnricher(),
        1000, TimeUnit.MILLISECONDS, // timeout
        100                            // max concurrent requests
    );
Side outputs: multi-stream routing (java)
/**
 * Side outputs split a stream into multiple tagged sub-streams
 * based on record content — without forking or filtering.
 * Zero overhead: routing decision made inline, no copy.
 */
OutputTag<Order> largeOrders   = new OutputTag<>("large-orders"){};
OutputTag<Order> lateEvents     = new OutputTag<>("late-events"){};
OutputTag<String> parseErrors   = new OutputTag<>("parse-errors"){};

SingleOutputStreamOperator<Order> mainStream = rawInput
    .process(new ProcessFunction<String, Order>() {
        public void processElement(String raw, Context ctx,
                Collector<Order> out) {
            try {
                Order o = parse(raw);

                if (o.getAmount() > 10_000)
                    ctx.output(largeOrders, o);   // → fraud review

                Long ts = ctx.timestamp();   // null if no timestamp was assigned
                if (ts != null && ts <= ctx.timerService().currentWatermark())
                    ctx.output(lateEvents, o);    // → late data audit

                out.collect(o);  // always goes to main stream too
            } catch (ParseException e) {
                ctx.output(parseErrors, raw);     // → DLQ topic
            }
        }
    });

// Each is a fully independent DataStream:
DataStream<Order>  fraudReview  = mainStream.getSideOutput(largeOrders);
DataStream<Order>  lateAudit    = mainStream.getSideOutput(lateEvents);
DataStream<String> deadLetters  = mainStream.getSideOutput(parseErrors);

// Route to different Kafka topics:
fraudReview.sinkTo(fraudTopic);
lateAudit.sinkTo(auditTopic);
deadLetters.sinkTo(dlqTopic);
Side Outputs Replace Dead-Letter Queues Instead of try/catch-then-produce-to-DLQ in your consumer (which breaks EOS), use Flink side outputs. Parse errors, late records, fraud candidates, and audit events all route to separate physical streams while maintaining exactly-once guarantees end-to-end. The DLQ is no longer an afterthought.
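
As a language-neutral illustration of the routing idea, the sketch below splits raw records into a main output and tagged side outputs in a single pass. It is plain Python rather than Flink code; the tag names mirror the example above and the 10,000 threshold is taken from it, while the record layout is an assumption.

```python
import json
from collections import defaultdict


def route(raw_records, large_threshold=10_000):
    """Return (main_output, side_outputs) for a batch of raw JSON strings."""
    main, side = [], defaultdict(list)
    for raw in raw_records:
        try:
            order = json.loads(raw)
            amount = float(order["amount"])
        except (ValueError, KeyError, TypeError):
            side["parse-errors"].append(raw)      # dead-letter output keeps the raw input
            continue
        if amount > large_threshold:
            side["large-orders"].append(order)    # also stays in the main output
        main.append(order)
    return main, dict(side)


records = ['{"id": 1, "amount": 50}', "not json", '{"id": 2, "amount": 20000}']
main, side = route(records)
print(len(main), sorted(side))   # 2 ['large-orders', 'parse-errors']
```

The routing decision happens where the record is parsed, so a malformed record never needs a separate consumer or a second pass.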

Connected Streams — Joining Two Types

connect(): heterogeneous stream merge (java)
// connect() joins TWO DIFFERENT TYPES in one operator.
// Unlike union() (same type) or join() (keyed window),
// connect() lets each stream handle its own logic with shared state.

DataStream<Order>  orders  = ...;
DataStream<Config> configs = ...;  // dynamic config changes

// Descriptor shared by the broadcast stream and the process function
MapStateDescriptor<String, Config> configStateDescriptor =
    new MapStateDescriptor<>("config", String.class, Config.class);

BroadcastStream<Config> bcConfig =
    configs.broadcast(configStateDescriptor);

orders.connect(bcConfig)
    .process(new BroadcastProcessFunction<Order, Config, Result>() {

        @Override
        public void processElement(Order order, ReadOnlyContext ctx,
                Collector<Result> out) throws Exception {
            Config cfg = ctx.getBroadcastState(configStateDescriptor)
                            .get("current");
            if (cfg == null) return;   // no config received yet (or apply a default)
            // Apply latest config to every order, no restart needed
            out.collect(applyConfig(order, cfg));
        }

        @Override
        public void processBroadcastElement(Config cfg, Context ctx,
                Collector<Result> out) throws Exception {
            ctx.getBroadcastState(configStateDescriptor).put("current", cfg);
        }
    });
Time & Ordering
WATERMARKS — EVENT TIME
Watermarks are Flink's most powerful and most misunderstood concept. They are the mechanism by which a distributed streaming engine reasons about time when data can arrive late, out-of-order, and from multiple parallel partitions simultaneously.

The Three Time Domains

Domain | Clock Source | Replays Correctly? | Use For
Event Time | Timestamp in the record itself | Yes ✓ | All business-critical windows
Processing Time | System wall clock on TaskManager | No ✗ | Approximate monitoring only
Ingestion Time | Timestamp assigned at source | Partial | Backpressure detection
The Core Problem Watermarks Solve: In a distributed system, event E with timestamp T=10:00 might arrive AFTER event E2 with timestamp T=10:05 due to network delays, retries, or partition leadership changes. If Flink fired a 10:00–10:01 window the moment it saw a later timestamp, it would miss E arriving late. A watermark W is the source's assertion that no event with timestamp ≤ W is expected any more; it is a heuristic, so events that still arrive behind it are treated as late. Windows fire when the watermark passes the window end, not when the wall clock does.
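
A small simulation can show how a bounded out-of-orderness watermark decides when a window fires. It is a simplified model in whole seconds: the watermark is the highest timestamp seen minus the bound, and a window fires once the watermark reaches its end. The function name and the sample events are illustrative.

```python
def run_windows(events, window_size=60, max_out_of_orderness=5):
    """events: (event_ts, value) pairs in arrival order.

    Returns (fired, late): fired windows as (start, end, sum) and the
    events that arrived after their window had already fired.
    """
    open_windows = {}            # window_start -> running sum
    fired, late = [], []
    max_ts = None
    watermark = float("-inf")
    for ts, value in events:
        start = ts - ts % window_size
        if start + window_size <= watermark:
            late.append((ts, value))             # window already fired
        else:
            open_windows[start] = open_windows.get(start, 0) + value
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - max_out_of_orderness
        ready = sorted(w for w in open_windows if w + window_size <= watermark)
        for w in ready:
            fired.append((w, w + window_size, open_windows.pop(w)))
    return fired, late


# ts=59 arrives out of order but within the 5s bound; ts=57 arrives too late.
arrivals = [(58, 1), (61, 1), (59, 1), (66, 1), (57, 1)]
fired, late = run_windows(arrivals)
print(fired)   # [(0, 60, 2)]
print(late)    # [(57, 1)]
```

The out-of-order event at 59 is still counted because the watermark had only reached 56 when it arrived; the event at 57 arrives after the watermark passed 60 and is classified as late.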

Watermark Strategies

WatermarkStrategy: all patterns (java)
// 1. Bounded out-of-orderness (most common production choice)
WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((e, ts) -> e.getEventTs())
// Watermark = max(seen_event_ts) - 5s - 1ms
// "I'll wait 5 seconds for stragglers, then fire windows"

// 2. Monotonically increasing (data is perfectly ordered)
WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((e, ts) -> e.getEventTs())
// Watermark = max(seen_ts) - 1ms. Zero delay. Use only if certain.

// 3. Idleness handling for sparse partitions (per-partition watermarks, Kafka-native)
WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
    .withTimestampAssigner(...)
    .withIdleness(Duration.ofMinutes(5))
// withIdleness: if a Kafka partition has no events for 5min,
// don't let it block the global watermark from advancing.
// Critical for low-volume topics with sparse partitions.

// 4. Custom WatermarkGenerator (full control)
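public class OrderedEventTimeGenerator
        implements WatermarkGenerator<Event> {
    private static final long MAX_DELAY_MS = 5000;
    // Start high enough that maxTs - MAX_DELAY_MS cannot underflow
    private long maxTs = Long.MIN_VALUE + MAX_DELAY_MS;

    @Override
    public void onEvent(Event e, long ts, WatermarkOutput out) {
        maxTs = Math.max(maxTs, ts);   // ts comes from the TimestampAssigner
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput out) {
        out.emitWatermark(new Watermark(maxTs - MAX_DELAY_MS));
    }
}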

Watermark Propagation in Parallel

Partition 0 events: t=1 (08:00:01), t=3 (08:00:03), t=7 (08:00:07), t=9 (08:00:09) → WM=4 emitted
Partition 1 events: t=2 (08:00:02), t=1 (late!), t=5 (08:00:05) → WM=0 emitted
Global watermark (min of all partitions): min(4,0)=0, blocked by P1
← Partition 1's stragglers hold back the window!
Solution: if Partition 1 has stopped producing, .withIdleness(Duration.ofMinutes(1)) marks it idle and removes it from the global min calculation
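
The numbers in this example can be reproduced with a short sketch, assuming the 5-second out-of-orderness bound used earlier and ignoring the extra 1 ms Flink subtracts. The helper names are illustrative.

```python
def partition_watermark(timestamps, max_out_of_orderness=5):
    """Watermark of one partition: highest timestamp seen minus the bound."""
    return max(timestamps) - max_out_of_orderness


def global_watermark(partition_watermarks, idle=()):
    """Minimum over all partitions that are not marked idle.

    Returns None when every partition is idle, meaning the watermark
    does not advance.
    """
    active = [wm for p, wm in partition_watermarks.items() if p not in idle]
    return min(active) if active else None


wms = {
    "P0": partition_watermark([1, 3, 7, 9]),   # 4
    "P1": partition_watermark([2, 1, 5]),      # 0
}
print(global_watermark(wms))                   # 0, held back by P1
print(global_watermark(wms, idle={"P1"}))      # 4, P1 excluded once idle
```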

Allowed Lateness + Late Data

Late events: three disposal strategies (java)
OutputTag<Order> lateTag = new OutputTag<>("late-orders"){};

SingleOutputStreamOperator<Revenue> result = orders
    .keyBy(o -> o.customerId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))

    // Strategy 1: Allow late events to re-fire the window
    // Window fires at watermark, then AGAIN for each late event
    // for up to 'allowedLateness' past the window close
    .allowedLateness(Duration.ofMinutes(5))

    // Strategy 2: Route very-late events to a side output
    // (events arriving after allowedLateness period)
    .sideOutputLateData(lateTag)

    .aggregate(new RevenueAgg());

// Strategy 3: Process side output separately (correction stream)
DataStream<Order> lateOrders = result.getSideOutput(lateTag);
lateOrders.sinkTo(lateOrdersAuditTopic);

/*
 TIMELINE (times are watermark positions, not wall-clock time):
  wm=10:00:00  watermark passes the end of window [09:00, 10:00)
               → window FIRES (main output)
  wm=10:02:00  late event arrives (ts=09:58) → within allowedLateness=5min
               → window fires AGAIN with updated result
  wm=10:05:01  another late event (ts=09:45) → past allowedLateness
               routed to lateTag side output
*/
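
The three outcomes can be expressed as a small decision function. This is a simplified sketch in whole seconds that compares the current watermark with the window end and the allowed lateness; it ignores the 1 ms offsets Flink uses internally, and the function names are illustrative.

```python
def hms(hours, minutes, seconds=0):
    """Seconds since midnight, for readable test values."""
    return hours * 3600 + minutes * 60 + seconds


def classify(event_ts, watermark, window_size, allowed_lateness):
    """Decide what happens to an event given the current watermark."""
    window_end = event_ts - event_ts % window_size + window_size
    if watermark < window_end:
        return "on-time"          # window has not fired yet
    if watermark < window_end + allowed_lateness:
        return "late-refire"      # window state still kept, fires again
    return "side-output"          # window state dropped, goes to the late tag


HOUR, FIVE_MIN = 3600, 300
print(classify(hms(9, 30), hms(9, 59), HOUR, FIVE_MIN))      # on-time
print(classify(hms(9, 58), hms(10, 2), HOUR, FIVE_MIN))      # late-refire
print(classify(hms(9, 45), hms(10, 5, 1), HOUR, FIVE_MIN))   # side-output
```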
Fault Tolerance
STATE & CHECKPOINTS
Flink's state is first-class: it is not a cache, not a side-car, not a database call. It lives co-located with the operator, is checkpointed asynchronously using a variant of the Chandy-Lamport distributed snapshot algorithm, and is restored automatically on failure. This is what lets Flink recover state without an external database; JobManager high availability still relies on a coordination service such as ZooKeeper or Kubernetes.

State Types

Keyed State primitives (java)
// ALL keyed state is scoped to: (operator, key, namespace)
// Flink manages serialization, checkpointing, and TTL.
// Obtain handles in open() via getRuntimeContext(); value() and get()
// return null until the current key has written something.

// 1. ValueState: single value per key
ValueState<Long> count = getRuntimeContext().getState(
    new ValueStateDescriptor<>("event-count", Long.class));
Long current = count.value();
count.update(current == null ? 1L : current + 1);   // null-safe increment
Long v = count.value();
count.clear();

// 2. ListState: list per key
ListState<Event> buffer = getRuntimeContext().getListState(
    new ListStateDescriptor<>("event-buffer", Event.class));
buffer.add(event);
Iterable<Event> all = buffer.get();
buffer.update(newList);   // replace all

// 3. MapState: key-value map per Flink-key
MapState<String, Double> totals = getRuntimeContext().getMapState(
    new MapStateDescriptor<>("category-totals",
        String.class, Double.class));
Double prev = totals.get("electronics");            // null on first access
totals.put("electronics", (prev == null ? 0.0 : prev) + amount);

// 4. ReducingState: auto-reduces on add()
ReducingState<Long> sum = getRuntimeContext().getReducingState(
    new ReducingStateDescriptor<>("sum", Long::sum, Long.class));
sum.add(42L);   // calls (current, 42L) → Long::sum
sum.get();       // current aggregate

// 5. AggregatingState: complex incremental aggregation
AggregatingState<Order, Stats> statsState = getRuntimeContext().getAggregatingState(
    new AggregatingStateDescriptor<>("stats",
        new StatsAggregator(), Stats.class));

// 6. State TTL: automatic expiry (prevents unbounded growth)
StateTtlConfig ttl = StateTtlConfig
    .newBuilder(Duration.ofDays(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000)  // expired entries dropped during RocksDB compaction
    .build();
valueStateDescriptor.enableTimeToLive(ttl);

State Backends

Backend | State Location | Checkpoint | Best For
HashMapStateBackend | JVM Heap (objects) | Async snapshot to FS | Small state, fast access
EmbeddedRocksDB | Off-heap (SSD/NVMe) | Incremental to S3/HDFS | Large state, TB-scale
RocksDB: The Production State Backend. RocksDB stores state outside the JVM heap, in native memory and on local disk. This eliminates GC pressure (no Java GC pauses on 100GB of state), enables incremental checkpoints (only new or changed SSTable files are uploaded, not the whole state), and allows state larger than available RAM. The tradeoff: state access is roughly an order of magnitude slower than heap, because every read and write (de)serializes the value and may touch disk. Use it when state may outgrow the heap; prefer HashMapStateBackend for small state that needs the fastest access.
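
The idea behind incremental checkpoints can be sketched as a set difference over immutable SSTable files. This is a conceptual model only; the file names are hypothetical and the real backend also tracks reference counts for shared files.

```python
def incremental_upload(current_files, already_uploaded):
    """Split the files of the current snapshot into new and reusable ones.

    SSTable files are immutable, so a file uploaded by an earlier
    checkpoint can be referenced again instead of being re-uploaded.
    """
    to_upload = sorted(current_files - already_uploaded)
    reused = sorted(current_files & already_uploaded)
    return to_upload, reused


uploaded = set()

# Checkpoint 1: everything is new.
cp1 = {"000001.sst", "000002.sst"}
new, reused = incremental_upload(cp1, uploaded)
uploaded |= set(new)
print(new, reused)   # ['000001.sst', '000002.sst'] []

# Checkpoint 2: one new file appeared, one old file was compacted away.
cp2 = {"000002.sst", "000003.sst"}
new2, reused2 = incremental_upload(cp2, uploaded)
print(new2, reused2)  # ['000003.sst'] ['000002.sst']
```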

Chandy-Lamport Checkpointing

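Flink's checkpoint algorithm injects checkpoint barriers (special markers) into the data stream alongside regular events. When an operator has received the barrier from all of its input channels, it snapshots its state and propagates the barrier downstream. With the default aligned checkpoints, a channel that has already delivered its barrier is held back until the others catch up; the rest of the job keeps processing, and unaligned checkpoints remove even this wait.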

CHECKPOINT SEQUENCE — operator chain
Source
BARRIER #42
injected into stream
↓ barrier reaches Map operator
Map
snapshot Map state async
BARRIER #42
forwarded
↓ all upstream barriers aligned at Window
Window
snapshot window buffers async
→ S3/HDFS
KEY INSIGHT: Data keeps flowing while state is being snapshotted. The copy-on-write snapshot is taken asynchronously. Checkpoint completion = all operators have confirmed their snapshot upload.
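
Barrier alignment at an operator with several input channels can be simulated in a few lines. The sketch assumes aligned checkpoints, one barrier per channel, and round-robin reads; a channel that has delivered its barrier is held back until every channel has. The function and record names are illustrative.

```python
from collections import deque

BARRIER = "BARRIER"


def process_with_alignment(channels):
    """Return the order in which an operator handles records and its snapshot."""
    queues = [deque(c) for c in channels]
    blocked = set()                  # channels whose barrier has arrived
    log = []
    while any(queues):
        progressed = False
        for i, q in enumerate(queues):
            if not q or i in blocked:
                continue             # empty, or held back during alignment
            item = q.popleft()
            progressed = True
            if item == BARRIER:
                blocked.add(i)
                if len(blocked) == len(queues):
                    log.append("SNAPSHOT")   # all barriers aligned
                    blocked.clear()          # release the held channels
            else:
                log.append(item)
        if not progressed:
            break                    # a barrier is missing; nothing left to read
    return log


inputs = [["a1", BARRIER, "a2", "a3"], ["b1", "b2", BARRIER, "b3"]]
print(process_with_alignment(inputs))
# ['a1', 'b1', 'b2', 'SNAPSHOT', 'a2', 'b3', 'a3']
```

Every record that preceded a barrier is handled before the snapshot and every record that followed one is handled after it, which is what makes the snapshot consistent.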
Checkpoint + savepoint config (java)
CheckpointConfig cfg = env.getCheckpointConfig();
cfg.setCheckpointInterval(60_000);       // every 60s
cfg.setCheckpointTimeout(120_000);       // fail if > 2min
cfg.setMinPauseBetweenCheckpoints(30_000); // cooldown
cfg.setMaxConcurrentCheckpoints(1);       // 1 in-flight at a time
cfg.setExternalizedCheckpointCleanup(      // keep on cancel
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Incremental checkpoints (RocksDB only — huge for large state):
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// true = incremental. Only changed SSTable files uploaded.
// Illustrative only: a full checkpoint of 100GB can take tens of minutes,
// an incremental one far less. Actual times depend on change rate and bandwidth.

// Savepoint: user-triggered, used for upgrades
// $ flink savepoint <jobId> s3://bucket/savepoints/
// $ flink run -s s3://bucket/savepoints/sp-123 job.jar
// Savepoints survive code changes (with compatible state schema)
Declarative Streaming
FLINK SQL — STREAMING ANSI SQL
Flink SQL is a fully featured streaming SQL engine. It handles windowed aggregations, temporal table joins, changelog streams (CDC), and upsert semantics — all in standard SQL syntax. Internally it compiles to the same DataStream operators you'd write by hand.
CREATE TABLE: source and sink DDL (sql)
-- Kafka source with Avro schema registry
CREATE TABLE orders (
  order_id      STRING,
  customer_id   STRING,
  amount        DECIMAL(18, 4),
  status        STRING,
  event_ts      TIMESTAMP(3),
  -- Watermark: allow 5s of out-of-order delivery
  WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (
  'connector'                     = 'kafka',
  'topic'                         = 'orders',
  'properties.bootstrap.servers'  = 'broker:9092',
  'properties.group.id'           = 'flink-sql-group',
  'scan.startup.mode'             = 'earliest-offset',
  'format'                        = 'avro-confluent',
  'avro-confluent.url'            = 'http://schema-registry:8081'
);

-- Parquet source (S3 data lake)
CREATE TABLE customer_profiles (
  customer_id   STRING,
  tier          STRING,
  country       STRING,
  updated_at    TIMESTAMP(3)
) WITH (
  'connector'   = 'filesystem',
  'path'        = 's3://lake/customer-profiles/',
  'format'      = 'parquet',
  'source.monitor-interval' = '60s'  -- poll for new files
);

-- JDBC lookup (for enrichment with temporal join)
CREATE TABLE exchange_rates (
  currency      STRING,
  rate          DOUBLE,
  valid_from    TIMESTAMP(3),
  PRIMARY KEY   (currency) NOT ENFORCED
) WITH (
  'connector'           = 'jdbc',
  'url'                 = 'jdbc:postgresql://db:5432/rates',
  'table-name'          = 'exchange_rates',
  'lookup.cache.max-rows' = '10000',   -- in-memory cache
  'lookup.cache.ttl'    = '60s'
);

-- Iceberg sink (write to data lake with ACID)
CREATE TABLE hourly_revenue_iceberg (
  window_start  TIMESTAMP(3),
  customer_id   STRING,
  total_revenue DECIMAL(18,4),
  PRIMARY KEY   (window_start, customer_id) NOT ENFORCED
) WITH (
  'connector'   = 'iceberg',
  'catalog-name'= 'glue',
  'warehouse'   = 's3://lake/iceberg/',
  'write.format.default' = 'parquet'
);
Dynamic Table: The Core Abstraction. Flink SQL treats every streaming source as a Dynamic Table that grows continuously. A SELECT on a streaming source is a continuous query: it keeps running, updating its result as new rows arrive. An INSERT INTO a Kafka sink continuously appends results. An INSERT INTO a table with a primary key (for example an Iceberg table in upsert mode) continuously merges changes; Flink SQL has no separate UPSERT statement. The SQL is standard; the execution is infinite.
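
A continuous query over a dynamic table can be pictured as an incremental aggregation that emits a changelog. The sketch below models `SELECT key, SUM(amount) ... GROUP BY key` in plain Python, using the +I / -U / +U row kinds; the function name and input rows are illustrative.

```python
def continuous_sum(rows):
    """rows: (key, amount) pairs arriving over time.

    Returns the changelog a continuous GROUP BY would emit:
    +I for a new key, -U/+U to retract and replace an existing total.
    """
    totals = {}
    changelog = []
    for key, amount in rows:
        if key in totals:
            changelog.append(("-U", key, totals[key]))   # retract old total
            totals[key] += amount
            changelog.append(("+U", key, totals[key]))   # emit new total
        else:
            totals[key] = amount
            changelog.append(("+I", key, amount))
    return changelog


stream = [("c1", 10), ("c2", 5), ("c1", 7)]
for change in continuous_sum(stream):
    print(change)
# ('+I', 'c1', 10)
# ('+I', 'c2', 5)
# ('-U', 'c1', 10)
# ('+U', 'c1', 17)
```

An upsert sink only needs the +I and +U rows keyed by the primary key, while a retract-style consumer uses the -U rows to undo earlier results.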
Continuous query patterns (sql)
-- Simple continuous filter — runs forever
INSERT INTO high_value_orders
SELECT * FROM orders
WHERE  amount > 1000
  AND  status = 'COMPLETED';

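-- Lookup join against the JDBC table exchange_rates.
-- Assumes orders also declares `currency STRING` and a processing-time
-- attribute `proc_time AS PROCTIME()`; lookup joins require one.
SELECT
  o.order_id,
  o.amount,
  r.rate,
  o.amount * r.rate AS amount_converted
FROM orders o
JOIN exchange_rates FOR SYSTEM_TIME AS OF o.proc_time AS r
  ON  o.currency = r.currency;
-- FOR SYSTEM_TIME AS OF proc_time: lookup join.
-- Each order queries the external table (through the lookup cache)
-- at the moment it is processed. For "value valid AT the event's
-- timestamp" semantics, use an event-time temporal join on a versioned table.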

-- Group by with distinct aggregation (state grows with the number of distinct values)
SELECT customer_id, count(DISTINCT session_id) AS sessions
FROM   orders
GROUP BY customer_id;
Windowed aggregations in SQL (sql)
-- TUMBLE: fixed non-overlapping windows
SELECT
  window_start, window_end,
  customer_id,
  SUM(amount)           AS hourly_revenue,
  COUNT(*)              AS order_count,
  COUNT(DISTINCT session_id) AS sessions
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(event_ts), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, customer_id;

-- HOP (sliding): 10-minute windows advancing every 5 minutes
SELECT
  window_start, window_end, customer_id, SUM(amount)
FROM TABLE(
  HOP(TABLE orders, DESCRIPTOR(event_ts),
      INTERVAL '5' MINUTE,   -- hop (slide) interval
      INTERVAL '10' MINUTE)  -- window size
)
GROUP BY window_start, window_end, customer_id;

-- SESSION: close window after 30min inactivity per key
SELECT
  window_start, window_end,
  customer_id,
  COUNT(*) AS events_in_session
FROM TABLE(
  SESSION(TABLE orders PARTITION BY customer_id,
          DESCRIPTOR(event_ts),
          INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, customer_id;

-- CUMULATE: growing windows that reset at fixed boundaries
-- Perfect for: real-time "revenue so far today" metrics
SELECT
  window_start,  -- start of day (reset boundary)
  window_end,    -- current accumulation point
  customer_id,
  SUM(amount) AS revenue_so_far_today
FROM TABLE(
  CUMULATE(TABLE orders, DESCRIPTOR(event_ts),
            INTERVAL '1' HOUR,   -- step: emit every hour
            INTERVAL '1' DAY)    -- max size: reset daily
)
GROUP BY window_start, window_end, customer_id;
CUMULATE Window: Flink's Secret SQL Feature. CUMULATE is a Flink SQL extension with no direct ANSI SQL equivalent. It emits cumulative results at regular step intervals, resetting at a larger boundary. A common pattern: emit hourly revenue totals that accumulate through the day (00:00–01:00, 00:00–02:00, ... 00:00–24:00), then reset at midnight. This is hard to express cleanly in ksqlDB or standard SQL without application logic.
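
To see which windows a single row contributes to, the assignment can be sketched as follows. Timestamps, step, and maximum size are plain integers in the same unit (minutes in the example); the function name is illustrative.

```python
def cumulate_windows(ts, step, max_size):
    """All (window_start, window_end) pairs a row with timestamp ts falls into.

    Every window starts at the reset boundary; ends advance by `step`
    from the first end after ts up to the maximum window size.
    """
    start = ts - ts % max_size
    first_end = start + (ts - start) // step * step + step
    return [(start, end) for end in range(first_end, start + max_size + 1, step)]


# A row at 02:30 with hourly steps and a daily reset (units: minutes)
windows = cumulate_windows(ts=150, step=60, max_size=1440)
print(windows[0])     # (0, 180)   -> 00:00 to 03:00
print(windows[-1])    # (0, 1440)  -> 00:00 to 24:00
print(len(windows))   # 22
```

A row therefore counts toward every cumulative window from the current step until the daily reset, which is what makes the "so far today" totals grow monotonically.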

Window TVF — The New Model vs Deprecated GroupWindow

API | Status | Syntax
Window TVF | Current (1.13+) | TABLE(TUMBLE(TABLE t, ...))
GroupWindow | Deprecated | GROUP BY TUMBLE(ts, INTERVAL...)

Window TVF enables Window TopN, Window Deduplication, and Window Join — capabilities impossible with the old GroupWindow API.

Temporal joins: the most powerful SQL feature (sql)
/*
 TEMPORAL TABLE JOIN (versioned): for each order, find the
 exchange rate that was valid AT the order's event time.
 This requires the rate table to be versioned (changelog source with primary key and watermark).
*/
CREATE TABLE exchange_rates_versioned (
  currency    STRING,
  rate        DOUBLE,
  valid_from  TIMESTAMP(3),
  PRIMARY KEY (currency) NOT ENFORCED,
  WATERMARK FOR valid_from AS valid_from - INTERVAL '1' SECOND
) WITH (...);  -- backed by a changelog source, e.g. upsert-kafka or Kafka with debezium-json

-- Temporal join: look up rate valid AT order time
-- Flink maintains a versioned state of the rate table.
SELECT
  o.order_id,
  o.currency,
  o.amount,
  o.amount * r.rate AS amount_usd,
  r.valid_from      AS rate_valid_from
FROM orders o
JOIN exchange_rates_versioned FOR SYSTEM_TIME AS OF o.event_ts r
  ON o.currency = r.currency;

/*
 INTERVAL JOIN: join two streams where the timestamps
 must be within a time distance of each other.
 Use for: matching orders with their payments, correlating
 click events with subsequent purchases.
*/
SELECT
  o.order_id,
  p.payment_id,
  p.amount    AS paid_amount,
  o.amount    AS order_amount,
  TIMESTAMPDIFF(SECOND, o.event_ts, p.event_ts) AS payment_delay_seconds
FROM  orders o, payments p
WHERE o.order_id = p.order_id
  AND  p.event_ts BETWEEN
         o.event_ts AND o.event_ts + INTERVAL '24' HOUR;
-- Flink buffers orders for 24h, waiting for matching payments.
-- Records older than 24h are cleared from state automatically.
Interval Join vs Stream-Table Join Interval join: both sides are unbounded streams, matched within a time interval. Flink buffers both streams and clears old state when the interval expires. Temporal table join: one side is a versioned changelog, accessed as "what was its value at this timestamp?" — no buffering on the changelog side, just versioned state. Use interval join for correlating two event streams. Use temporal join for enrichment with a slowly-changing reference dataset.
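
The "value valid at this timestamp" lookup behind a temporal table join can be sketched with a sorted version list per key. This is a conceptual model, not Flink's implementation; the class name, the integer timestamps, and the sample rates are hypothetical.

```python
from bisect import bisect_right


class VersionedTable:
    """Keeps every version of a row, ordered by its valid_from timestamp."""

    def __init__(self):
        self._versions = {}   # key -> (sorted valid_from list, values list)

    def upsert(self, key, valid_from, value):
        times, values = self._versions.setdefault(key, ([], []))
        i = bisect_right(times, valid_from)
        times.insert(i, valid_from)
        values.insert(i, value)

    def as_of(self, key, ts):
        """Latest version with valid_from <= ts, or None if none exists yet."""
        times, values = self._versions.get(key, ([], []))
        i = bisect_right(times, ts)
        return values[i - 1] if i else None


def temporal_join(orders, rates):
    """Inner join: orders without a valid rate at their event time are dropped."""
    for order_id, currency, amount, event_ts in orders:
        rate = rates.as_of(currency, event_ts)
        if rate is not None:
            yield (order_id, amount * rate)


rates = VersionedTable()
rates.upsert("EUR", valid_from=100, value=1.25)
rates.upsert("EUR", valid_from=200, value=1.5)

orders = [("o1", "EUR", 10.0, 150), ("o2", "EUR", 10.0, 250), ("o3", "EUR", 10.0, 50)]
print(list(temporal_join(orders, rates)))   # [('o1', 12.5), ('o2', 15.0)]
```

Order o1 sees the rate that was valid at its event time even though a newer rate exists, which is the difference from a plain lookup of the latest value.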
Deduplication + Top-N in SQL (sql)
-- DEDUPLICATION: keep only the first event per key
-- (solves at-least-once source duplicates)
SELECT order_id, customer_id, amount, event_ts
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY     event_ts ASC
    ) AS rn
  FROM orders
)
WHERE rn = 1;

-- WINDOW TOP-N: top 3 products by revenue per hour
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end
      ORDER BY     total_revenue DESC
    ) AS rownum   -- RANK is a reserved keyword, so use a different alias
  FROM hourly_product_revenue   -- windowed result from TUMBLE
)
WHERE rownum <= 3;
CDC streams + changelog mode (sql)
/*
 Flink SQL understands CHANGELOG SEMANTICS.
 The debezium-json format decodes a Debezium CDC stream into four
 row kinds, which Flink applies to downstream aggregations:
   - +I INSERT:        new row enters the aggregation
   - -U UPDATE_BEFORE: retract old value from aggregation
   - +U UPDATE_AFTER:  add new value to aggregation
   - -D DELETE:        retract row from aggregation

 This means you can GROUP BY and aggregate a CDC stream and the
 aggregate UPDATES as rows change in the source database.
 Kafka ksqlDB cannot do this.
*/

-- CDC source (Debezium Kafka output → Flink CDC table)
CREATE TABLE orders_cdc (
  order_id     STRING,
  customer_id  STRING,
  amount       DECIMAL(18,4),
  status       STRING,
  PRIMARY KEY  (order_id) NOT ENFORCED
) WITH (
  'connector'    = 'kafka',
  'topic'        = 'ecommerce-prod.public.orders',  -- Debezium topic
  'format'       = 'debezium-json',               -- understands CDC envelope
  'properties.bootstrap.servers' = 'broker:9092'
);

-- Materialise a live sum — updates as orders change status:
SELECT
  status,
  COUNT(*)    AS order_count,
  SUM(amount) AS total_amount
FROM    orders_cdc
GROUP BY status;
-- When an order changes from PENDING → COMPLETED:
--   -U: subtract from PENDING count/sum
--   +U: add to COMPLETED count/sum
-- Result is a continuously updating table.

-- Write CDC aggregation to UPSERT Kafka topic:
CREATE TABLE order_status_summary_kafka (
  status       STRING,
  order_count  BIGINT,
  total_amount DECIMAL(18,4),
  PRIMARY KEY (status) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',   -- key-based compaction
  'topic'     = 'order-status-summary',
  'properties.bootstrap.servers' = 'broker:9092',
  'key.format'  = 'json',
  'value.format'= 'json'
);
INSERT INTO order_status_summary_kafka
SELECT status, COUNT(*), SUM(amount) FROM orders_cdc GROUP BY status;
Flink SQL + CDC = Live Database Replica in Kafka
Connect Debezium → Kafka → Flink SQL. Flink materialises the CDC stream into a continuously updated, queryable state. Write the result to an Iceberg table or upsert-kafka topic. You now have a low-latency (seconds), exactly-once replica of your production database, with arbitrary SQL transformations applied, without ever touching Spark, Hadoop, or a dedicated OLAP database.
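The retract-and-accumulate behaviour behind the GROUP BY status query can be sketched in plain Java. This is an illustrative model only: `ChangelogAggregator` and its `Kind` enum are hypothetical names that mirror the four changelog row kinds, not Flink classes.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** Toy model of COUNT(*) / SUM(amount) GROUP BY status over a changelog stream. */
class ChangelogAggregator {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, BigDecimal> sums = new HashMap<>();

    void apply(Kind kind, String status, BigDecimal amount) {
        // INSERT and UPDATE_AFTER accumulate; UPDATE_BEFORE and DELETE retract.
        boolean accumulate = kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
        long delta = accumulate ? 1L : -1L;
        BigDecimal signed = accumulate ? amount : amount.negate();

        long newCount = counts.merge(status, delta, Long::sum);
        sums.merge(status, signed, BigDecimal::add);

        // A group with no remaining rows disappears from the result.
        if (newCount == 0) {
            counts.remove(status);
            sums.remove(status);
        }
    }

    long count(String status) {
        return counts.getOrDefault(status, 0L);
    }

    BigDecimal sum(String status) {
        return sums.getOrDefault(status, BigDecimal.ZERO);
    }
}
```

A status change from PENDING to COMPLETED arrives as an UPDATE_BEFORE on the old group followed by an UPDATE_AFTER on the new one, so both groups stay consistent without rescanning the source table.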

Flink CDC Connectors (without Kafka)

Direct CDC: no Kafka required (sql)
-- flink-cdc-connectors: direct DB → Flink, no Kafka broker
CREATE TABLE mysql_orders (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DECIMAL(18,4),
  status      STRING,
  updated_at  TIMESTAMP,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'  = 'mysql-cdc',
  'hostname'   = 'prod-mysql',
  'port'       = '3306',
  'username'   = 'flink_ro',
  'password'   = '...',
  'database-name' = 'ecommerce',
  'table-name' = 'orders'
);
-- Flink reads MySQL binlog directly.
-- Also available: postgres-cdc, oracle-cdc, mongodb-cdc,
--                 sqlserver-cdc, oceanbase-cdc, tidb-cdc
Windowing Deep Dive
WINDOW TYPES
Windows are the mechanism by which Flink aggregates an unbounded stream into finite, computable buckets. Choosing the wrong window type is the most common source of incorrect streaming analytics. Here is every type with its exact semantics.
All window types: DataStream API (java)
// Window sizes use java.time.Duration; the older Time helper is
// deprecated in 1.19 and removed in 2.0.

// 1. TUMBLING EVENT TIME: fixed, non-overlapping
stream.keyBy(k -> k.id)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .aggregate(new RevenueAgg());
// [00:00,01:00), [01:00,02:00) ... each fires once

// 2. SLIDING: overlapping windows
stream.keyBy(k -> k.id)
    .window(SlidingEventTimeWindows.of(
        Duration.ofMinutes(10),   // window size
        Duration.ofMinutes(5)))   // slide interval
    .aggregate(new RevenueAgg());
// Each event belongs to size/slide = 2 windows.
// [00:00,00:10), [00:05,00:15), [00:10,00:20) ...

// 3. SESSION: activity gap based, per key
stream.keyBy(k -> k.userId)
    .window(EventTimeSessionWindows.withGap(Duration.ofMinutes(30)))
    .aggregate(new SessionAgg());
// Window extends as long as events keep arriving within 30min.
// No events for 30min → window closes → session emitted.

// 4. GLOBAL WINDOW: one window per key, forever
stream.keyBy(k -> k.id)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(100))     // fire every 100 events (constructor is private)
    .aggregate(new RevenueAgg());
// Global window never closes without a custom trigger.
// Use for: arbitrary triggers (ML model inference every N records)

// 5. CUSTOM TRIGGER: fire on condition
stream.keyBy(k -> k.customerId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .trigger(PurgingTrigger.of(CountTrigger.of(50)))
    // Fire AND PURGE each time 50 events accumulate within the window.
    // .trigger() replaces the default EventTimeTrigger, so the window
    // does NOT fire at window end and a trailing partial batch is never
    // emitted. Early results plus a final firing need a custom trigger.
    .aggregate(new RevenueAgg());
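The window arithmetic in the comments above can be checked with a small plain-Java sketch. It is a simplified model rather than Flink's assigner code: `WindowAssignment` is a hypothetical helper, timestamps are non-negative epoch milliseconds, window offsets are zero, and an event landing exactly on a session's end is treated as part of that session.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy window assignment for tumbling, sliding, and session windows. */
class WindowAssignment {

    /** Start of the tumbling window [start, start + size) that contains ts. */
    static long tumblingStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Starts of every sliding window that contains ts, newest first. */
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);
        // Walk back one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Merges sorted event timestamps into session windows {start, end}. */
    static List<long[]> sessions(long[] sortedTs, long gap) {
        List<long[]> out = new ArrayList<>();
        for (long ts : sortedTs) {
            long[] last = out.isEmpty() ? null : out.get(out.size() - 1);
            if (last != null && ts <= last[1]) {
                last[1] = Math.max(last[1], ts + gap);   // extend the open session
            } else {
                out.add(new long[] {ts, ts + gap});      // gap exceeded: new session
            }
        }
        return out;
    }
}
```

With a 10 minute size and 5 minute slide, `slidingStarts` returns two window starts for every timestamp, which matches the size/slide = 2 rule in the listing.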

AggregateFunction vs ProcessWindowFunction

Incremental vs full-window: performance tradeoff (java)
/**
 * AggregateFunction: INCREMENTAL — O(1) state per window.
 * Accumulator updated per event. Efficient but no access
 * to window boundaries or all events at once.
 */
public class RevenueAgg
        implements AggregateFunction<Order, Double, Revenue> {

    public Double createAccumulator() { return 0.0; }
    public Double add(Order o, Double acc) { return acc + o.amount; }
    public Double merge(Double a, Double b) { return a + b; }
    public Revenue getResult(Double acc) {
        return new Revenue(acc);
    }
}

/**
 * ProcessWindowFunction: FULL — buffers ALL events in window.
 * Expensive (O(n) state) but gives access to window bounds,
 * all events, and iteration. Use for: percentiles, medians,
 * complex multi-pass calculations.
 */
public class P99Calculator
        extends ProcessWindowFunction<Event, Metrics, String, TimeWindow> {

    public void process(String key,
                         Context ctx,
                         Iterable<Event> events,   // ALL events
                         Collector<Metrics> out) {

        List<Long> latencies = StreamSupport
            .stream(events.spliterator(), false)
            .map(Event::getLatency)
            .sorted()
            .collect(Collectors.toList());

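        // Nearest-rank percentile: rank = ceil(0.99 * n), converted to a 0-based index.
        int rank = (latencies.size() * 99 + 99) / 100;
        long p99 = latencies.get(rank - 1);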
        out.collect(new Metrics(key,
            ctx.window().getStart(), ctx.window().getEnd(), p99));
    }
}

// BEST PRACTICE: combine both — incremental + window context:
stream.keyBy(...)
    .window(...)
    .aggregate(
        new RevenueAgg(),    // incremental: O(1) state
        new EnrichWithWindowBounds()  // called once on fire
    );
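The state-size difference between the two styles shows up even without Flink. The sketch below is a plain-Java model under stated assumptions: `IncrementalSum` and `BufferedP99` are hypothetical stand-ins for an accumulator and a full-window buffer, and the percentile uses the nearest-rank definition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Incremental style: one running value, O(1) state regardless of event count. */
class IncrementalSum {
    private double acc = 0.0;

    void add(double amount) {
        acc += amount;
    }

    double result() {
        return acc;
    }
}

/** Full-window style: every event is kept until the window fires, O(n) state. */
class BufferedP99 {
    private final List<Long> latencies = new ArrayList<>();

    void add(long latency) {
        latencies.add(latency);
    }

    long result() {
        if (latencies.isEmpty()) {
            throw new IllegalStateException("no events in window");
        }
        List<Long> sorted = new ArrayList<>(latencies);
        Collections.sort(sorted);
        // Nearest rank: ceil(0.99 * n) as a 1-based rank, using integer arithmetic.
        int rank = (sorted.size() * 99 + 99) / 100;
        return sorted.get(rank - 1);
    }
}
```

A sum can be folded one event at a time, so it fits the incremental style. An exact percentile needs the sorted set of values, which is why it pays the buffering cost.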
Fault Tolerance
EXACTLY-ONCE IN FLINK
Flink's exactly-once is architecturally different from Kafka's. Kafka uses the transactional API and sequence numbers. Flink uses a two-phase commit protocol layered on top of its checkpointing system — making EOS available to any sink, not just Kafka.

Two-Phase Commit via Checkpoints

How Flink's EOS Differs From Kafka's
Kafka EOS uses producer IDs (PID) plus sequence numbers and transactions purely within the Kafka protocol. Flink's EOS is a generic protocol: any sink that implements TwoPhaseCommitSinkFunction (or the committer interfaces of the newer Sink V2 API) gets exactly-once, whether it's Kafka, a JDBC database, an Iceberg table, or a custom system. The checkpoint acts as the global coordinator.
2PC sequence: Kafka sink (java)
/*
 TWO-PHASE COMMIT SEQUENCE:
 ─────────────────────────────────────────────────

 PHASE 1: PRE-COMMIT (on checkpoint barrier)
   1. Source operators snapshot their Kafka offsets
   2. Processing operators snapshot their keyed state
   3. Kafka sink flushes the transaction it opened after the
      previous checkpoint; every record written since then is in it
      (transactional.id = "flink-sink-{subtaskId}-{checkpointId}")
   4. Sink opens a new transaction for records after the barrier
   5. Sink snapshots the pre-committed transactional.id into checkpoint state
   6. All operators acknowledge their snapshots to the JobManager

 PHASE 2: COMMIT (after all operators confirm)
   1. JobManager receives ALL acknowledgements → checkpoint complete
   2. JobManager notifies the operators; the Kafka sink commits the
      pre-committed transaction (commitTransaction())
   3. Records NOW visible to read_committed consumers

 FAILURE SCENARIOS:
   ● Crash before the checkpoint completes: transaction aborted, no duplicates
     → restart from last complete checkpoint, reprocess from offsets
   ● Crash after the checkpoint completes but before the commit finishes:
     transactional.id persisted in checkpoint
     → on restart, sink recovers the transactional.id and commits it;
       transactions opened after that checkpoint are aborted
     → idempotent: committing an already-committed transaction is a no-op
*/

KafkaSink<Revenue> kafkaSink = KafkaSink.<Revenue>builder()
    .setBootstrapServers("broker:9092")
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("hourly-revenue-sink")
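    // transaction.timeout.ms must exceed the checkpoint interval plus
    // worst-case recovery time, and must not exceed the broker's
    // transaction.max.timeout.ms (default 15 min)
    .setProperty("transaction.timeout.ms", "900000")  // 15min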
    .setRecordSerializer(...)
    .build();
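The pre-commit and commit phases listed above can be simulated with an in-memory sink. This is a toy model for building intuition, not the KafkaSink implementation: `ToyTwoPhaseSink` and its methods are hypothetical, and recovery is reduced to discarding everything that was not committed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy transactional sink driven by checkpoint events. */
class ToyTwoPhaseSink {
    private final List<String> committed = new ArrayList<>();         // visible to readers
    private final Map<Long, List<String>> pending = new HashMap<>();  // pre-committed, by checkpoint id
    private List<String> open = new ArrayList<>();                    // current transaction

    void write(String record) {
        open.add(record);
    }

    /** Phase 1: on the barrier, seal the open transaction and start a new one. */
    void preCommit(long checkpointId) {
        pending.put(checkpointId, open);
        open = new ArrayList<>();
    }

    /** Phase 2: the checkpoint completed globally. Repeating the call is a no-op. */
    void commit(long checkpointId) {
        List<String> txn = pending.remove(checkpointId);
        if (txn != null) {
            committed.addAll(txn);
        }
    }

    /** Failure before phase 2: nothing uncommitted ever becomes visible. */
    void abortUncommitted() {
        pending.clear();
        open = new ArrayList<>();
    }

    List<String> visible() {
        return committed;
    }
}
```

Records become visible only after the commit for their checkpoint. A second commit call for the same checkpoint changes nothing, which is the idempotence the failure scenarios rely on.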

Custom TwoPhaseCommitSinkFunction

Custom EOS sink: JDBC example (java)
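/**
 * Implement TwoPhaseCommitSinkFunction for ANY external system.
 * Flink calls these methods at the right checkpoint phase.
 * The transaction handle (TXN) is checkpointed and must survive restarts.
 *
 * Simplified sketch: a live JDBC Connection cannot be serialized into
 * a checkpoint or resumed after a crash. Production sinks checkpoint a
 * resumable handle instead (for example an XA transaction id, as Flink's
 * JDBC exactly-once sink does) or write to a staging table.
 */
public class JdbcEosSink
        extends TwoPhaseCommitSinkFunction<Revenue, Connection, Void> {

    // Constructor omitted: it must pass TypeSerializers for the
    // transaction handle and the context type to super(...).
    // dataSource is assumed to be a javax.sql.DataSource field set up in open().

    @Override
    protected Connection beginTransaction() throws Exception {
        Connection conn = dataSource.getConnection();
        conn.setAutoCommit(false);    // start DB transaction
        return conn;
    }

    @Override
    protected void invoke(Connection txn, Revenue r, Context ctx) throws Exception {
        // Write into the open transaction
        try (PreparedStatement ps = txn.prepareStatement("UPSERT INTO...")) {
            ps.setString(1, r.customerId);
            ps.setDouble(2, r.amount);
            ps.executeUpdate();
        }
    }

    @Override
    protected void preCommit(Connection txn) throws Exception {
        // Phase 1: called when checkpoint barrier arrives
        // Flush any pending writes, but do NOT commit yet
    }

    @Override
    protected void commit(Connection txn) {
        // Phase 2: ALL operators confirmed. commit() and abort() cannot
        // declare checked exceptions, so SQLException is wrapped.
        try (Connection c = txn) {
            c.commit();
        } catch (SQLException e) {
            throw new RuntimeException("commit failed", e);
        }
    }

    @Override
    protected void abort(Connection txn) {
        try (Connection c = txn) {
            c.rollback();  // checkpoint failed → rollback
        } catch (SQLException e) {
            throw new RuntimeException("rollback failed", e);
        }
    }
}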
EOS to ANY System
This is Flink's killer architectural advantage over Kafka Streams. Kafka Streams EOS only works Kafka→Kafka. Flink's 2PC protocol gives you exactly-once to PostgreSQL, MySQL, Iceberg, Delta Lake, Elasticsearch, Redis: anything that supports transactions or idempotent writes. The checkpoint is the coordinator; the sink implements the protocol.
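The "idempotent writes" path mentioned above is the simpler of the two, and a tiny model shows why it tolerates replays after a restart. Both classes below are hypothetical in-memory stand-ins for a sink target, not connectors.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Append-only target: a replayed record shows up twice. */
class AppendOnlyTarget {
    final List<String> rows = new ArrayList<>();

    void write(String key, double value) {
        rows.add(key + "=" + value);
    }
}

/** Keyed upsert target: a replayed record overwrites itself (last write wins). */
class KeyedUpsertTarget {
    final Map<String, Double> rows = new HashMap<>();

    void write(String key, double value) {
        rows.put(key, value);
    }
}
```

Replaying the same record after recovery leaves the keyed target unchanged, so at-least-once delivery still yields a correct final state. The append-only target needs the transactional protocol to avoid duplicates.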
Architecture Decisions
FLINK vs KAFKA STREAMS
These are not competitors — they are complements. But understanding their exact capability differences prevents catastrophic architecture decisions. Here is the definitive, honest comparison.
Capability | Kafka Streams
Deployment model | Embedded library: runs inside your JVM app
Sources beyond Kafka | Kafka only. Cannot read from files, JDBC, S3 directly
Event time / watermarks | Limited: stream time advances with record timestamps; no watermarks, but windows accept a grace period for late records
State size | GB-scale with RocksDB, but backed by changelog topics (slower restore)
Checkpointing / recovery | Offset commits + changelog topics; restore can take minutes for large state
SQL layer | ksqlDB (separate process): limited temporal joins, no CDC aggregation
Exactly-once sinks | Kafka → Kafka only
Session windows | Supported in the DSL (SessionWindows with an inactivity gap)
Batch + stream unified | Streaming only
Python API | JVM only (no official Python client)
Operations overhead | Low: no separate cluster, scales with your app replicas
Latency | Milliseconds per record; with caching or EOS, output visibility is bounded by commit.interval.ms
When to use | Simple Kafka→transform→Kafka pipelines, microservice event processing, no-ops team, low operational overhead

The Canonical Combined Architecture

Kafka and Flink are designed to work together. Kafka is the durable distributed log — the backbone of your event infrastructure. Flink is the computation engine that reads from Kafka, processes with stateful operators, and writes results back to Kafka or downstream sinks.

Postgres CDC → Kafka Connect → Kafka Topics
↓ Kafka as durable buffer
APACHE FLINK JOB: Watermarks · State · Windows · Exactly-Once
↓ enriched / aggregated results
Kafka (results) | Iceberg / Delta | PostgreSQL | Elasticsearch

Decision Framework

When to use which (decision tree)
Use KAFKA STREAMS when:
  ✓ Source is exclusively Kafka topics
  ✓ Team is 1-3 engineers without dedicated ops
  ✓ State is < 10GB per application instance
  ✓ No complex temporal joins needed
  ✓ Java/Scala shop (no Python requirement)
  ✓ Microservice per-team ownership model
  ✓ < 1M msgs/sec throughput

Use FLINK when:
  ✓ Need to join Kafka with S3, JDBC, or files
  ✓ State > 10GB (RocksDB incremental checkpoints)
  ✓ Need watermark-driven session windows (Kafka Streams has session windows but no watermarks)
  ✓ Need SQL analytics on streaming data
  ✓ EOS to non-Kafka sinks (JDBC, Iceberg)
  ✓ Backfill / historical replay with correct time
  ✓ Python team / PyFlink + Table SQL API
  ✓ Multi-tenant, shared infrastructure
  ✓ Event-time correctness is non-negotiable

Use BOTH together:
  ✓ Kafka as the durable event backbone
  ✓ Kafka Streams for simple microservice transforms
  ✓ Flink for complex aggregations, ML, lake writes
  ✓ Both write results back to Kafka topics
  ← This is the production-grade architecture

vs DuckDB + Polars (your stack)

The Boundary: Streaming vs Batch
Flink processes events as they arrive: sub-second latency, stateful, continuous. DuckDB and Polars process files at rest: batch, analytical, high-throughput. The production pattern: Flink writes results to S3 Parquet in micro-batches (5–60 minutes). DuckDB or Polars queries those Parquet files for ad-hoc analytics. Kafka → Flink → S3 → DuckDB is the exact stack you've been building.