I am encountering an error in my Flink app where calling myValueState.value(), inside a KeyedProcessFunction, sometimes returns null even though the logic in the code should guarantee that the object returned by .value() is not null. These nulls are returned rarely, and do not occur again when the app is restarted and run on the same data that it previously failed on. Note: myValueState is of type ValueState<java.time.LocalDateTime>.
More Context
Code
The parts to look at are updateMinTimestamp and the minTimestamp.value() call in the getLocalDateTimeValueState function, which will return null once in a while.
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class MyClass extends KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String, String>> {

    private transient ObjectMapper objectMapper;
    private transient ValueState<LocalDateTime> minTimestamp;

    @Override
    public void processElement(
            final Tuple2<String, byte[]> input,
            final KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String, String>>.Context ctx,
            final Collector<Tuple2<String, String>> out) throws Exception {
        Event maybeDeserializedEvent = deserializeBytesToEvent(input.f1);
        if (maybeDeserializedEvent instanceof SuccessfullyDeserializedEvent) {
            SuccessfullyDeserializedEvent event = (SuccessfullyDeserializedEvent) maybeDeserializedEvent;
            System.out.printf(
                "Deserialized event category '%s' for txnId '%s' with timestamp '%s'\n",
                event.getCategory(), event.getTxnId(), event.getTimestamp()
            );
            updateMinTimestamp(event.getTimestamp());
            // some other stuff (processing + aggregating event, unrelated to the minTimestamp)...
            //....
            // this value is sometimes null, which triggers an NPE when calling `toString` on it
            // based on the logic of the updateMinTimestamp() method, `minTimestampValue` should never be null
            LocalDateTime minTimestampValue = getLocalDateTimeValueState(minTimestamp);
            // sometimes throws NPE
            String minTimestampStr = minTimestampValue.toString();
            // some more stuff, including ctx.out(...)
            //....
        }
    }

    @Override
    public void open(Configuration configuration) {
        objectMapper = new ObjectMapper();
        minTimestamp = getRuntimeContext().getState(createEventTimestampDescriptor("min-timestamp", 2));
    }

    private ValueStateDescriptor<LocalDateTime> createEventTimestampDescriptor(String name, Integer ttl) {
        ValueStateDescriptor<LocalDateTime> eventTimestampDescriptor = new ValueStateDescriptor<>(
            name,
            new LocalDateTimeSerializer()
        );
        eventTimestampDescriptor.enableTimeToLive(
            StateTtlConfig
                .newBuilder(Time.hours(ttl))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build()
        );
        return eventTimestampDescriptor;
    }

    private Event deserializeBytesToEvent(byte[] serializedEvent) {
        SuccessfullyDeserializedEvent event = new SuccessfullyDeserializedEvent();
        try {
            final JsonNode node = objectMapper.readTree(serializedEvent);
            event.setCategory(node.get("category").asLong());
            event.setTxnId(node.get("txnId").asText());
            event.setTimestamp(LocalDateTime.parse(node.get("timestamp").asText(), DateTimeFormatter.ISO_DATE_TIME));
            event.setPayload(objectMapper.readTree(node.get("payload").asText()));
            return event;
        } catch (IOException e) {
            System.out.printf(
                "Failed to deserialize event with category:'%s', txnId:'%s', timestamp:'%s', payload:'%s'\n",
                event.getCategory(),
                event.getTxnId(),
                event.getTimestamp(),
                event.getPayload()
            );
            return new UnsuccessfullyDeserializedEvent();
        }
    }

    void updateMinTimestamp(LocalDateTime newTimestamp) {
        try {
            final LocalDateTime currentMinTimestamp = minTimestamp.value();
            if (currentMinTimestamp == null || newTimestamp.isBefore(currentMinTimestamp)) {
                minTimestamp.update(newTimestamp);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private LocalDateTime getLocalDateTimeValueState(ValueState<LocalDateTime> localDateTimeValueState) {
        try {
            return localDateTimeValueState.value();
        } catch (IOException e) {
            // keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException("Error grabbing localdatetime from value state", e);
        }
    }

    public interface Event {}

    public class SuccessfullyDeserializedEvent implements Event {
        private Long category;
        private JsonNode payload;
        private String txnId;
        private LocalDateTime timestamp;

        SuccessfullyDeserializedEvent() {}

        // getters
        Long getCategory() {
            return this.category;
        }

        JsonNode getPayload() {
            return this.payload;
        }

        String getTxnId() {
            return this.txnId;
        }

        LocalDateTime getTimestamp() {
            return this.timestamp;
        }

        // setters
        void setCategory(Long category) {
            this.category = category;
        }

        void setPayload(JsonNode payload) {
            this.payload = payload;
        }

        void setTxnId(String txnId) {
            this.txnId = txnId;
        }

        void setTimestamp(LocalDateTime timestamp) {
            this.timestamp = timestamp;
        }
    }

    public class UnsuccessfullyDeserializedEvent implements Event {
    }
}
Any information regarding why this error is occurring, and how to prevent it, is much appreciated.
Answering my own question: I believe the error was caused by a combination of my bad coding practices and odd behavior from Flink's state expiry (TTL) mechanism:
- I was grabbing the same state more than once within a single processElement call; this is inefficient. All you need to do is grab the state and update it; there should be no need to grab it again after it is updated.
- [...] (... OnReadAndWrite); there's been a time gap between when I dug into this and now, but I can safely say that Flink's TTL mechanism has a lot of unexpected behavior.
In general, a safe pattern for working with Flink's state is: grab all state near the start of your stateful process function, check its values, and proceed accordingly (i.e. do x if all state is there, do y if some state is expired but other state is not, and so on). If your Flink state has a TTL, not being careful with it can produce a race condition (will your code grab the state, or will it expire unexpectedly?).
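To make the "grab state once" pattern concrete, here is a minimal sketch. It is not the code from my app: SimpleState and InMemoryState are hypothetical stand-ins for Flink's ValueState so that the example runs without Flink on the classpath, and expire() only simulates a TTL cleanup. The point is that updateAndGetMin reads the state a single time and hands back a local value, so nothing downstream depends on a second read.

```java
import java.time.LocalDateTime;

public class MinTimestampSketch {

    // Hypothetical stand-in for Flink's ValueState<T>: value() may return null
    // when nothing is stored or the entry has expired.
    interface SimpleState<T> {
        T value();
        void update(T newValue);
    }

    // In-memory implementation; expire() is an assumption used only to simulate TTL cleanup.
    static final class InMemoryState<T> implements SimpleState<T> {
        private T stored;

        @Override
        public T value() {
            return stored;
        }

        @Override
        public void update(T newValue) {
            stored = newValue;
        }

        void expire() {
            stored = null;
        }
    }

    // Reads the state exactly once, writes the new minimum back if it changed,
    // and returns the local copy so callers never need to re-read the state.
    static LocalDateTime updateAndGetMin(SimpleState<LocalDateTime> state, LocalDateTime incoming) {
        final LocalDateTime current = state.value(); // the only read
        if (current == null || incoming.isBefore(current)) {
            state.update(incoming);
            return incoming;
        }
        return current;
    }

    public static void main(String[] args) {
        InMemoryState<LocalDateTime> state = new InMemoryState<>();
        LocalDateTime noon = LocalDateTime.of(2024, 1, 1, 12, 0);
        LocalDateTime morning = LocalDateTime.of(2024, 1, 1, 9, 0);

        System.out.println(updateAndGetMin(state, noon));    // state was empty, incoming is stored
        System.out.println(updateAndGetMin(state, morning)); // earlier timestamp replaces the stored one
        state.expire();                                      // simulate the entry expiring
        System.out.println(updateAndGetMin(state, noon));    // expired state is treated as fresh
    }
}
```

In the process function from the question, the equivalent change would be to have updateMinTimestamp return the resulting minimum and use that return value in place of the later getLocalDateTimeValueState call.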