How to Build a Flink Pipeline
A Flink pipeline reads from a source (Kafka, Kinesis, files), applies transformations (keyBy, window, aggregate, process), and writes to a sink (Kafka, Postgres, Elasticsearch). This guide walks through the full flow in five steps, from local setup to production checkpointing.
Start Flink and Kafka with Docker
```yaml
# docker-compose.yml
services:
  jobmanager:
    image: flink:1.18-scala_2.12
    command: jobmanager
    ports: ["8081:8081"]
  taskmanager:
    image: flink:1.18-scala_2.12
    command: taskmanager
    depends_on: [jobmanager]
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      # A real broker also needs ZooKeeper or KRaft settings; omitted here for brevity
```

```shell
docker compose up -d
# Flink UI: http://localhost:8081
```
Define the DataStream source with watermarks
```python
from pyflink.common.time import Duration
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Kafka source with bounded-out-of-orderness watermarks
source = (
    KafkaSource.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_topics("transactions")
    # TransactionSchema is a user-defined DeserializationSchema
    .set_value_only_deserializer(TransactionSchema())
    .build()
)

stream = env.from_source(
    source=source,
    watermark_strategy=WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_timestamp_assigner(TransactionTimestampAssigner()),
    source_name="transactions",
)
```
Apply keyBy + tumbling window + aggregate
```python
from pyflink.common.time import Time
from pyflink.datastream.window import TumblingEventTimeWindows

windowed = (
    stream
    .key_by(lambda t: t.customer_id)                       # partition by customer
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))  # 1-minute buckets
    .aggregate(TxVolumeAggregate())                        # sum amounts
)
```
Add stateful fraud detection with ValueState
```python
from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor

class FraudDetector(KeyedProcessFunction):
    def open(self, runtime_context):
        descriptor = ValueStateDescriptor("last_tx_amount", Types.FLOAT())
        self.last_amount = runtime_context.get_state(descriptor)

    # PyFlink process functions emit results by yielding, not via a Collector
    def process_element(self, tx, ctx):
        prev = self.last_amount.value()
        if prev is not None and tx.amount > prev * 3:
            yield FraudAlert(tx.customer_id, tx.amount)
        self.last_amount.update(tx.amount)

alerts = (
    stream
    .key_by(lambda t: t.customer_id)
    .process(FraudDetector())
)
```
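FraudAlert is likewise user-defined; a minimal sketch as a plain dataclass:

```python
from dataclasses import dataclass

@dataclass
class FraudAlert:
    customer_id: str
    amount: float
```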
Enable checkpointing and submit the job
```python
from pyflink.datastream import CheckpointingMode

# Checkpoint every 30 seconds to S3
env.enable_checkpointing(30_000)  # interval in ms
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.EXACTLY_ONCE
)
env.get_checkpoint_config().set_checkpoint_storage_dir(
    "s3://my-bucket/flink-checkpoints"
)

# Write fraud alerts to Kafka sink
alerts.sink_to(kafka_alerts_sink)
env.execute("fraud-detection-pipeline")
```

```shell
# Submit to cluster via CLI:
flink run -py fraud_detection.py
```
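The kafka_alerts_sink used above is assumed to be built elsewhere. One way to construct it (the topic name and string serialization are assumptions):

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
)

kafka_alerts_sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("fraud-alerts")  # assumed topic name
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    # EXACTLY_ONCE requires Kafka transactions; AT_LEAST_ONCE is simpler to operate
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
)
```

Note that the Kafka connector jar must be on the job's classpath for this builder to work.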
What's Happening
The pipeline reads transaction events from Kafka and assigns event-time watermarks to handle late-arriving messages. Events are partitioned by customer ID (keyBy), then windowed into 1-minute tumbling buckets. Stateful fraud detection tracks each customer's last transaction amount in ValueState and emits an alert when the current amount is 3× the previous. Checkpoints snapshot all state to S3 every 30 seconds — if the job crashes, it restores from the last checkpoint with exactly-once guarantees.
When to Use This Pattern
- Real-time fraud or anomaly detection on transaction streams
- Per-user behavioral tracking (session counts, velocity checks)
- Continuous aggregations feeding live dashboards
- Event-driven microservice triggers from Kafka events
Common Issues
Job fails with "No watermark has been assigned"
You forgot to configure a WatermarkStrategy on the source. Add .for_bounded_out_of_orderness(Duration.of_seconds(5)) and implement a TimestampAssigner that extracts the event timestamp from your data.
State grows unbounded over time
You did not set a TTL on your state descriptor. Use StateTtlConfig to expire state for idle keys. Without TTL, ValueState for every customer ID you have ever seen stays in memory forever.
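A sketch of enabling TTL on the fraud detector's state descriptor (the 7-day retention period is an assumption):

```python
from pyflink.common import Time, Types
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

descriptor = ValueStateDescriptor("last_tx_amount", Types.FLOAT())
ttl_config = (
    StateTtlConfig.new_builder(Time.days(7))  # expire keys idle for 7 days
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build()
)
descriptor.enable_time_to_live(ttl_config)
```

With OnCreateAndWrite, the TTL timer resets every time a key's state is written, so active customers never expire while idle ones are eventually cleaned up.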
Checkpoints time out or fail constantly
Your checkpoint interval is shorter than the time it takes to complete a checkpoint. Increase the interval, reduce operator state size, or switch to incremental checkpointing (RocksDB backend supports this).
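A sketch of those mitigations, assuming the RocksDB state backend is available on the cluster:

```python
from pyflink.datastream import EmbeddedRocksDBStateBackend, StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Give each checkpoint more breathing room
env.enable_checkpointing(60_000)                                # longer interval (ms)
env.get_checkpoint_config().set_checkpoint_timeout(120_000)     # abort slow checkpoints after 2 min
env.get_checkpoint_config().set_min_pause_between_checkpoints(30_000)

# Incremental checkpoints upload only changed RocksDB files, not the full state
env.set_state_backend(EmbeddedRocksDBStateBackend(enable_incremental_checkpointing=True))
```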
FAQ
- What language should I use to write a Flink pipeline?
- Java or Scala for production — full API coverage and best performance. PyFlink for Python teams, but some APIs are not yet available. This guide uses PyFlink for readability.
- How do I run a Flink job locally?
- Start a local Flink cluster with Docker Compose (jobmanager + taskmanager), then run python your_job.py or flink run -py your_job.py to submit it.
- How does Flink checkpointing work?
- Flink periodically snapshots all operator state to durable storage (S3, HDFS) using asynchronous barrier snapshotting, a variant of the Chandy-Lamport algorithm. On failure it restores the last completed checkpoint and rewinds sources to the offsets recorded in it, replaying events that arrived after the snapshot. This gives exactly-once guarantees for Flink state (end-to-end exactly-once additionally requires a transactional sink).