I have a custom Flink Source and a SerializableTimestampAssigner that assigns event timestamps to the records emitted by the source. The source may emit records out of order because of the nature of the underlying data storage. However, in BATCH mode I expect Flink to sort these records by event timestamp before any operator processes them.
Excerpted from the Flink documentation on execution mode:
In BATCH mode, where the input dataset is known in advance, there is no need for such a heuristic as, at the very least, elements can be sorted by timestamp so that they are processed in temporal order.
However, this doesn't seem to be the case. If I create a DataStream from the Source (StreamExecutionEnvironment.fromSource) with my timestamp assigner and then call datastream.addSink(x => println(extractTimestamp(x))), the output isn't strictly ascending. Is my understanding of the documentation wrong? Or does Flink expect me (the user) to sort the input dataset myself?
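Roughly, my pipeline looks like the sketch below (MyRecord and mySource are placeholders standing in for my actual record type and custom Source):

```scala
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.connector.source.{Source, SourceSplit}
import org.apache.flink.streaming.api.scala._

// Placeholder for my actual record type.
case class MyRecord(key: String, eventTime: Long)

object BatchOrderingQuestion {
  // Placeholder for my custom Source implementation.
  def mySource: Source[MyRecord, _ <: SourceSplit, _] = ???

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    // The timestamp assigner extracts the event timestamp from each record.
    val strategy = WatermarkStrategy
      .noWatermarks[MyRecord]()
      .withTimestampAssigner(new SerializableTimestampAssigner[MyRecord] {
        override def extractTimestamp(record: MyRecord, recordTimestamp: Long): Long =
          record.eventTime
      })

    val datastream = env.fromSource(mySource, strategy, "my-source")

    // I expected these to print in ascending timestamp order in BATCH mode, but they don't.
    datastream.addSink(x => println(x.eventTime))

    env.execute("batch-ordering-question")
  }
}
```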
BATCH execution mode first sorts by key and, within each key, by timestamp. Operating this way means the runtime only needs to keep state for one key at a time, which keeps it simple and efficient.
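As an illustration (not your exact pipeline; MyRecord and its key field are placeholders), if you key the stream and inspect the assigned timestamps inside a KeyedProcessFunction, in BATCH mode they should arrive in non-decreasing order within each key:

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Placeholder record type with a key and an event timestamp.
case class MyRecord(key: String, eventTime: Long)

// Warns whenever a timestamp goes backwards within the current key.
class PerKeyOrderCheck extends KeyedProcessFunction[String, MyRecord, MyRecord] {
  private var lastTs: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    lastTs = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastTs", classOf[java.lang.Long]))
  }

  override def processElement(
      value: MyRecord,
      ctx: KeyedProcessFunction[String, MyRecord, MyRecord]#Context,
      out: Collector[MyRecord]): Unit = {
    // Assigned event timestamp (non-null as long as a timestamp assigner is set).
    val ts: Long = ctx.timestamp()
    val prev = lastTs.value()
    if (prev != null && ts < prev) {
      println(s"out of order within key ${ctx.getCurrentKey}: $ts after $prev")
    }
    lastTs.update(ts)
    out.collect(value)
  }
}

// Usage: datastream.keyBy(_.key).process(new PerKeyOrderCheck)
```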
If your pipeline isn't using keyed streams, then you won't be using keyed state or timers, so the ordering shouldn't matter (and I'm not sure what happens).
For keyed co-streams, both inputs are keyed in the same way; each stream is sorted by those keys, and the keys are advanced in lockstep.
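For example (again with placeholder record types), connecting two streams keyed the same way and observing the current key shows this: in BATCH mode you see all the elements for one key, from both inputs, before the next key starts.

```scala
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Placeholder record types, each carrying a key and an event timestamp.
case class LeftRecord(key: String, eventTime: Long)
case class RightRecord(key: String, eventTime: Long)

// Logs which side, key, and timestamp each element arrives with; in BATCH mode all
// elements for one key (from both inputs) appear before the next key starts.
class LockstepObserver
    extends KeyedCoProcessFunction[String, LeftRecord, RightRecord, String] {

  override def processElement1(
      value: LeftRecord,
      ctx: KeyedCoProcessFunction[String, LeftRecord, RightRecord, String]#Context,
      out: Collector[String]): Unit =
    out.collect(s"left  key=${ctx.getCurrentKey} ts=${value.eventTime}")

  override def processElement2(
      value: RightRecord,
      ctx: KeyedCoProcessFunction[String, LeftRecord, RightRecord, String]#Context,
      out: Collector[String]): Unit =
    out.collect(s"right key=${ctx.getCurrentKey} ts=${value.eventTime}")
}

// Usage:
// left.connect(right)
//   .keyBy(_.key, _.key)   // both inputs keyed the same way
//   .process(new LockstepObserver)
//   .print()
```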
Broadcast streams are sent in their entirety before anything else.