I have a stream of events and want to count the number of events in a specific time period to detect event loss. My code is similar to the following:
DataStream<DataEvent> dataStream = ...;
dataStream
    .windowAll(SlidingEventTimeWindows.of(Time.seconds(windowSize), Time.seconds(1)))
    .process(new MyProcessWindowFunction());
and I defined the MyProcessWindowFunction class as:
public class MyProcessWindowFunction
        extends ProcessAllWindowFunction<DataEvent, String, TimeWindow> {
    @Override
    public void process(ProcessAllWindowFunction<DataEvent, String, TimeWindow>.Context context, Iterable<DataEvent> iterable, Collector<String> collector) throws Exception {
        // Count every event that was assigned to this window.
        long count = 0;
        for (DataEvent dataEvent : iterable) {
            count++;
        }
        if ()
            collector.collect("Window: " + context.window() + " count: " + count);
    }
}
My question is how I can use the counted value in a comparison to detect event loss. As I understand it, this process function produces a stream of strings that are emitted through the collector. However, I want to take some action as soon as event loss is detected at the end of each sliding window.
I appreciate any help. Best regards,
It sounds like you want to do something along these lines:
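dataStream
    // Turn each event into (sensorId, 1) so the counts can be summed per sensor.
    .map(event -> new Tuple2<>(event.sensorId, 1))
    // Type hint for the lambda; assumes sensorId is a String.
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(t -> t.f0)
    .window(SlidingEventTimeWindows.of(Time.seconds(windowSize), Time.seconds(1)))
    .reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1))
    // SensorShouldBeStopped is a FilterFunction you implement; it returns true when the count indicates loss.
    .filter(new SensorShouldBeStopped())
    .addSink(...);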
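To make the counting and the comparison concrete, here is a minimal plain-Java sketch of what the reduce and filter steps compute for a single sensor. It does not use the Flink API. The class and method names (SlidingWindowLossCheck, countPerWindow, windowsWithLoss) are hypothetical, and it assumes event timestamps in seconds with roughly one event expected per second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names come from Flink.
final class SlidingWindowLossCheck {

    // Counts events per sliding window [start, start + size),
    // where window starts are aligned to multiples of slide.
    static Map<Long, Integer> countPerWindow(List<Long> timestamps, long size, long slide) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            // Latest aligned window start that still contains ts.
            long lastStart = ts - Math.floorMod(ts, slide);
            // Walk back over every earlier window that also contains ts.
            for (long start = lastStart; start > ts - size; start -= slide) {
                counts.merge(start, 1, Integer::sum);
            }
        }
        return counts;
    }

    // Returns the start times of windows whose count is below the expected
    // number of events. This is the kind of check the filter step would make.
    static List<Long> windowsWithLoss(Map<Long, Integer> counts, int expected) {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, Integer> entry : counts.entrySet()) {
            if (entry.getValue() < expected) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

For example, with timestamps 0 to 9 where the event at second 5 is missing, a window size of 3 and a slide of 1, the windows starting at 3, 4 and 5 each hold only 2 events and would be reported. Windows at the very start and end of the input are only partially filled, so they are reported as well. Also keep in mind that Flink only emits a result for a window that received at least one element, so a sensor that goes completely silent will not produce a low count; catching that case needs a different mechanism, such as timers.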