apache-flink, flink-streaming, complex-event-processing, flink-cep

How do I find the event time difference between consecutive events in Flink?


I want to find the event time difference between every two consecutive input events. If the time difference is above a certain threshold then I want to output an event signalling the threshold has been breached. I also want the first event of the stream to always output this breach signal as an indication that it does not have a previous event to calculate a time difference with.

I tried using Flink's CEP library as it ensures that the events are ordered by event time.

The pattern I created is as follows:

Pattern.begin("begin").optional().next("end");

I use the optional() clause to cater for the first event as I figured the first event would be the only event where "begin" would not have a value.

When I input my events a1 a2 a3 a4 a5 I get the following output matches:

{a1} {a1 a2} {a2} {a2 a3} {a3} {a3 a4} {a4} {a4 a5}...

However I want the following as it will allow me to calculate the time difference between each consecutive event.

{a1} {a1 a2} {a2 a3} {a3 a4} {a4 a5}...

I have tried playing around with different AfterMatchSkipStrategy settings as well as IterativeCondition clauses but with no success.


Solution

  • Marking "begin" as optional is what causes the unwanted matches: once "begin" may be skipped, every event can also match "end" on its own, so each event produces a single-event match in addition to the pair. I would look for some other way to generate the breach signal for the first event, e.g., by prepending a dummy first event to the stream.

    Another approach is to use CEP or SQL only for sorting the stream by event time, and then implement the business logic in a RichFlatMapFunction or a stateful process function: keep the previous event's timestamp in state, compute the difference for each new event, and emit the breach signals.

    See "Can I use Flink CEP to sort a stream?" for how to do the sorting.
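    The stateful logic described above can be sketched in plain Java, without any Flink dependency. This is only an illustration of the comparison, not a ready-made operator: the class name GapDetector, the method onEvent, and the parameter thresholdMillis are all hypothetical. It assumes the events arrive already sorted by event time. In a real job the previousTimestamp field would presumably live in keyed state (for example a ValueState<Long>) inside a RichFlatMapFunction or KeyedProcessFunction.

```java
/**
 * Plain-Java sketch of the per-event logic (hypothetical names throughout).
 * Assumes events are delivered in event-time order.
 */
class GapDetector {
    private final long thresholdMillis;

    // null until the first event is seen; in Flink this would be keyed state.
    private Long previousTimestamp;

    GapDetector(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    /**
     * Returns true if a breach signal should be emitted for this event:
     * either it is the first event, or the gap to the previous event
     * is strictly greater than the threshold.
     */
    boolean onEvent(long eventTimestamp) {
        boolean breach = previousTimestamp == null
                || eventTimestamp - previousTimestamp > thresholdMillis;
        previousTimestamp = eventTimestamp;
        return breach;
    }

    public static void main(String[] args) {
        GapDetector detector = new GapDetector(1_000L);
        long[] timestamps = {0L, 500L, 2_000L, 2_100L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> breach=" + detector.onEvent(ts));
        }
        // prints:
        // 0 -> breach=true      (first event, nothing to compare with)
        // 500 -> breach=false   (gap 500)
        // 2000 -> breach=true   (gap 1500)
        // 2100 -> breach=false  (gap 100)
    }
}
```

    Treating the missing previous timestamp as a breach covers the first-event requirement directly, so no optional pattern stage or dummy event is needed with this approach.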