I would really appreciate some help with this.
I want to implement a use case where a KeyedProcessFunction sends a "done" signal when its numRecordsInPerSecond is equal to 0. The problem is that I cannot seem to access this metric out of the box. Here's a code snippet of roughly what I'm trying to achieve:
public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
    private final OutputTag<String> outputTag;

    @Override
    public void open(Configuration parameters) throws Exception {}

    public MyKeyedProcessFunction(OutputTag<String> outputTag) {
        this.outputTag = outputTag;
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
        // Emit to side output if numRecordsInPerSecond is 0
        double numRecordsInPerSecond = getRuntimeContext().getMetricGroup().getIOMetricGroup().getNumRecordsInPerSecond();
        if (numRecordsInPerSecond == 0) {
            ctx.output(outputTag, "done");
        }
    }
}
Is it even possible for my Flink program to access numRecordsInPerSecond for each operator? I know this metric is available in the web UI and can be exposed externally, but what about accessing it internally, from within the same Flink job?
Flink Version: 1.17.2
As @kkrugler pointed out, accessing metrics from within the job itself isn't straightforward -- there's no API for that.
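But from within a KeyedProcessFunction you have everything you need to count the events being processed each second yourself. You can arrange for a processing-time timer to fire every second and maintain the counter yourself. Note that the "is it zero?" check has to happen in onTimer rather than in processElement, because processElement only runs when a record arrives and so would never observe a count of 0.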
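Here is a minimal, Flink-free sketch of the bookkeeping I have in mind, so the logic can be read and tested on its own. The class and method names (`PerSecondIdleDetector`, `onElement`, `onTick`) are made up for illustration. In the real job I would assume the count lives in keyed state (for example a `ValueState<Long>`), that `processElement` increments it and registers a timer with `ctx.timerService().registerProcessingTimeTimer(...)`, and that `onTimer` does the zero check and calls `ctx.output(outputTag, "done")`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper, not part of Flink. It models the per-key counter that a
 * KeyedProcessFunction would keep in keyed state.
 */
final class PerSecondIdleDetector {
    // Records seen per key since that key's previous tick.
    private final Map<String, Long> counts = new HashMap<>();

    /** Call once per incoming record (from processElement in the Flink job). */
    void onElement(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    /**
     * Call once per second for a key (from onTimer in the Flink job).
     * Returns true when no record arrived for that key since the previous tick,
     * which is the point at which "done" would be sent to the side output.
     */
    boolean onTick(String key) {
        long seen = counts.getOrDefault(key, 0L);
        if (seen == 0) {
            counts.remove(key); // idle: drop the entry, like clearing keyed state
            return true;
        }
        counts.put(key, 0L);    // active: start a fresh one-second window
        return false;
    }
}
```

Keep in mind that timers in a KeyedProcessFunction are scoped to the current key, so this detects idleness per key rather than for the operator as a whole. When a tick reports activity, onTimer would register the next timer one second later; when it reports idle, you can emit "done" and stop re-registering until the next record for that key shows up.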