apache-flink

KeyedProcessFunction throws NPE when using System.currentTimeMillis() in keyBy


I'm working on a Flink job that processes Row elements and applies a delay using a KeyedProcessFunction. The issue arises with how the key is generated in the keyBy function.

When I use System.currentTimeMillis() to generate the key in keyBy, I get a NullPointerException when adding elements to ListState in bufferedElements.add(). Specifically, the setCurrentKeyGroupIndex function calculates a key group index of 78, which is out of the assigned key group range (52-76) for the task, causing the NPE.

However, when I use a field from the Row (which also contains a System.currentTimeMillis() value) as the key, the problem doesn't occur, and the state management works fine.

Here’s a simplified version of my code:

public static void main(String[] args) {
    // Define row types
    RowTypeInfo rowTypeInfo = new RowTypeInfo(
        new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG, Types.LONG}, 
        new String[]{"src_ip", "port", "time", "time2"});

    // Set up the environment
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(5);

    // Source function
    environment.addSource(new SourceFunction<Row>() {
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            for (int i = 0; i < 1000; i++) {
                ctx.collect(Row.of("first_" + 1, "456", System.currentTimeMillis(), System.currentTimeMillis()));
                Thread.sleep(50);
            }
        }

        @Override
        public void cancel() {
        }
    }).returns(rowTypeInfo)
      .keyBy(new KeySelector<Row, String>() {
          @Override
          public String getKey(Row value) throws Exception {
              // Causes NPE
              return String.valueOf(System.currentTimeMillis());
              
              // Works fine
              // return String.valueOf(value.getField(3));
          }
      })
      .process(new DelayFunction(5000))
      .map(new MapFunction<Row, Row>() {
          @Override
          public Row map(Row value) throws Exception {
              return value;
          }
      });

    try {
        environment.execute();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

// Delay function
public static class DelayFunction extends KeyedProcessFunction<String, Row, Row> {
    private final long delayTime;
    private transient ListState<Row> bufferedElements;

    public DelayFunction(long delayTime) {
        this.delayTime = delayTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Row> descriptor = new ListStateDescriptor<>("bufferedElements", Row.class);
        bufferedElements = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
        long currentTime = ctx.timerService().currentProcessingTime();
        long triggerTime = currentTime + delayTime;
        value.setField(0, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(triggerTime));
        bufferedElements.add(value); // NPE occurs here
        ctx.timerService().registerProcessingTimeTimer(triggerTime);
    }
}

The keyBy function using System.currentTimeMillis() causes the setCurrentKeyGroupIndex to compute a key group of 78, but my task is only responsible for key groups 52-76, which seems to cause the NullPointerException when adding to bufferedElements.

Why does this issue occur with System.currentTimeMillis(), but not when I use a field from the Row (which also contains a System.currentTimeMillis() value)?

Any insights would be greatly appreciated!



Solution

  • With Flink, keys are not communicated or shared between tasks. Instead, they are computed whenever needed by calling the KeySelector function (which is known throughout the cluster). Flink requires that the function that computes the keys be deterministic, meaning that all parallel instances of the job must compute the same key for a given record, regardless of when or where they compute it. Without this guarantee, Flink will fail, possibly in unpredictable ways.

    This means that the key cannot depend on something random or something that changes over time, nor can it be something whose value varies from one JVM to another, like an enum (whose hashCode() is not stable across JVMs).

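    As for why value.getField(3) works fine while System.currentTimeMillis() does not: value.getField(3) returns the same value every time it is called for a given record, because the timestamp was captured once in the source and stored in the Row. Calling System.currentTimeMillis() inside getKey() produces a fresh value on each call, so the key used to route the record to a subtask can differ from the key computed later when the state is accessed. The second key can then map to a key group (78 in your case) that the receiving task does not own (52-76).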
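
    To make the numbers in the question concrete, here is a small, Flink-free sketch of the key-group arithmetic. It is an illustration under stated assumptions, not Flink's actual code: the range and subtask formulas follow my understanding of Flink's KeyGroupRangeAssignment, MAX_PARALLELISM = 128 is assumed to be the default that applies at parallelism 5, keyGroupFor uses a plain hashCode() modulo as a stand-in for Flink's murmur hash, and the class name, helper names, and fakeClock are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class KeyGroupSketch {
    static final int MAX_PARALLELISM = 128; // assumption: Flink's default at low parallelism
    static final int PARALLELISM = 5;       // from environment.setParallelism(5)

    // Stand-in for key-group assignment. Real Flink applies a murmur hash
    // to key.hashCode() first; a plain modulo is enough for illustration.
    static int keyGroupFor(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    // Which subtask owns a given key group.
    static int subtaskFor(int keyGroup) {
        return keyGroup * PARALLELISM / MAX_PARALLELISM;
    }

    // First and last key group owned by a subtask (inclusive range).
    static int firstKeyGroupOf(int subtask) {
        return (subtask * MAX_PARALLELISM + PARALLELISM - 1) / PARALLELISM;
    }

    static int lastKeyGroupOf(int subtask) {
        return ((subtask + 1) * MAX_PARALLELISM - 1) / PARALLELISM;
    }

    // Calls the selector twice, once as the sending side would (to pick the
    // target subtask) and once as the receiving side would (to set the
    // current key), and reports whether both calls land in the same key group.
    static boolean agreesWithItself(Supplier<String> keySelector) {
        int atSender = keyGroupFor(keySelector.get());
        int atReceiver = keyGroupFor(keySelector.get());
        return atSender == atReceiver;
    }

    public static void main(String[] args) {
        // prints "subtask 2 owns key groups 52-76"
        System.out.println("subtask 2 owns key groups "
                + firstKeyGroupOf(2) + "-" + lastKeyGroupOf(2));
        // prints "key group 78 belongs to subtask 3"
        System.out.println("key group 78 belongs to subtask " + subtaskFor(78));

        // Hypothetical clock that advances on every read, standing in for
        // System.currentTimeMillis() being called at two different moments.
        AtomicLong fakeClock = new AtomicLong(1_000L);
        Supplier<String> clockKey = () -> String.valueOf(fakeClock.getAndIncrement());

        // A timestamp captured once and stored, like value.getField(3).
        long storedField = 1_000L;
        Supplier<String> fieldKey = () -> String.valueOf(storedField);

        System.out.println(agreesWithItself(clockKey)); // prints "false"
        System.out.println(agreesWithItself(fieldKey)); // prints "true"
    }
}
```

    Under these assumptions, the range 52-76 from the question corresponds to subtask 2, and key group 78 belongs to subtask 3. That is consistent with the record having been routed using one key and the state having been accessed using another.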