
How to Build a Flink Pipeline

A Flink pipeline reads from a source (Kafka, Kinesis, files), applies transformations (keyBy, window, aggregate, process), and writes to a sink (Kafka, Postgres, Elasticsearch). This guide walks through five steps, from local setup to production checkpointing.

1. Start Flink and Kafka with Docker

# docker-compose.yml
services:
  jobmanager:
    image: flink:1.18-scala_2.12
    command: jobmanager
    ports: ["8081:8081"]

  taskmanager:
    image: flink:1.18-scala_2.12
    command: taskmanager
    depends_on: [jobmanager]

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      # Trimmed for brevity: a real compose file also needs KRaft
      # (or ZooKeeper) settings for the broker to start.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

docker compose up -d
# Flink UI: http://localhost:8081
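With the stack running, the source topic has to exist before the job starts. A minimal sketch, assuming the Compose service is named `kafka` as above; the partition count is an arbitrary choice:

```shell
# Create the "transactions" topic inside the Kafka container.
# Service name "kafka" and the partition count are assumptions.
docker compose exec kafka kafka-topics \
  --create --topic transactions \
  --bootstrap-server kafka:9092 \
  --partitions 4 --replication-factor 1
```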
2. Define the DataStream source with watermarks

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.time import Duration

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Kafka source; TransactionSchema is your own DeserializationSchema
source = KafkaSource.builder()\
    .set_bootstrap_servers("kafka:9092")\
    .set_topics("transactions")\
    .set_group_id("fraud-detection")\
    .set_value_only_deserializer(TransactionSchema())\
    .build()

# Bounded-out-of-orderness watermarks; TransactionTimestampAssigner
# is your own TimestampAssigner subclass
stream = env.from_source(
    source=source,
    watermark_strategy=WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_timestamp_assigner(TransactionTimestampAssigner()),
    source_name="transactions")
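Bounded out-of-orderness means the watermark trails the highest event time seen so far by the configured delay; events older than the watermark are treated as late. A plain-Python sketch of that rule (no PyFlink needed), following Flink's `max - delay - 1` convention:

```python
OUT_OF_ORDERNESS_MS = 5_000  # matches Duration.of_seconds(5) above

def watermark(event_times_ms):
    """Watermark = max event time seen so far, minus the allowed lateness."""
    return max(event_times_ms) - OUT_OF_ORDERNESS_MS - 1

# Events may arrive out of order; the watermark only tracks the max.
print(watermark([10_000, 12_000, 11_000]))  # 12_000 - 5_000 - 1 = 6_999
```

Any event with a timestamp at or below the current watermark misses its window unless you configure allowed lateness.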
3. Apply keyBy + tumbling window + aggregate

from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time

windowed = (
    stream
    .key_by(lambda t: t.customer_id)  # partition by customer
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(TxVolumeAggregate())  # your AggregateFunction summing amounts
)
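A tumbling window assigns every timestamp to exactly one fixed-size, non-overlapping bucket. The alignment arithmetic can be sketched in plain Python (window size matches `Time.minutes(1)` above):

```python
WINDOW_SIZE_MS = 60_000  # 1-minute tumbling windows, matching the job above

def window_start(timestamp_ms):
    """Align an event timestamp down to the start of its tumbling window."""
    return timestamp_ms - (timestamp_ms % WINDOW_SIZE_MS)

print(window_start(125_000))  # 120_000: the [120s, 180s) bucket
print(window_start(59_999))   # 0: still in the first window
```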
4. Add stateful fraud detection with ValueState

from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor

class FraudDetector(KeyedProcessFunction):
    def open(self, runtime_context):
        descriptor = ValueStateDescriptor("last_tx_amount", Types.FLOAT())
        self.last_amount = runtime_context.get_state(descriptor)

    def process_element(self, tx, ctx):
        prev = self.last_amount.value()
        if prev is not None and tx.amount > prev * 3:
            # PyFlink process functions emit via yield, not a Collector;
            # FraudAlert is your own record type
            yield FraudAlert(tx.customer_id, tx.amount)
        self.last_amount.update(tx.amount)

alerts = stream.key_by(lambda t: t.customer_id)\
               .process(FraudDetector())
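The detection rule itself is plain arithmetic, which makes it easy to unit-test outside Flink. A sketch (the names are illustrative; the `factor` default mirrors the 3x threshold above):

```python
def is_fraud(prev_amount, amount, factor=3):
    """Alert when the current amount exceeds `factor` x the previous one."""
    return prev_amount is not None and amount > prev_amount * factor

assert is_fraud(100.0, 350.0)        # 3.5x jump -> alert
assert not is_fraud(100.0, 250.0)    # 2.5x jump -> no alert
assert not is_fraud(None, 10_000.0)  # first transaction for this key
```

Keeping the rule in a pure function like this lets you test edge cases (first event, zero amounts) without a running cluster.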
5. Enable checkpointing and submit the job

from pyflink.datastream import CheckpointingMode

# Checkpoint every 30 seconds to S3
env.enable_checkpointing(30_000)  # ms
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.EXACTLY_ONCE
)
env.get_checkpoint_config().set_checkpoint_storage_dir(
    "s3://my-bucket/flink-checkpoints")

# Write fraud alerts to Kafka sink
alerts.sink_to(kafka_alerts_sink)
env.execute("fraud-detection-pipeline")

# Submit to cluster via CLI:
flink run -py fraud_detection.py

What's Happening

The pipeline reads transaction events from Kafka and assigns event-time watermarks to handle late-arriving messages. Events are partitioned by customer ID (keyBy), then windowed into 1-minute tumbling buckets. Stateful fraud detection tracks each customer's last transaction amount in ValueState and emits an alert when the current amount is 3× the previous. Checkpoints snapshot all state to S3 every 30 seconds — if the job crashes, it restores from the last checkpoint with exactly-once guarantees.
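To make the recovery story concrete, here is a toy simulation (not Flink code) of checkpoint-and-replay: per-key counts are snapshotted every two events, and after a crash the job restores the snapshot and reprocesses from its offset, landing on the same final state as an uninterrupted run:

```python
def run(events, crash_at=None, snapshot=None):
    """Toy exactly-once recovery: count per key, snapshot every 2 events."""
    counts, offset = (dict(snapshot[0]), snapshot[1]) if snapshot else ({}, 0)
    last_snapshot = (dict(counts), offset)
    for i in range(offset, len(events)):
        if crash_at is not None and i == crash_at:
            return None, last_snapshot  # crash: only the snapshot survives
        counts[events[i]] = counts.get(events[i], 0) + 1
        if (i + 1) % 2 == 0:
            last_snapshot = (dict(counts), i + 1)  # checkpoint
    return counts, last_snapshot

events = ["a", "b", "a", "a"]
_, ckpt = run(events, crash_at=3)          # crash before the 4th event
recovered, _ = run(events, snapshot=ckpt)  # restore and replay
assert recovered == run(events)[0]         # same result as a crash-free run
```

The real mechanism also rewinds the Kafka consumer offsets stored in the checkpoint, which is what makes the replay possible.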

When to Use This Pattern

  • Real-time fraud or anomaly detection on transaction streams
  • Per-user behavioral tracking (session counts, velocity checks)
  • Continuous aggregations feeding live dashboards
  • Event-driven microservice triggers from Kafka events

Common Issues

Job fails with "No watermark has been assigned"

You forgot to configure a WatermarkStrategy on the source. Add .for_bounded_out_of_orderness(Duration.of_seconds(5)) and implement a TimestampAssigner that extracts the event timestamp from your data.

State grows unbounded over time

You did not set a TTL on your state descriptor. Use StateTtlConfig to expire state for idle keys. Without TTL, ValueState for every customer ID you have ever seen stays in memory forever.
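Conceptually, TTL'd state reads as absent once its last update is older than the TTL, and expired entries are cleaned up lazily. A plain-Python sketch of that behavior (the class and names are illustrative, not PyFlink API):

```python
import time

class TtlState:
    """Minimal sketch of TTL'd keyed state: entries expire `ttl` seconds
    after their last update and read as absent afterwards."""
    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.store = {}  # key -> (value, last_update)

    def update(self, key, value, now=None):
        self.store[key] = (value, time.time() if now is None else now)

    def value(self, key, now=None):
        now = time.time() if now is None else now
        entry = self.store.get(key)
        if entry is None or now - entry[1] > self.ttl:
            self.store.pop(key, None)  # lazily expire, freeing the memory
            return None
        return entry[0]

state = TtlState(ttl_seconds=3600)
state.update("cust-1", 250.0, now=0)
assert state.value("cust-1", now=1800) == 250.0  # still fresh
assert state.value("cust-1", now=7200) is None   # expired and cleaned up
```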

Checkpoints timeout or fail constantly

Your checkpoint interval is shorter than the time it takes to complete a checkpoint. Increase the interval, reduce operator state size, or switch to incremental checkpointing (RocksDB backend supports this).
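For the RocksDB route, a hedged PyFlink fragment (`env` is the StreamExecutionEnvironment from step 2; verify the keyword argument against your Flink version):

```python
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

# RocksDB keeps state on disk and supports incremental checkpoints,
# so each checkpoint only uploads files changed since the last one.
env.set_state_backend(
    EmbeddedRocksDBStateBackend(enable_incremental_checkpointing=True))
```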

FAQ

What language should I use to write a Flink pipeline?
Java or Scala for production — full API coverage and best performance. PyFlink for Python teams, but some APIs are not yet available. This guide uses PyFlink for readability.
How do I run a Flink job locally?
Start a local Flink cluster with Docker Compose (jobmanager + taskmanager), then run python your_job.py or flink run -py your_job.py to submit it.
How does Flink checkpointing work?
Flink periodically snapshots all operator state to S3/HDFS using asynchronous barrier snapshotting, a variant of the Chandy-Lamport algorithm. On failure it restores from the last checkpoint and replays any events that arrived after the snapshot. This gives exactly-once guarantees for state.
