
Custom Window Function in Flink


I have a streaming use case where I want to implement a custom window in Flink whose start is triggered by an event containing a certain value. The window should then stay open for 15 seconds, collect all events that arrive during that time, and close. After that, I want to aggregate all of the window's events. I've checked Flink's built-in window assigners and none of them seem to fit my use case.

outputStream = ewSubStream.keyBy(new KeySelector())
    .window(new SessionWindowAssigner(15000))
    .trigger(new SessionWindowTrigger())
    .aggregate(new SessionModelAggregator())
    .map(new SessionModelEvaluator());

I tried the approach above, where the window assigner, trigger, and aggregate function are all custom, but it doesn't seem to work. I also tried this:

outputStream = ewSubStream.keyBy(new KeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(15))) // tried with .window(EventTimeSessionWindows.withGap(Time.seconds(15))) as well
    .trigger(new SessionWindowTrigger())
    .aggregate(new SessionModelAggregator())
    .map(new SessionModelEvaluator());

I don't see any exception, but none of the log statements in my trigger function are printed. My trigger function looks like this:

@Override
public TriggerResult onElement(Element element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
  Map<String, Object> data = element.getData();
  log.info("elementtrigger: {}", element);
  if (data.containsKey("my_record_key") &&
      "active".equals(data.get("my_record_key").toString())) {
    log.info("firing: {}", element);
    return TriggerResult.FIRE;
  }
  return TriggerResult.CONTINUE;
}

In short, I want to start a window when an event with my_record_key=active arrives, collect events for 15 seconds, and then close that window session.

When I run the application, neither the window function nor the trigger function is invoked.

Any ideas/thoughts are helpful!


Solution

  • I would just use a KeyedProcessFunction. When KeyedProcessFunction.processElement() is called with the "special event" that starts data collection, create state and register a timer for 15 seconds later. Your state holds whatever you need to aggregate the results.

    When processElement() is called and state already exists (meaning you've seen the special event), add the element to your aggregation.

    When KeyedProcessFunction.onTimer() is called, emit the aggregated result and clear your state, so the next special event starts a new session.
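A minimal sketch of the KeyedProcessFunction approach described above, assuming the Flink 1.x DataStream API (it overrides open(Configuration), which Flink 1.19 deprecates in favor of open(OpenContext)). Several pieces are illustrative assumptions rather than anything from the question: the class name ActiveSessionFunction and the state name are hypothetical, each event is modeled as a Map<String, Object> instead of the asker's Element type, the key is a String, the "aggregation" is a simple event count standing in for SessionModelAggregator, and the 15-second session uses a processing-time timer. Events that arrive before any "active" event are dropped.

```java
import java.util.Map;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical sketch: when an event with my_record_key=active arrives for a key,
 * open a 15 s session (processing time), count that key's events until the timer
 * fires, then emit (key, count) and clear the state.
 */
public class ActiveSessionFunction
        extends KeyedProcessFunction<String, Map<String, Object>, Tuple2<String, Long>> {

    private static final long SESSION_MS = 15_000L;

    // Placeholder aggregate for the open session; null means no session is open for this key.
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("session-count", Long.class));
    }

    @Override
    public void processElement(Map<String, Object> event, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();
        if (current == null) {
            // No open session: only the "special event" may start one.
            if (!"active".equals(String.valueOf(event.get("my_record_key")))) {
                return; // drop events that arrive before a session starts
            }
            current = 0L;
            // Close the session 15 s (processing time) after the special event.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + SESSION_MS);
        }
        // Session is open (or was just opened): fold this event into the aggregate.
        count.update(current + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();
        if (current != null) {
            out.collect(Tuple2.of(ctx.getCurrentKey(), current));
        }
        count.clear(); // the next "active" event starts a fresh session
    }
}
```

In the asker's pipeline this would replace the window/trigger/aggregate chain: key the stream as before and call .process(new ActiveSessionFunction()). Processing time keeps the sketch free of watermark setup; an event-time variant would register ctx.timestamp() + SESSION_MS with registerEventTimeTimer instead, which requires timestamps and watermarks on the stream and only fires once the watermark passes that time.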