A Flink program compiles to a Logical Dataflow Graph (operators + streams), then to a Physical Execution Graph (parallel subtasks). Network buffers connect subtasks — data flows through them in a push model, not a pull-loop like Kafka consumers.
```java
/* LOGICAL GRAPH (your code):

   Source ──→ Map ──→ KeyBy ──→ Window ──→ Sink

   PHYSICAL GRAPH (parallelism=3, what actually runs):

   Source[0] ──→ Map[0] ──→\
   Source[1] ──→ Map[1] ──→ KeyBy/shuffle ──→ Window[0] ──→ Sink[0]
   Source[2] ──→ Map[2] ──→/              ──→ Window[1] ──→ Sink[1]
                                          ──→ Window[2] ──→ Sink[2]

   NETWORK SHUFFLE: KeyBy routes each record to exactly one
   downstream subtask based on hash(key) % parallelism. This is
   the only place data crosses thread/network boundaries.

   FORWARD CHANNELS: Source→Map (same parallelism) use direct
   buffer handoff — zero serialization, zero network copy. */

// Configure execution mode:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);           // global default
env.disableOperatorChaining();   // force separate tasks (debugging)
env.setBufferTimeout(100);       // flush network buffers every 100ms
                                 // lower = latency, higher = throughput

// Per-operator parallelism override:
stream
    .map(fn).setParallelism(8)   // this operator uses 8 subtasks
    .keyBy(key)                  // shuffle happens here
    .window(...)
    .sum("amount").setParallelism(4);
```
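The `hash(key) % parallelism` routing can be sketched outside Flink. This is a toy simulation (Flink actually hashes keys into key groups with murmur hash, so real assignments differ), but the invariant it demonstrates is the one that matters: every record with the same key lands on the same subtask, so keyed state for that key lives in exactly one place.

```python
PARALLELISM = 3

def route(key: str) -> int:
    """Toy stand-in for Flink's key partitioning."""
    return hash(key) % PARALLELISM

records = [("alice", 10), ("bob", 5), ("alice", 7), ("carol", 3)]
by_subtask = {}
for key, amount in records:
    by_subtask.setdefault(route(key), []).append((key, amount))

# Determinism is the point: both "alice" records share a subtask.
assert route("alice") == route("alice")
```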
| Mode | JobManager | Use Case |
|---|---|---|
| Application | Runs inside cluster, 1 JM per job | Production — strong isolation |
| Per-Job | Created on submit, destroyed on exit | YARN/K8s short-lived jobs |
| Session | Long-lived, shared by many jobs | Dev, low-latency deploys |
| Local | Embedded in main() process | Testing, IDE debugging |
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// ── SOURCE ──────────────────────────────────────
KafkaSource<Order> source = KafkaSource.<Order>builder()
    .setBootstrapServers("broker:9092")
    .setTopics("orders")
    .setGroupId("flink-order-processor")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new OrderDeserializer())
    .build();

DataStream<Order> orders = env.fromSource(
    source,
    WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((e, ts) -> e.getEventTs()),
    "Kafka Orders Source"
);

// ── TRANSFORMATIONS ─────────────────────────────
DataStream<RevenueMetric> revenue = orders
    .filter(o -> o.getStatus() == Status.COMPLETED)       // parallel filter
    .map(o -> new EnrichedOrder(o, enrich(o)))            // stateless map
    .keyBy(EnrichedOrder::getCustomerId)                  // hash shuffle
    .window(TumblingEventTimeWindows.of(Time.hours(1)))   // 1-hour window
    .aggregate(
        new RevenueAggregator(),   // incremental: O(1) state per window
        new WindowResultMapper()   // called once when window fires
    );

// ── SINK ────────────────────────────────────────
KafkaSink<RevenueMetric> sink = KafkaSink.<RevenueMetric>builder()
    .setBootstrapServers("broker:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("hourly-revenue")
        .setValueSerializationSchema(new RevenueSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)   // 2PC
    .setTransactionalIdPrefix("revenue-sink")
    .build();

revenue.sinkTo(sink);
env.execute("Hourly Revenue Aggregator");
```
```python
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import (
    KafkaSource, KafkaOffsetsInitializer,
    KafkaRecordSerializationSchema, KafkaSink
)
from pyflink.common import WatermarkStrategy, Duration, Types
from pyflink.common.serialization import SimpleStringSchema

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(60_000)   # checkpoint every 60s

# Add Kafka connector JARs (needed for PyFlink)
env.add_jars("file:///opt/flink/lib/flink-connector-kafka.jar")

# ── SOURCE ──────────────────────────────────────
source = (
    KafkaSource.builder()
    .set_bootstrap_servers("broker:9092")
    .set_topics("orders")
    .set_group_id("flink-py-processor")
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

orders = env.from_source(
    source,
    WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_idleness(Duration.of_minutes(1)),
    "Kafka Source"
)

# ── TRANSFORMATIONS ─────────────────────────────
import json

def parse_order(raw_json: str) -> tuple:
    d = json.loads(raw_json)
    return (d['customer_id'], d['amount'])

def is_valid(record) -> bool:
    return record[1] > 0

parsed = (
    orders
    .map(parse_order, output_type=Types.TUPLE([Types.STRING(), Types.FLOAT()]))
    .filter(is_valid)
    .key_by(lambda x: x[0])   # key by customer_id
)

# ── STATEFUL REDUCE (sum per customer, unbounded) ──
result = parsed.reduce(lambda a, b: (a[0], a[1] + b[1]))
result.print()

env.execute("PyFlink Order Processor")
```
```python
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit

settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

# Define source with DDL (connector resolved at runtime)
t_env.execute_sql("""
    CREATE TABLE orders (
        order_id    STRING,
        customer_id STRING,
        amount      DOUBLE,
        event_ts    TIMESTAMP(3),
        WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic'     = 'orders',
        'properties.bootstrap.servers' = 'broker:9092',
        'format'    = 'json'
    )
""")

# Table API: compiled to Java operators, zero Python overhead
orders_table = t_env.from_path("orders")
result = (
    orders_table
    .filter(col("amount") > 0)
    .group_by(col("customer_id"))
    .select(
        col("customer_id"),
        col("amount").sum.alias("total_spend")
    )
)
result.execute().print()
```
```java
/**
 * ProcessFunction: the most powerful Flink primitive.
 * Gives you:
 *  - Access to keyed state (per-key values, lists, maps)
 *  - Event-time and processing-time timers
 *  - Side outputs (route to different streams)
 *  - Full context: timestamp, key, current watermark
 */
public class SessionDetector
        extends KeyedProcessFunction<String, Event, Session> {

    // Declared state — Flink manages lifecycle across checkpoints
    private ValueState<Long> sessionStart;
    private ValueState<Long> lastEventTs;
    private ValueState<Integer> eventCount;
    private ValueState<Long> timerTs;   // pending timer

    private static final long GAP = 30 * 60_000;   // 30min inactivity

    @Override
    public void open(OpenContext ctx) {
        sessionStart = getRuntimeContext().getState(
            new ValueStateDescriptor<>("session-start", Long.class));
        lastEventTs = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-event", Long.class));
        eventCount = getRuntimeContext().getState(
            new ValueStateDescriptor<>("event-count", Integer.class));
        timerTs = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer-ts", Long.class));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Session> out)
            throws Exception {
        Long start = sessionStart.value();
        if (start == null) {
            sessionStart.update(e.ts);   // start of new session
            eventCount.update(0);
        }
        eventCount.update(eventCount.value() + 1);
        lastEventTs.update(e.ts);

        // Cancel old timer, register new one (30min from NOW)
        Long oldTimer = timerTs.value();
        if (oldTimer != null) ctx.timerService().deleteEventTimeTimer(oldTimer);
        long newTimer = e.ts + GAP;
        ctx.timerService().registerEventTimeTimer(newTimer);
        timerTs.update(newTimer);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<Session> out)
            throws Exception {
        // Timer fires → 30min inactivity → emit session
        out.collect(new Session(
            ctx.getCurrentKey(),
            sessionStart.value(),
            lastEventTs.value(),
            eventCount.value()
        ));
        // Clear state — this key starts fresh
        sessionStart.clear();
        lastEventTs.clear();
        eventCount.clear();
        timerTs.clear();
    }
}
```
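Stripped of the state and timer plumbing, the detector's logic is a pure function: a session closes when the gap between consecutive events exceeds GAP. A minimal sketch of that logic (a hand-rolled helper, not Flink API):

```python
GAP_MS = 30 * 60_000  # 30min inactivity, as in the detector above

def detect_sessions(timestamps):
    """Split a sorted list of event timestamps for one key into
    (start_ts, last_ts, event_count) sessions, closing on a >GAP gap."""
    sessions, start, last, count = [], None, None, 0
    for ts in timestamps:
        if start is None:
            start, last, count = ts, ts, 1
        elif ts - last > GAP_MS:          # the timer would have fired here
            sessions.append((start, last, count))
            start, last, count = ts, ts, 1
        else:
            last, count = ts, count + 1
    if start is not None:
        sessions.append((start, last, count))
    return sessions

# Two bursts separated by 45min of silence → two sessions
events = [0, 60_000, 120_000, 120_000 + 45 * 60_000]
assert detect_sessions(events) == [(0, 120_000, 3), (2_820_000, 2_820_000, 1)]
```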
```java
// Never use blocking DB calls in processElement()!
// They block the entire slot thread.
// Use AsyncFunction for external enrichment.
public class CustomerEnricher extends RichAsyncFunction<Order, EnrichedOrder> {

    private transient AsyncHttpClient client;

    @Override
    public void open(Configuration params) {
        client = Dsl.asyncHttpClient();   // non-blocking HTTP
    }

    @Override
    public void asyncInvoke(Order order, ResultFuture<EnrichedOrder> rf) {
        client.prepareGet("/customers/" + order.customerId)
            .execute(new AsyncCompletionHandler<>() {
                @Override
                public Response onCompleted(Response resp) {
                    rf.complete(Collections.singletonList(
                        new EnrichedOrder(order, parseCustomer(resp))
                    ));
                    return resp;
                }
                @Override
                public void onThrowable(Throwable t) {
                    rf.completeExceptionally(t);   // propagate failures to Flink
                }
            });
    }
}

// Wire it into the pipeline:
DataStream<EnrichedOrder> enriched = AsyncDataStream.unorderedWait(
    orders,
    new CustomerEnricher(),
    1000, TimeUnit.MILLISECONDS,   // timeout
    100                            // max concurrent requests
);
```
```java
/**
 * Side outputs split a stream into multiple tagged sub-streams
 * based on record content — without forking or filtering.
 * Zero overhead: routing decision made inline, no copy.
 */
OutputTag<Order> largeOrders  = new OutputTag<>("large-orders"){};
OutputTag<Order> lateEvents   = new OutputTag<>("late-events"){};
OutputTag<String> parseErrors = new OutputTag<>("parse-errors"){};

SingleOutputStreamOperator<Order> mainStream = rawInput
    .process(new ProcessFunction<String, Order>() {
        @Override
        public void processElement(String raw, Context ctx, Collector<Order> out) {
            try {
                Order o = parse(raw);
                if (o.getAmount() > 10_000)
                    ctx.output(largeOrders, o);   // → fraud review
                if (ctx.timestamp() < ctx.timerService().currentWatermark())
                    ctx.output(lateEvents, o);    // → late data audit
                out.collect(o);                   // always goes to main stream too
            } catch (ParseException e) {
                ctx.output(parseErrors, raw);     // → DLQ topic
            }
        }
    });

// Each is a fully independent DataStream:
DataStream<Order> fraudReview  = mainStream.getSideOutput(largeOrders);
DataStream<Order> lateAudit    = mainStream.getSideOutput(lateEvents);
DataStream<String> deadLetters = mainStream.getSideOutput(parseErrors);

// Route to different Kafka topics:
fraudReview.sinkTo(fraudTopic);
lateAudit.sinkTo(auditTopic);
deadLetters.sinkTo(dlqTopic);
```
```java
// connect() joins TWO DIFFERENT TYPES in one operator.
// Unlike union() (same type) or join() (keyed window),
// connect() lets each stream handle its own logic with shared state.
DataStream<Order> orders = ...;
DataStream<Config> configs = ...;   // dynamic config changes

BroadcastStream<Config> bcConfig = configs.broadcast(configStateDescriptor);

orders.connect(bcConfig)
    .process(new BroadcastProcessFunction<Order, Config, Result>() {

        @Override
        public void processElement(Order order, ReadOnlyContext ctx,
                                   Collector<Result> out) throws Exception {
            Config cfg = ctx.getBroadcastState(configStateDescriptor)
                            .get("current");
            // Apply latest config to every order — no restart needed
            out.collect(applyConfig(order, cfg));
        }

        @Override
        public void processBroadcastElement(Config cfg, Context ctx,
                                            Collector<Result> out) throws Exception {
            ctx.getBroadcastState(configStateDescriptor).put("current", cfg);
        }
    });
```
| Domain | Clock Source | Replays Correctly? | Use For |
|---|---|---|---|
| Event Time | Timestamp in the record itself | Yes ✓ | All business-critical windows |
| Processing Time | System wall clock on TaskManager | No ✗ | Approximate monitoring only |
| Ingestion Time | Timestamp assigned at source | Partial | Backpressure detection |
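Why event time replays correctly: window assignment depends only on the record's own timestamp, so reprocessing yesterday's data puts every event back in the same window. A toy illustration (hand-rolled helper, not Flink API):

```python
HOUR_MS = 3_600_000

def event_time_window(event_ts: int) -> int:
    """Tumbling 1-hour window start, derived from the record itself.
    Same arithmetic whether the job runs live or replays history."""
    return event_ts - (event_ts % HOUR_MS)

# A record with ts=02:00:00.500 always lands in the [02:00, 03:00) window:
assert event_time_window(7_200_500) == 7_200_000
# A processing-time window would call time.time() here instead,
# so a replay three days later would scatter events into new windows.
```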
```java
// 1. Bounded out-of-orderness (most common production choice)
WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((e, ts) -> e.getEventTs())
// Watermark = max(seen_event_ts) - 5s
// "I'll wait 5 seconds for stragglers, then fire windows"

// 2. Monotonically increasing (data is perfectly ordered)
WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((e, ts) -> e.getEventTs())
// Watermark = max(seen_ts) - 1ms. Zero delay. Use only if certain.

// 3. Idleness handling for per-partition watermarks (Kafka-native)
WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
    .withTimestampAssigner(...)
    .withIdleness(Duration.ofMinutes(5))
// withIdleness: if a Kafka partition has no events for 5min,
// don't let it block the global watermark from advancing.
// Critical for low-volume topics with sparse partitions.

// 4. Custom WatermarkGenerator (full control)
public class OrderedEventTimeGenerator implements WatermarkGenerator<Event> {
    private long maxTs = Long.MIN_VALUE;

    @Override
    public void onEvent(Event e, long ts, WatermarkOutput out) {
        maxTs = Math.max(maxTs, e.getTs());
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput out) {
        out.emitWatermark(new Watermark(maxTs - 5000));
    }
}
```
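Strategy 1 reduces to two lines of arithmetic: track the maximum timestamp seen, emit watermark = max - delay. A toy reimplementation (not the Flink classes) makes the out-of-order behavior concrete:

```python
class BoundedOutOfOrderness:
    """Mirrors forBoundedOutOfOrderness: the watermark trails the
    highest event timestamp seen by a fixed delay."""
    def __init__(self, delay_ms: int):
        self.delay_ms = delay_ms
        self.max_ts = float("-inf")

    def on_event(self, event_ts: int) -> None:
        self.max_ts = max(self.max_ts, event_ts)

    def current_watermark(self) -> float:
        return self.max_ts - self.delay_ms

wm = BoundedOutOfOrderness(5_000)
for ts in [1_000, 9_000, 4_000]:   # 4_000 arrives out of order
    wm.on_event(ts)
# The straggler does not drag the watermark backwards:
# watermark = max(1_000, 9_000, 4_000) - 5_000 = 4_000
assert wm.current_watermark() == 4_000
```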
```java
OutputTag<Order> lateTag = new OutputTag<>("late-orders"){};

SingleOutputStreamOperator<Revenue> result = orders
    .keyBy(o -> o.customerId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    // Strategy 1: Allow late events to re-fire the window
    // Window fires at watermark, then AGAIN for each late event
    // for up to 'allowedLateness' past the window close
    .allowedLateness(Time.minutes(5))
    // Strategy 2: Route very-late events to a side output
    // (events arriving after allowedLateness period)
    .sideOutputLateData(lateTag)
    .aggregate(new RevenueAgg());

// Strategy 3: Process side output separately (correction stream)
DataStream<Order> lateOrders = result.getSideOutput(lateTag);
lateOrders.sinkTo(lateOrdersAuditTopic);

/* TIMELINE:
   t=10:00:00  window [09:00, 10:00) reaches its end
   t=10:00:05  watermark passes 10:00 → window FIRES (main output)
   t=10:02:00  late event arrives (ts=09:58) → allowedLateness=5min
               still within window → fires AGAIN with updated result
   t=10:05:01  another late event (ts=09:45) → past allowedLateness
               routed to lateTag side output
*/
```
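The timeline boils down to three buckets, decided by where the watermark stands relative to the window end. A sketch of that decision (hypothetical helper, not Flink API):

```python
WINDOW_END = 10 * 3_600_000       # window [09:00, 10:00) ends at 10:00
ALLOWED_LATENESS = 5 * 60_000     # 5 minutes

def classify(current_watermark: int) -> str:
    """Which path a record for this window takes, given the watermark."""
    if current_watermark < WINDOW_END:
        return "on-time"       # window has not fired yet
    if current_watermark < WINDOW_END + ALLOWED_LATENESS:
        return "re-fire"       # late, but state still held → fires again
    return "side-output"       # very late → routed to lateTag

assert classify(WINDOW_END - 1) == "on-time"
assert classify(WINDOW_END + 2 * 60_000) == "re-fire"            # t ≈ 10:02
assert classify(WINDOW_END + 5 * 60_000 + 1_000) == "side-output"  # past 10:05
```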
```java
// ALL keyed state is scoped to: (operator, key, namespace)
// Flink manages serialization, checkpointing, and TTL.

// 1. ValueState — single value per key
ValueState<Long> count = ctx.getState(
    new ValueStateDescriptor<>("event-count", Long.class));
count.update(count.value() + 1);
Long v = count.value();
count.clear();

// 2. ListState — append-only list per key
ListState<Event> buffer = ctx.getListState(
    new ListStateDescriptor<>("event-buffer", Event.class));
buffer.add(event);
Iterable<Event> all = buffer.get();
buffer.update(newList);   // replace all

// 3. MapState — key-value map per Flink-key
MapState<String, Double> totals = ctx.getMapState(
    new MapStateDescriptor<>("category-totals", String.class, Double.class));
Double cur = totals.get("electronics");   // null if absent
totals.put("electronics", (cur == null ? 0 : cur) + amount);

// 4. ReducingState — auto-reduces on add()
ReducingState<Long> sum = ctx.getReducingState(
    new ReducingStateDescriptor<>("sum", Long::sum, Long.class));
sum.add(42L);   // calls (current, 42L) → Long::sum
sum.get();      // current aggregate

// 5. AggregatingState — complex incremental aggregation
AggregatingState<Order, Stats> statsState = ctx.getAggregatingState(
    new AggregatingStateDescriptor<>("stats", new StatsAggregator(), Stats.class));

// 6. State TTL — automatic expiry (prevents unbounded growth)
StateTtlConfig ttl = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000)   // drop expired entries during RocksDB compaction
    .build();
valueStateDescriptor.enableTimeToLive(ttl);
```
| Backend | State Location | Checkpoint | Best For |
|---|---|---|---|
| HashMapStateBackend | JVM Heap (objects) | Async snapshot to FS | Small state, fast access |
| EmbeddedRocksDB | Off-heap (SSD/NVMe) | Incremental to S3/HDFS | Large state, TB-scale |
Flink's checkpoint algorithm (an asynchronous variant of Chandy-Lamport) injects checkpoint barriers — special markers — into the data stream alongside regular events. When an operator receives barriers from all upstream partitions, it snapshots its state and propagates the barrier downstream. The job is never stopped globally: with aligned barriers, an operator only briefly buffers input channels that have already delivered their barrier, and unaligned checkpoints remove even that stall.
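A toy model of aligned barriers for one operator with two inputs: records from a channel that has already delivered its barrier are buffered, not processed, until every channel's barrier arrives; then state is snapshotted and the barrier is forwarded. A sketch under those assumptions, not the network-level implementation:

```python
def align(inputs):
    """inputs: dict of channel name -> list of records; each list must
    contain one 'BARRIER' marker. Simulates round-robin consumption with
    barrier alignment. Returns (records in snapshot, buffered records)."""
    processed, buffered = [], []
    barriered = {ch: False for ch in inputs}
    cursors = {ch: 0 for ch in inputs}
    while not all(barriered.values()):
        for ch, records in inputs.items():
            if cursors[ch] >= len(records):
                continue
            item = records[cursors[ch]]
            cursors[ch] += 1
            if barriered[ch]:
                buffered.append(item)      # channel blocked: hold back
            elif item == "BARRIER":
                barriered[ch] = True
            else:
                processed.append(item)     # still pre-barrier: process
    return list(processed), buffered       # snapshot taken at alignment

# Channel b barriers early; its post-barrier records wait for channel a.
snapshot, held = align({"a": [1, 2, 3, "BARRIER"], "b": ["BARRIER", 9, 9]})
assert snapshot == [1, 2, 3]   # exactly the pre-barrier records
assert held == [9, 9]          # processed only after the snapshot
```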
```java
CheckpointConfig cfg = env.getCheckpointConfig();
cfg.setCheckpointInterval(60_000);           // every 60s
cfg.setCheckpointTimeout(120_000);           // fail if > 2min
cfg.setMinPauseBetweenCheckpoints(30_000);   // cooldown
cfg.setMaxConcurrentCheckpoints(1);          // 1 in-flight at a time
cfg.setExternalizedCheckpointCleanup(        // keep on cancel
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Incremental checkpoints (RocksDB only — huge for large state):
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// true = incremental. Only changed SSTable files uploaded.
// Full checkpoint: 100GB → 30min. Incremental: 100GB → 90s.

// Savepoint: operator-initiated, used for upgrades
// $ flink savepoint <jobId> s3://bucket/savepoints/
// $ flink run -s s3://bucket/savepoints/sp-123 job.jar
// Savepoints survive code changes (with compatible state schema)
```
```sql
-- Kafka source with Avro schema registry
CREATE TABLE orders (
    order_id    STRING,
    customer_id STRING,
    amount      DECIMAL(18, 4),
    status      STRING,
    event_ts    TIMESTAMP(3),
    -- Watermark: allow 5s of out-of-order delivery
    WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'broker:9092',
    'properties.group.id' = 'flink-sql-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081'
);

-- Parquet source (S3 data lake)
CREATE TABLE customer_profiles (
    customer_id STRING,
    tier        STRING,
    country     STRING,
    updated_at  TIMESTAMP(3)
) WITH (
    'connector' = 'filesystem',
    'path' = 's3://lake/customer-profiles/',
    'format' = 'parquet',
    'source.monitor-interval' = '60s'   -- poll for new files
);

-- JDBC lookup (for enrichment with temporal join)
CREATE TABLE exchange_rates (
    currency   STRING,
    rate       DOUBLE,
    valid_from TIMESTAMP(3),
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://db:5432/rates',
    'table-name' = 'exchange_rates',
    'lookup.cache.max-rows' = '10000',   -- in-memory cache
    'lookup.cache.ttl' = '60s'
);

-- Iceberg sink (write to data lake with ACID)
CREATE TABLE hourly_revenue_iceberg (
    window_start  TIMESTAMP(3),
    customer_id   STRING,
    total_revenue DECIMAL(18,4),
    PRIMARY KEY (window_start, customer_id) NOT ENFORCED
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'glue',
    'warehouse' = 's3://lake/iceberg/',
    'write.format.default' = 'parquet'
);
```
```sql
-- Simple continuous filter — runs forever
INSERT INTO high_value_orders
SELECT * FROM orders
WHERE amount > 1000 AND status = 'COMPLETED';

-- Lookup join with JDBC (adds async I/O under the hood)
SELECT o.order_id, o.amount, c.tier, c.country
FROM orders o
JOIN customer_profiles FOR SYSTEM_TIME AS OF o.event_ts c
    ON o.customer_id = c.customer_id;
-- FOR SYSTEM_TIME AS OF: temporal join.
-- Looks up the customer profile valid AT the event's timestamp.
-- This is "as-of" join semantics — analogous to DuckDB's ASOF JOIN.

-- Group by with deduplication
SELECT customer_id, COUNT(DISTINCT session_id) AS sessions
FROM orders
GROUP BY customer_id;
```
```sql
-- TUMBLE: fixed non-overlapping windows
SELECT
    window_start, window_end, customer_id,
    SUM(amount) AS hourly_revenue,
    COUNT(*) AS order_count,
    COUNT(DISTINCT session_id) AS sessions
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(event_ts), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, customer_id;

-- HOP (sliding): 10-minute windows advancing every 5 minutes
SELECT window_start, window_end, customer_id, SUM(amount)
FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(event_ts),
        INTERVAL '5' MINUTE,    -- hop (slide) interval
        INTERVAL '10' MINUTE)   -- window size
)
GROUP BY window_start, window_end, customer_id;

-- SESSION: close window after 30min inactivity per key
SELECT window_start, window_end, customer_id,
       COUNT(*) AS events_in_session
FROM TABLE(
    SESSION(TABLE orders PARTITION BY customer_id,
            DESCRIPTOR(event_ts),
            INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, customer_id;

-- CUMULATE: growing windows that reset at fixed boundaries
-- Perfect for: real-time "revenue so far today" metrics
SELECT
    window_start,   -- start of day (reset boundary)
    window_end,     -- current accumulation point
    customer_id,
    SUM(amount) AS revenue_so_far_today
FROM TABLE(
    CUMULATE(TABLE orders, DESCRIPTOR(event_ts),
             INTERVAL '1' HOUR,   -- step: emit every hour
             INTERVAL '1' DAY)    -- max size: reset daily
)
GROUP BY window_start, window_end, customer_id;
```
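Each TVF is just a different rule for mapping an event timestamp to one or more windows. A sketch of TUMBLE and HOP assignment in plain arithmetic (toy helpers, not Flink internals; window offsets assumed zero):

```python
def tumble(ts, size):
    """TUMBLE: every event belongs to exactly one window."""
    start = ts - (ts % size)
    return [(start, start + size)]

def hop(ts, slide, size):
    """HOP: every event belongs to size/slide overlapping windows."""
    windows = []
    latest_start = ts - (ts % slide)             # newest window containing ts
    first_start = latest_start - (size - slide)  # oldest one that still does
    for start in range(first_start, latest_start + 1, slide):
        if start <= ts < start + size:
            windows.append((start, start + size))
    return windows

assert tumble(3_700, 3_600) == [(3_600, 7_200)]
# size=10, slide=5 → every event lands in exactly 2 windows:
assert hop(12, 5, 10) == [(5, 15), (10, 20)]
```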
| API | Status | Syntax |
|---|---|---|
| Window TVF | Current (1.13+) | TABLE(TUMBLE(TABLE t, ...)) |
| GroupWindow | Deprecated | GROUP BY TUMBLE(ts, INTERVAL...) |
Window TVF enables Window TopN, Window Deduplication, and Window Join — capabilities impossible with the old GroupWindow API.
```sql
/* TEMPORAL TABLE JOIN (versioned): for each order, find the
   exchange rate that was valid AT the order's event time.
   This requires the rate table to be versioned (append-only CDC). */

CREATE TABLE exchange_rates_versioned (
    currency   STRING,
    rate       DOUBLE,
    valid_from TIMESTAMP(3),
    PRIMARY KEY (currency) NOT ENFORCED,
    WATERMARK FOR valid_from AS valid_from - INTERVAL '1' SECOND
) WITH (...);   -- backed by Kafka topic (append-only)

-- Temporal join: look up rate valid AT order time
-- Flink maintains a versioned state of the rate table.
SELECT
    o.order_id,
    o.currency,
    o.amount,
    o.amount * r.rate AS amount_usd,
    r.valid_from AS rate_valid_from
FROM orders o
JOIN exchange_rates_versioned FOR SYSTEM_TIME AS OF o.event_ts r
    ON o.currency = r.currency;

/* INTERVAL JOIN: join two streams where the timestamps must be
   within a time distance of each other. Use for: matching orders
   with their payments, correlating click events with subsequent
   purchases. */
SELECT
    o.order_id,
    p.payment_id,
    p.amount AS paid_amount,
    o.amount AS order_amount,
    p.event_ts - o.event_ts AS payment_delay
FROM orders o, payments p
WHERE o.order_id = p.order_id
  AND p.event_ts BETWEEN o.event_ts AND o.event_ts + INTERVAL '24' HOUR;
-- Flink buffers orders for 24h, waiting for matching payments.
-- Records older than 24h are cleared from state automatically.
```
```sql
-- DEDUPLICATION: keep only the first event per key
-- (solves at-least-once source duplicates)
SELECT order_id, customer_id, amount, event_ts
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY order_id
            ORDER BY event_ts ASC
        ) AS rn
    FROM orders
)
WHERE rn = 1;

-- WINDOW TOP-N: top 3 products by revenue per hour
-- (alias the row number as rownum — RANK is a reserved function name)
SELECT * FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY window_start, window_end
            ORDER BY total_revenue DESC
        ) AS rownum
    FROM hourly_product_revenue   -- windowed result from TUMBLE
)
WHERE rownum <= 3;
```
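The dedup query, as a pure function: keep the row with the smallest event_ts per key, which is what `rn = 1` with `ORDER BY event_ts ASC` selects. A toy equivalent (not Flink's streaming operator, which additionally keeps per-key state forever unless TTL'd):

```python
def dedup_keep_first(rows):
    """rows: (order_id, event_ts, payload) tuples. Keeps the row with
    the earliest event_ts per order_id, like the ROW_NUMBER query."""
    first = {}
    for order_id, event_ts, payload in rows:
        if order_id not in first or event_ts < first[order_id][1]:
            first[order_id] = (order_id, event_ts, payload)
    return sorted(first.values())

rows = [("o1", 10, "late-dup"), ("o2", 5, "b"), ("o1", 3, "original")]
assert dedup_keep_first(rows) == [("o1", 3, "original"), ("o2", 5, "b")]
```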
```sql
/* Flink SQL understands CHANGELOG SEMANTICS.
   A CDC stream from Debezium carries row-level changes that Flink
   maps onto four changelog operations:
     +I (INSERT):        new row enters the aggregation
     -U (UPDATE_BEFORE): retract old value from aggregation
     +U (UPDATE_AFTER):  add new value to aggregation
     -D (DELETE):        retract row from aggregation
   This means you can GROUP BY and aggregate a CDC stream and the
   aggregate UPDATES as rows change in the source database.
   ksqlDB cannot do this. */

-- CDC source (Debezium Kafka output → Flink CDC table)
CREATE TABLE orders_cdc (
    order_id    STRING,
    customer_id STRING,
    amount      DECIMAL(18,4),
    status      STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'ecommerce-prod.public.orders',   -- Debezium topic
    'format' = 'debezium-json',                 -- understands CDC envelope
    'properties.bootstrap.servers' = 'broker:9092'
);

-- Materialise a live sum — updates as orders change status:
SELECT
    status,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM orders_cdc
GROUP BY status;
-- When an order changes from PENDING → COMPLETED:
--   -U: subtract from PENDING count/sum
--   +U: add to COMPLETED count/sum
-- Result is a continuously updating table.

-- Write CDC aggregation to UPSERT Kafka topic:
CREATE TABLE order_status_summary_kafka (
    status       STRING,
    order_count  BIGINT,
    total_amount DECIMAL(18,4),
    PRIMARY KEY (status) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',   -- key-based compaction
    'topic' = 'order-status-summary',
    'properties.bootstrap.servers' = 'broker:9092',
    'key.format'   = 'json',
    'value.format' = 'json'
);

INSERT INTO order_status_summary_kafka
SELECT status, COUNT(*), SUM(amount)
FROM orders_cdc
GROUP BY status;
```
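The retract mechanics can be modeled directly: additions (+I, +U) add to the aggregate, retractions (-U, -D) subtract from it. A hand-rolled COUNT/SUM-per-status over a changelog (a toy model, not Flink's internals):

```python
from collections import defaultdict

def apply_changelog(changes):
    """changes: (op, status, amount) with op in {+I, -U, +U, -D}.
    Maintains COUNT(*) and SUM(amount) per status, retract-style."""
    count = defaultdict(int)
    total = defaultdict(float)
    for op, status, amount in changes:
        if op in ("+I", "+U"):          # insert / update-after: add
            count[status] += 1
            total[status] += amount
        elif op in ("-U", "-D"):        # update-before / delete: retract
            count[status] -= 1
            total[status] -= amount
    return dict(count), dict(total)

# An order moves PENDING → COMPLETED in the source database:
changes = [
    ("+I", "PENDING",   100.0),   # row inserted
    ("-U", "PENDING",   100.0),   # retract old version
    ("+U", "COMPLETED", 100.0),   # add new version
]
count, total = apply_changelog(changes)
assert count == {"PENDING": 0, "COMPLETED": 1}   # aggregate followed the row
assert total["COMPLETED"] == 100.0
```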
```sql
-- flink-cdc-connectors: direct DB → Flink, no Kafka broker
CREATE TABLE mysql_orders (
    order_id    BIGINT,
    customer_id BIGINT,
    amount      DECIMAL(18,4),
    status      STRING,
    updated_at  TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'prod-mysql',
    'port' = '3306',
    'username' = 'flink_ro',
    'password' = '...',
    'database-name' = 'ecommerce',
    'table-name' = 'orders'
);
-- Flink reads the MySQL binlog directly.
-- Also available: postgres-cdc, oracle-cdc, mongodb-cdc,
-- sqlserver-cdc, oceanbase-cdc, tidb-cdc
```
```java
// 1. TUMBLING EVENT TIME — fixed, non-overlapping
stream.keyBy(k -> k.id)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new RevenueAgg());
// [00:00,01:00), [01:00,02:00) ... each fires once

// 2. SLIDING — overlapping windows
stream.keyBy(k -> k.id)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10),   // window size
        Time.minutes(5)))   // slide interval
    .aggregate(new RevenueAgg());
// Each event belongs to size/slide = 2 windows.
// [00:00,00:10), [00:05,00:15), [00:10,00:20) ...

// 3. SESSION — activity gap based, per key
stream.keyBy(k -> k.userId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAgg());
// Window extends as long as events keep arriving within 30min.
// No events for 30min → window closes → session emitted.

// 4. GLOBAL WINDOW — one window per key, forever
stream.keyBy(k -> k.id)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(100))   // fire every 100 events
    .aggregate(new RevenueAgg());
// Global window never closes without a custom trigger.
// Use for: arbitrary triggers (ML model inference every N records)

// 5. CUSTOM TRIGGER — fire on condition
stream.keyBy(k -> k.customerId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .trigger(PurgingTrigger.of(CountTrigger.of(50)))
    // Fire AND PURGE when 50 events accumulate within the window.
    // Fire again at window end. Useful for early/speculative results.
    .aggregate(new RevenueAgg());
```
```java
/**
 * AggregateFunction: INCREMENTAL — O(1) state per window.
 * Accumulator updated per event. Efficient but no access
 * to window boundaries or all events at once.
 */
public class RevenueAgg implements AggregateFunction<Order, Double, Revenue> {
    public Double createAccumulator()       { return 0.0; }
    public Double add(Order o, Double acc)  { return acc + o.amount; }
    public Double merge(Double a, Double b) { return a + b; }
    public Revenue getResult(Double acc)    { return new Revenue(acc); }
}

/**
 * ProcessWindowFunction: FULL — buffers ALL events in window.
 * Expensive (O(n) state) but gives access to window bounds,
 * all events, and iteration. Use for: percentiles, medians,
 * complex multi-pass calculations.
 */
public class P99Calculator
        extends ProcessWindowFunction<Event, Metrics, String, TimeWindow> {

    public void process(String key, Context ctx,
                        Iterable<Event> events,   // ALL events
                        Collector<Metrics> out) {
        List<Long> latencies = StreamSupport
            .stream(events.spliterator(), false)
            .map(Event::getLatency)
            .sorted()
            .collect(Collectors.toList());
        long p99 = latencies.get((int) (latencies.size() * 0.99));
        out.collect(new Metrics(key,
            ctx.window().getStart(), ctx.window().getEnd(), p99));
    }
}

// BEST PRACTICE: combine both — incremental + window context:
stream.keyBy(...)
    .window(...)
    .aggregate(
        new RevenueAgg(),              // incremental: O(1) state
        new EnrichWithWindowBounds()   // called once on fire
    );
```
A TwoPhaseCommitSinkFunction extends exactly-once guarantees to any transactional sink: Kafka, a JDBC database, an Iceberg table, or a custom system. The checkpoint acts as the global commit coordinator.
```java
/* TWO-PHASE COMMIT SEQUENCE:
   ─────────────────────────────────────────────────
   PHASE 1 — PRE-COMMIT (on checkpoint barrier):
   1. Source operators snapshot their Kafka offsets
   2. Processing operators snapshot their keyed state
   3. Kafka sink begins a Kafka transaction
      (transactional.id = "flink-sink-{subtaskId}-{checkpointId}")
   4. All records written so far go into the open transaction
   5. Sink snapshots: transactional.id into checkpoint state
   6. All operators report checkpoint complete to JobManager

   PHASE 2 — COMMIT (after all operators confirm):
   1. JobManager receives ALL confirmations → checkpoint complete
   2. Kafka sink commits the open transaction (commit_transaction())
   3. Records NOW visible to read_committed consumers

   FAILURE SCENARIOS:
   ● Crash before phase 2: transaction aborted, no duplicates
     → restart from last complete checkpoint, reprocess from offsets
   ● Crash mid phase 2: transactional.id persisted in checkpoint
     → on restart, sink recovers transactional.id, aborts/commits
     → idempotent: committing an already-committed transaction is a no-op
*/
KafkaSink<Revenue> kafkaSink = KafkaSink.<Revenue>builder()
    .setBootstrapServers("broker:9092")
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("hourly-revenue-sink")
    // transaction.timeout.ms must be > checkpoint interval
    .setProperty("transaction.timeout.ms", "900000")   // 15min
    .setRecordSerializer(...)
    .build();
```
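The failure handling hinges on one property: commit must be idempotent, because phase 2 can be replayed after a crash. A toy participant (hypothetical classes, not Flink's) showing why the replay is safe:

```python
class TxnSink:
    """Toy 2PC participant: pre-commit seals pending writes into an
    open transaction; commit is idempotent by transaction id."""
    def __init__(self):
        self.pending = []
        self.open_txn = None
        self.committed = []
        self.committed_txns = set()

    def write(self, record):
        self.pending.append(record)

    def pre_commit(self, txn_id):
        # Phase 1: everything flushed into the open transaction
        self.open_txn = (txn_id, list(self.pending))
        self.pending = []

    def commit(self, txn_id):
        # Phase 2: a replayed commit for the same id is a no-op
        if txn_id in self.committed_txns:
            return
        self.committed.extend(self.open_txn[1])
        self.committed_txns.add(txn_id)

sink = TxnSink()
sink.write("rec-1"); sink.write("rec-2")
sink.pre_commit("ckpt-7")
sink.commit("ckpt-7")
sink.commit("ckpt-7")   # crash mid phase 2 → commit replayed on restart
assert sink.committed == ["rec-1", "rec-2"]   # still no duplicates
```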
```java
/**
 * Implement TwoPhaseCommitSinkFunction for ANY external system.
 * Flink calls these methods at the right checkpoint phase.
 * Transaction state is checkpointed — survives restarts.
 */
public class JdbcEosSink
        extends TwoPhaseCommitSinkFunction<Revenue, Connection, Void> {

    @Override
    protected Connection beginTransaction() throws Exception {
        Connection conn = dataSource.getConnection();
        conn.setAutoCommit(false);   // start DB transaction
        return conn;
    }

    @Override
    protected void invoke(Connection txn, Revenue r, Context ctx) throws Exception {
        // Write into the open transaction
        try (PreparedStatement ps = txn.prepareStatement("UPSERT INTO...")) {
            ps.setString(1, r.customerId);
            ps.setDouble(2, r.amount);
            ps.executeUpdate();
        }
    }

    @Override
    protected void preCommit(Connection txn) throws Exception {
        // Phase 1: called when checkpoint barrier arrives
        // Flush any pending writes — do NOT commit yet
    }

    @Override
    protected void commit(Connection txn) {
        try {
            txn.commit();   // Phase 2: ALL operators confirmed
            txn.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(Connection txn) {
        try {
            txn.rollback();   // checkpoint failed → rollback
            txn.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
```
Kafka and Flink are designed to work together. Kafka is the durable distributed log — the backbone of your event infrastructure. Flink is the computation engine that reads from Kafka, processes with stateful operators, and writes results back to Kafka or downstream sinks.
Use KAFKA STREAMS when:
✓ Source is exclusively Kafka topics
✓ Team is 1-3 engineers without dedicated ops
✓ State is < 10GB per application instance
✓ No complex temporal joins needed
✓ Java/Scala shop (no Python requirement)
✓ Microservice per-team ownership model
✓ < 1M msgs/sec throughput
Use FLINK when:
✓ Need to join Kafka with S3, JDBC, or files
✓ State > 10GB (RocksDB incremental checkpoints)
✓ Need flexible windowing: custom triggers, global windows, allowed lateness
✓ Need SQL analytics on streaming data
✓ EOS to non-Kafka sinks (JDBC, Iceberg)
✓ Backfill / historical replay with correct time
✓ Python team / PyFlink + Table SQL API
✓ Multi-tenant, shared infrastructure
✓ Event-time correctness is non-negotiable
Use BOTH together:
✓ Kafka as the durable event backbone
✓ Kafka Streams for simple microservice transforms
✓ Flink for complex aggregations, ML, lake writes
✓ Both write results back to Kafka topics
← This is the production-grade architecture