I'm building a Kafka -> Flink -> Kafka pipeline that works with delineated "session" data. My input Kafka topic has data in the following format, and the following sequence constitutes one session for a given `session_key`:
```
start_event(session_key, some_other_data...)
entry_event(session_key, some_other_data...)
entry_event(session_key, some_other_data...)
...
entry_event(session_key, some_other_data...)
end_event(session_key, some_other_data...)
```
Each session is about 100 events long, arrives quickly (a full session every 1-2 seconds), and all of its events share the same `session_key`. I'm transforming each session into a series of 20 or so events that go into the output topic. To build these output events I need to know about the entire session, so I have to wait for the `end_event` to arrive before I can run the processing and push the output events to the output topic.
The implementation is fairly simple: key by `session_key`, store the `start_event` in `ValueState`, the entries in `ListState`, and when the `end_event` arrives, run the processing logic over all stored events and push the results into the output Kafka topic.
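For illustration, here is a minimal, Flink-free sketch of that per-key buffering logic in plain Python. The dicts stand in for Flink's keyed `ValueState` and `ListState`, and the event fields (`session_key`, `type`) and the `process` body are placeholders, not a real schema or the actual transform:

```python
from collections import defaultdict

class SessionAssembler:
    """Buffers events per session_key and processes the whole session on end_event."""

    def __init__(self):
        self.start_state = {}                  # session_key -> start_event (ValueState analogue)
        self.entry_state = defaultdict(list)   # session_key -> [entry_event] (ListState analogue)

    def on_event(self, event):
        """Consume one event; return output events when a session completes, else []."""
        key = event["session_key"]
        kind = event["type"]
        if kind == "start_event":
            self.start_state[key] = event
        elif kind == "entry_event":
            self.entry_state[key].append(event)
        elif kind == "end_event":
            # The whole session is now known: run the processing logic
            # over all stored events and clear the per-key state.
            start = self.start_state.pop(key, None)
            entries = self.entry_state.pop(key, [])
            return self.process(start, entries, event)
        return []

    def process(self, start, entries, end):
        # Placeholder for the real "session -> ~20 output events" transform;
        # here we emit a single summary event.
        return [{"session_key": end["session_key"], "entry_count": len(entries)}]
```

Feeding it a start, an entry, and an end event for the same key produces one summary event; everything before the `end_event` produces nothing, which mirrors the "wait for `end_event`" behavior described above.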
My question is around checkpointing and possible failures. Let's say a checkpoint starts after the `end_event` has been read from Kafka: the offset is committed to Kafka, and the checkpoint barrier reaches my processing operator, which fails right before processing it (Kafka is down now).
How should I correctly recover from this? If the Kafka offset is already committed, and no `end_event` will ever come out of Kafka again for that `session_key`, how do I trigger the processing operator for my saved state later on? Or would the Kafka offset not be committed in this scenario, so that the `end_event` goes through Flink once more?
Flink commits Kafka offsets only for the sake of monitoring and does not rely on them for fault tolerance (it commits them when checkpoints complete). From the Flink documentation:
> Note that Kafka source does NOT rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring.

See Consumer Offset Committing in the Flink Kafka connector documentation.
Topic offsets are saved as part of the Kafka source's state during a checkpoint. In the described scenario the whole checkpoint would fail, and on recovery Flink would start consuming from the offsets saved in the last completed checkpoint. No messages would be lost, but some could be duplicated (assuming AT_LEAST_ONCE checkpointing mode).
So yes, the `end_event` will go through Flink once more.
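If the duplicated output events mentioned above are a problem, the duplication on the sink side can be avoided by combining exactly-once checkpointing with a transactional Kafka sink. A hedged sketch of that configuration (broker address, topic name, and the transactional id prefix are placeholders, and the checkpoint interval is arbitrary):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 10 s with exactly-once semantics for operator state.
env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")                       // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")                         // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // Writes are wrapped in Kafka transactions that commit only
        // when the corresponding checkpoint completes.
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Required for EXACTLY_ONCE: a stable, unique prefix per job.
        .setTransactionalIdPrefix("session-pipeline")
        .build();
```

With this setup, a replayed `end_event` would re-run the processing, but the duplicate output records would sit in an aborted transaction and never become visible to consumers reading with `isolation.level=read_committed`.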