I want to find the event time difference between every two consecutive input events. If the time difference is above a certain threshold then I want to output an event signalling the threshold has been breached. I also want the first event of the stream to always output this breach signal as an indication that it does not have a previous event to calculate a time difference with.
I tried using Flink's CEP library as it ensures that the events are ordered by event time.
The pattern I created is as follows:
Pattern.begin("begin").optional().next("end");
I use the optional() clause to cater for the first event as I figured the first event would be the only event where "begin" would not have a value.
When I input my events a1 a2 a3 a4 a5
I get the following output matches:
{a1} {a1 a2} {a2} {a2 a3} {a3} {a3 a4} {a4} {a4 a5}...
However I want the following as it will allow me to calculate the time difference between each consecutive event.
{a1} {a1 a2} {a2 a3} {a3 a4} {a4 a5}...
I have tried playing around with different AfterMatchSkipStrategy settings as well as IterativeCondition clauses but with no success.
Marking "begin" as optional is what's causing the unwanted matches. I would look for some other way to generate the breach signal for the first event -- e.g., perhaps you could prepend a dummy first event.
Another approach would be to use CEP or SQL only for sorting the stream, and then use a RichFlatMapFunction or a stateful process function to implement the business logic: i.e., remember the previous event's timestamp, compute the difference for each new event, and generate the breach signals.
See "Can I use Flink CEP to sort a stream?" for how to do this.
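To make the second approach concrete, here is a minimal sketch of the per-event logic, written as plain Java so it runs without Flink on the classpath. The class name GapDetector, the method isBreach, and the millisecond threshold are all hypothetical names chosen for illustration. It assumes the events have already been sorted by event time upstream. In a real job the previousTimestamp field would most likely be held in Flink keyed state rather than in an instance field.

```java
/**
 * Sketch of the breach logic that would sit inside a flat map or process
 * function. All names here are hypothetical; this is not Flink API code.
 */
class GapDetector {
    private final long thresholdMillis;

    // Assumption: in a Flink job this would be keyed state, not a plain field.
    // null means "no previous event seen yet".
    private Long previousTimestamp = null;

    GapDetector(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    /**
     * Returns true when a breach signal should be emitted for this event.
     * Assumes timestamps arrive in event-time order.
     */
    boolean isBreach(long timestampMillis) {
        // The first event always signals a breach because it has no predecessor.
        boolean breach = previousTimestamp == null
                || timestampMillis - previousTimestamp > thresholdMillis;
        // Remember this event so the next one can be compared against it.
        previousTimestamp = timestampMillis;
        return breach;
    }
}
```

Because the first event is handled by the null check, the pattern no longer needs an optional "begin" stage, which avoids the extra single-event matches.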