I am currently using Apache Flink 1.13.2 with Java for my streaming application. I am using a keyed function with no window function. I have implemented a watermark strategy and set the autoWatermarkInterval config per the documentation, but my watermark is not advancing.
I have double-checked this in the Flink web UI and by printing the current watermark in my EventProcessor KeyedProcessFunction, but the watermark stays at a very large negative number, -9223372036854775808 (the lowest possible watermark).
    env.getConfig().setAutoWatermarkInterval(1000);

    WatermarkStrategy<EventPayload> watermarkStrategy = WatermarkStrategy
            .<EventPayload>forMonotonousTimestamps()
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

    DataStream<EventPayload> deserialized = input
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .flatMap(new Deserializer());

    DataStream<EnrichedEventPayload> resultStream =
            AsyncDataStream.orderedWait(deserialized, new Enrichment(), 5, TimeUnit.SECONDS, 100);

    DataStream<Session> eventsStream = resultStream
            .filter(EnrichedEventPayload::getIsEnriched)
            .keyBy(EnrichedEventPayload::getId)
            .process(new EventProcessor());
I even tried adding the WatermarkStrategy to the stream after the keyBy (adjusting the types to match), but still no luck.
    DataStream<Session> eventsStream = resultStream
            .filter(EnrichedEventPayload::getIsEnriched)
            .keyBy(EnrichedEventPayload::getId)
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .process(new EventProcessor());
I have also tried using my own class implementing WatermarkStrategy and set breakpoints in the onEvent function to confirm that a new watermark was being emitted, but the watermark still did not advance (and any associated timers did not fire).
Any help would be greatly appreciated!
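This will happen if one of the parallel instances of the watermark strategy is idle (i.e., if no events are flowing through it). A downstream operator's watermark is the minimum of the watermarks it has received from all of its input channels, so a single instance that never emits a watermark holds the downstream watermark at Long.MIN_VALUE. Using the withIdleness(...) option on the watermark strategy is one way to solve this.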
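To make the mechanism concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: the class WatermarkMinDemo and the method combinedWatermark are hypothetical names invented for illustration, and the all-idle case is simplified. It only shows that taking the minimum over the inputs keeps the result at Long.MIN_VALUE while one input has never reported a watermark, and that excluding that input as idle lets the result advance.

```java
/** Toy model (not Flink code) of combining the watermarks of several parallel inputs. */
final class WatermarkMinDemo {

    /** Value an input reports before it has emitted any watermark. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Returns the minimum watermark over all inputs that are not marked idle.
     * Simplifying assumption: if every input is idle, NO_WATERMARK is returned.
     */
    static long combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, inputWatermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Two inputs have advanced; the third has never seen an event.
        long[] watermarks = {1_000L, 2_000L, NO_WATERMARK};

        boolean[] noneIdle = {false, false, false};
        boolean[] thirdIdle = {false, false, true};

        System.out.println(combinedWatermark(watermarks, noneIdle));  // -9223372036854775808
        System.out.println(combinedWatermark(watermarks, thirdIdle)); // 1000
    }
}
```

In the question's code, the corresponding change would be to chain .withIdleness(Duration.ofSeconds(30)) onto forMonotonousTimestamps() in the watermark strategy; the 30-second timeout is an arbitrary assumption and should be tuned to how long a source instance can reasonably go without events.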