apache-flinkflink-streamingamazon-kinesis-analyticsflink-state

Why does Flink ValueState.value() sometimes erroneously return null?


I am encountering an error in my Flink app where calling myValueState.value(), inside a KeyedProcessFunction, sometimes returns null despite the fact that the logic in the code should guarantee that the object returned by .value() is not null. These nulls are returned rarely, and do not occur again when the app is restarted and run on the same data that it previously failed on. Note: myValueState is of type ValueState<java.time.LocalDateTime>.

More Context

Code


    import java.io.IOException;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    public class MyClass extends KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String,String>> {
        private transient ObjectMapper objectMapper;
        private transient ValueState<LocalDateTime> minTimestamp;
    
        @Override
        public void processElement(final Tuple2<String, byte[]> input, final KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String, String>>.Context ctx, final Collector<Tuple2<String, String>> out) throws Exception {
            Event maybeDeserializedEvent = deserializeBytesToEvent(input.f1);
    
            if (maybeDeserializedEvent instanceof SuccessfullyDeserializedEvent) {
                SuccessfullyDeserializedEvent event = (SuccessfullyDeserializedEvent) maybeDeserializedEvent;
                System.out.printf(
                        "Deserialized event category '%s' for txnId '%s' with timestamp '%s'\n",
                        event.getCategory(), event.getTxnId(), event.getTimestamp()
                );
    
                updateMinTimestamp(event.getTimestamp());
    
                // some other stuff (processing + aggregating event, unrelated to the minTimestamp...
                //....
    
                // this value is sometimes null, which triggers a NPE when calling `toString` on it
                // based on the logic of the updateMinTimestamp() method, `minTimestampValue` should never be null
                LocalDateTime minTimestampValue = getLocalDateTimeValueState(minTimestamp);
    
                // sometimes throws NPE
                String minTimestampStr = minTimestampValue.toString();
    
                // some more stuff, include ctx.out(...)
                //....
            }
        }
    
        @Override
        public void open(Configuration configuration) {
            objectMapper = new ObjectMapper();
            minTimestamp = getRuntimeContext().getState(createEventTimestampDescriptor("min-timestamp", 2));
        }
    
        private ValueStateDescriptor<LocalDateTime> createEventTimestampDescriptor(String name, Integer ttl) {
            ValueStateDescriptor<LocalDateTime> eventTimestampDescriptor = new ValueStateDescriptor<>(
                    name,
                    new LocalDateTimeSerializer()
            );
            eventTimestampDescriptor.enableTimeToLive(
                    StateTtlConfig
                            .newBuilder(Time.hours(ttl))
                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                            .build()
            );
            return eventTimestampDescriptor;
        }
    
        private Event deserializeBytesToEvent(byte[] serializedEvent) {
            SuccessfullyDeserializedEvent event = new SuccessfullyDeserializedEvent();
            try {
                final JsonNode node = objectMapper.readTree(serializedEvent);
                event.setCategory(node.get("category").asLong());
                event.setTxnId(node.get("txnId").asText());
                event.setTimestamp(LocalDateTime.parse(node.get("timestamp").asText(), DateTimeFormatter.ISO_DATE_TIME));
                event.setPayload(objectMapper.readTree(node.get("payload").asText()));
    
                return event;
            } catch (IOException e) {
                System.out.printf(
                        "Failed to deserialize event with category:'%s', txnId:'%s', timestamp:'%s', payload:'%s'\n",
                        event.getCategory(),
                        event.getTxnId(),
                        event.getTimestamp(),
                        event.getPayload()
                );
                return new UnsuccessfullyDeserializedEvent();
            }
        }
    
        void updateMinTimestamp(LocalDateTime newTimestamp) {
            try {
                final LocalDateTime currentMinTimestamp = minTimestamp.value();
                if (currentMinTimestamp == null || newTimestamp.isBefore(currentMinTimestamp)) {
                    minTimestamp.update(newTimestamp);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        private LocalDateTime getLocalDateTimeValueState(ValueState<LocalDateTime> localDateTimeValueState) {
            try {
                return localDateTimeValueState.value();
            } catch (IOException e) {
                throw new RuntimeException("Error grabbing localdatetime from value state");
            }
        }
    
        public interface Event {}
    
    
        public class SuccessfullyDeserializedEvent implements Event {
            private Long category;
            private JsonNode payload;
            private String txnId;
            private LocalDateTime timestamp;
    
            SuccessfullyDeserializedEvent() {}
    
            // getters
            Long getCategory() {
                return this.category;
            }
            JsonNode getPayload() {
                return this.payload;
            }
            String getTxnId() {
                return this.txnId;
            }
            LocalDateTime getTimestamp() {
                return this.timestamp;
            }
            // setters
            void setCategory(Long category) {
                this.category = category;
            }
            void setPayload(JsonNode payload) {
                this.payload = payload;
            }
            void setTxnId(String txnId) {
                this.txnId = txnId;
            }
            void setTimestamp(LocalDateTime timestamp) {
                this.timestamp = timestamp;
            }
        }
    
        public class UnsuccessfullyDeserializedEvent implements Event {
        }
    }


Any information regarding why this error is occurring, and how to prevent it, is much appreciated


Solution

  • Answering my own question: I believe the error was caused by a combination of my bad coding practices + odd behavior due from Flink's state expiry TTL mechanism:

    1. In my code I was: grabbing state, updating state, and then grabbing the updated state within the same processElement call; this is inefficient, all you need to do is grab state and update it; there should be no need to grab it again after it is updated
    2. The behavior of Flink's TTL seems to be that, if state is in the process of being wiped out, updating it and then retrieving it afterwards may still give you a null (even if update type is OnReadAndWrite); there's been a time gap between when I dug into this and now, but I can safely say that Flink's TTL mechanism has a lot of unexpected behavior

    In general, a safe pattern for working with Flink's state is: grab all state near the start of your stateful process function, check it's values, and proceed accordingly (i.e. do x if all state is there, do y if some state is expired but other state is not, and so on). If your Flink state has some TTL, not being careful with it can potentially produce a race condition (will your code grab the state, or will it expire unexpectedly?).