I'm working on a Flink job that processes Row elements and applies a delay using a KeyedProcessFunction. The issue arises with how the key is generated in the keyBy function.
When I use System.currentTimeMillis() to generate the key in keyBy, I get a NullPointerException when adding elements to ListState in bufferedElements.add(). Specifically, the setCurrentKeyGroupIndex function calculates a key group index of 78, which is outside the key group range (52-76) assigned to the task, causing the NPE.
However, when I use a field from the Row (which also contains a System.currentTimeMillis() value) as the key, the problem doesn't occur and state management works fine.
Here’s a simplified version of my code:
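import java.text.SimpleDateFormat;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class DelayJob { // hypothetical enclosing class name; the original snippet omits it

    public static void main(String[] args) {
        // Define row types
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG, Types.LONG},
                new String[]{"src_ip", "port", "time", "time2"});

        // Set up the environment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(5);

        // Source function: emits one row every 50 ms; "time" and "time2" are captured once per row
        environment.addSource(new SourceFunction<Row>() {
                    @Override
                    public void run(SourceContext<Row> ctx) throws Exception {
                        for (int i = 0; i < 1000; i++) {
                            ctx.collect(Row.of("first_" + 1, "456", System.currentTimeMillis(), System.currentTimeMillis()));
                            Thread.sleep(50);
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                }).returns(rowTypeInfo)
                .keyBy(new KeySelector<Row, String>() {
                    @Override
                    public String getKey(Row value) throws Exception {
                        // Causes NPE
                        return String.valueOf(System.currentTimeMillis());
                        // Works fine
                        // return String.valueOf(value.getField(3));
                    }
                })
                .process(new DelayFunction(5000))
                .map(new MapFunction<Row, Row>() {
                    @Override
                    public Row map(Row value) throws Exception {
                        return value;
                    }
                });

        try {
            environment.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Delay function: buffers each row in keyed ListState and registers a processing-time timer
    public static class DelayFunction extends KeyedProcessFunction<String, Row, Row> {
        private final long delayTime;
        private transient ListState<Row> bufferedElements;

        public DelayFunction(long delayTime) {
            this.delayTime = delayTime;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ListStateDescriptor<Row> descriptor = new ListStateDescriptor<>("bufferedElements", Row.class);
            bufferedElements = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
            long currentTime = ctx.timerService().currentProcessingTime();
            long triggerTime = currentTime + delayTime;
            value.setField(0, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(triggerTime));
            bufferedElements.add(value); // NPE occurs here
            ctx.timerService().registerProcessingTimeTimer(triggerTime);
        }

        // onTimer(...), which would emit the buffered rows, is not shown in this simplified version
    }
}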
The keyBy function using System.currentTimeMillis() causes setCurrentKeyGroupIndex to compute a key group of 78, but my task is only responsible for key groups 52-76, which seems to cause the NullPointerException when adding to bufferedElements.
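The 52-76 range is consistent with how Flink splits key groups among subtasks. Assuming the default max parallelism that Flink derives for parallelism 5 (128), the sketch below re-implements in plain Java, to the best of my knowledge, the range arithmetic from Flink's KeyGroupRangeAssignment. The class and method names are my own, not Flink's API.

```java
// Plain-Java sketch of how key groups are split among subtasks.
// Assumption: mirrors the arithmetic in Flink's KeyGroupRangeAssignment.
public class KeyGroupRanges {

    // First key group owned by a subtask (inclusive).
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group owned by a subtask (inclusive).
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Subtask responsible for a given key group.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default for parallelism 5
        int parallelism = 5;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + ": key groups "
                    + rangeStart(maxParallelism, parallelism, i) + "-"
                    + rangeEnd(maxParallelism, parallelism, i));
        }
        System.out.println("key group 78 -> subtask "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, 78));
    }
}
```

Under these assumptions the ranges come out as 0-25, 26-51, 52-76, 77-102 and 103-127. So the failing task is subtask 2, and key group 78 belongs to subtask 3.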
Why does this issue occur with System.currentTimeMillis(), but not when I use a field from the Row (which also contains a System.currentTimeMillis() value)?
Any insights would be greatly appreciated!
With Flink, keys are not communicated or shared between operators. Instead, they are computed whenever needed by calling the KeySelector function, which is known throughout the cluster. Flink therefore requires the key selector to be deterministic: every parallel instance of the job must compute the same key for a given record, regardless of when it computes it. Without this guarantee, Flink will fail, possibly in unpredictable ways.
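Because the key is recomputed rather than shipped, what really has to agree across calls is the key group derived from it. The sketch below shows that mapping. It is written from memory of Flink's KeyGroupRangeAssignment.assignToKeyGroup (a murmur hash of key.hashCode(), modulo the max parallelism), so treat it as an illustration rather than Flink's exact code. The class name is hypothetical.

```java
// Sketch of key -> key group assignment.
// Assumption: follows Flink's assignToKeyGroup / MathUtils.murmurHash as I recall them.
public class KeyGroupAssignment {

    // Murmur3-style mixing of a single int, made non-negative at the end.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche mix.
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Depends only on key.hashCode(), so equal keys always land in the same group.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    public static void main(String[] args) {
        String key = String.valueOf(1_700_000_000_123L);
        // String.hashCode() is defined by the string's contents, so this holds on any JVM.
        System.out.println(assignToKeyGroup(key, 128) == assignToKeyGroup(new String(key), 128));
        System.out.println("key group: " + assignToKeyGroup(key, 128));
    }
}
```

The only input is the key's hashCode(). Two calls that return equal keys can never disagree, but two calls that return different timestamps usually will.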
This means the key cannot depend on something random or something that changes over time. Nor can it be something whose hash code varies from one JVM to another, like an enum (Enum.hashCode() is identity-based).
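The enum pitfall can be seen without Flink. Enum.hashCode() is the identity hash, which is not tied to the constant's value, so the same constant can hash differently on different TaskManagers. A content-based key such as name() avoids this because String.hashCode() is specified by the string's characters. The Protocol enum and stableKey helper below are hypothetical.

```java
// Shows why enums make poor keys in a distributed job, and one stable alternative.
public class EnumKeys {

    enum Protocol { TCP, UDP } // hypothetical enum for illustration

    // Deterministic across JVMs: the hash of a String depends only on its contents.
    static String stableKey(Protocol p) {
        return p.name();
    }

    public static void main(String[] args) {
        Protocol p = Protocol.TCP;
        // The enum's hashCode() is its identity hash, which can differ between JVMs.
        System.out.println("identity-based: " + (p.hashCode() == System.identityHashCode(p)));
        // The name-based key hashes the same everywhere.
        System.out.println("stable key: " + stableKey(p) + ", hash " + stableKey(p).hashCode());
    }
}
```

name() is used rather than ordinal() because ordinals silently change if the constants are reordered.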
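As for why value.getField(3) works fine while System.currentTimeMillis() does not: value.getField(3) returns the same thing every time it is called on the same record, because the timestamp was captured once in the source and stored in the row. System.currentTimeMillis(), by contrast, is evaluated anew on every call. The key selector runs once upstream, where the key decides which subtask receives the record, and again in the receiving operator, where it sets the current key for state access. If the clock has moved on between those two calls, the second key can fall into a key group owned by another subtask (78 instead of 52-76). The local state backend holds no state for that key group, hence the NPE in bufferedElements.add().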
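The two-call behaviour can be simulated without Flink. In this minimal sketch, a long[] stands in for the Row and a fake clock that ticks 1 ms per read stands in for System.currentTimeMillis(). The key-group function is deliberately simplified to hashCode() modulo 128, so it omits Flink's extra murmur hash. All names are hypothetical.

```java
import java.util.function.Function;

// Simulates a key being computed twice: upstream (routing) and downstream (state access).
public class KeyRecomputation {

    // Simplified key-group assignment; Flink additionally murmur-hashes hashCode().
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // True if the sender-side and receiver-side calls agree on the key group.
    static <T> boolean sameKeyGroupOnBothSides(T record, Function<T, String> selector, int maxParallelism) {
        int routedGroup = keyGroup(selector.apply(record), maxParallelism); // picks the target subtask
        int stateGroup = keyGroup(selector.apply(record), maxParallelism);  // scopes keyed state
        return routedGroup == stateGroup;
    }

    public static void main(String[] args) {
        long[] fakeClock = {1_700_000_000_000L}; // hypothetical clock, advances 1 ms per read
        long[] row = {1_700_000_000_123L};       // stands in for a Row holding a captured timestamp

        Function<long[], String> fromField = r -> String.valueOf(r[0]);
        Function<long[], String> fromClock = r -> String.valueOf(fakeClock[0]++);

        System.out.println("field-based key agrees: " + sameKeyGroupOnBothSides(row, fromField, 128));
        System.out.println("clock-based key agrees: " + sameKeyGroupOnBothSides(row, fromClock, 128));
    }
}
```

With this simplified hash the field-based selector prints true and the clock-based one prints false, because the two timestamps differ by one. With Flink's real hash and real clock, the groups will usually, though not always, differ, which matches the intermittent failures described in the question.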