I am encountering an error in my Flink app where calling `myValueState.value()` inside a `KeyedProcessFunction` sometimes returns `null`, even though the logic in my code should guarantee that the object returned by `.value()` is not null. These nulls are rare, and they do not recur when the app is restarted and run on the same data it previously failed on. Note: `myValueState` (called `minTimestamp` in the code below) is of type `ValueState<java.time.LocalDateTime>`.
More Context
- I am using Flink 1.15.2, hosted on AWS Kinesis Data Analytics (KDA); this is where the error occurs
- The error does not occur locally
- RocksDB is used as the state backend on KDA
- I am using the DataStream API with Java 11
Code
- Near the top of my process function, I call `updateMinTimestamp`
- To my mind, this function should ensure that the value of this value state is never null
- Later in the code, I call `minTimestamp.value()` in the `getLocalDateTimeValueState` function, which returns null once in a while
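To sanity-check my reasoning, here is the same update logic stripped of Flink. `PlainValueState` is a hypothetical stand-in for a `ValueState` with no TTL; with those plain semantics, once the update has run, the state can never read back as null. The actual process function is below.

```java
import java.time.LocalDateTime;

// Flink-free model of the invariant updateMinTimestamp() is meant to establish.
// PlainValueState is a hypothetical stand-in for ValueState<LocalDateTime> without TTL.
public class MinTimestampInvariant {

    static final class PlainValueState {
        private LocalDateTime stored; // null means "no value for this key yet"

        LocalDateTime value() {
            return stored;
        }

        void update(LocalDateTime v) {
            stored = v;
        }
    }

    // Same logic as updateMinTimestamp(): write when empty or when the new timestamp is earlier.
    static void updateMin(PlainValueState state, LocalDateTime newTimestamp) {
        LocalDateTime current = state.value();
        if (current == null || newTimestamp.isBefore(current)) {
            state.update(newTimestamp);
        }
    }

    public static void main(String[] args) {
        PlainValueState state = new PlainValueState();
        LocalDateTime t0 = LocalDateTime.of(2023, 1, 1, 12, 0);
        updateMin(state, t0);                 // empty -> writes t0
        updateMin(state, t0.plusMinutes(5));  // later -> no write
        updateMin(state, t0.minusMinutes(5)); // earlier -> writes
        System.out.println(state.value());    // 2023-01-01T11:55
    }
}
```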
```java
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MyClass extends KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String, String>> {

    private transient ObjectMapper objectMapper;
    private transient ValueState<LocalDateTime> minTimestamp;

    @Override
    public void processElement(
            final Tuple2<String, byte[]> input,
            final KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String, String>>.Context ctx,
            final Collector<Tuple2<String, String>> out) throws Exception {
        Event maybeDeserializedEvent = deserializeBytesToEvent(input.f1);
        if (maybeDeserializedEvent instanceof SuccessfullyDeserializedEvent) {
            SuccessfullyDeserializedEvent event = (SuccessfullyDeserializedEvent) maybeDeserializedEvent;
            System.out.printf(
                    "Deserialized event category '%s' for txnId '%s' with timestamp '%s'%n",
                    event.getCategory(), event.getTxnId(), event.getTimestamp()
            );
            updateMinTimestamp(event.getTimestamp());

            // some other stuff (processing + aggregating the event, unrelated to minTimestamp)
            // ....

            // this value is sometimes null, which triggers an NPE when calling `toString` on it;
            // based on the logic of updateMinTimestamp(), `minTimestampValue` should never be null
            LocalDateTime minTimestampValue = getLocalDateTimeValueState(minTimestamp);
            // sometimes throws NPE
            String minTimestampStr = minTimestampValue.toString();

            // some more stuff, including ctx.out(...)
            // ....
        }
    }

    @Override
    public void open(Configuration configuration) {
        objectMapper = new ObjectMapper();
        minTimestamp = getRuntimeContext().getState(createEventTimestampDescriptor("min-timestamp", 2));
    }

    private ValueStateDescriptor<LocalDateTime> createEventTimestampDescriptor(String name, Integer ttl) {
        ValueStateDescriptor<LocalDateTime> eventTimestampDescriptor = new ValueStateDescriptor<>(
                name,
                new LocalDateTimeSerializer()
        );
        eventTimestampDescriptor.enableTimeToLive(
                StateTtlConfig
                        .newBuilder(Time.hours(ttl))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                        .build()
        );
        return eventTimestampDescriptor;
    }

    private Event deserializeBytesToEvent(byte[] serializedEvent) {
        SuccessfullyDeserializedEvent event = new SuccessfullyDeserializedEvent();
        try {
            final JsonNode node = objectMapper.readTree(serializedEvent);
            event.setCategory(node.get("category").asLong());
            event.setTxnId(node.get("txnId").asText());
            event.setTimestamp(LocalDateTime.parse(node.get("timestamp").asText(), DateTimeFormatter.ISO_DATE_TIME));
            event.setPayload(objectMapper.readTree(node.get("payload").asText()));
            return event;
        } catch (IOException e) {
            System.out.printf(
                    "Failed to deserialize event with category:'%s', txnId:'%s', timestamp:'%s', payload:'%s'%n",
                    event.getCategory(),
                    event.getTxnId(),
                    event.getTimestamp(),
                    event.getPayload()
            );
            return new UnsuccessfullyDeserializedEvent();
        }
    }

    void updateMinTimestamp(LocalDateTime newTimestamp) {
        try {
            final LocalDateTime currentMinTimestamp = minTimestamp.value();
            if (currentMinTimestamp == null || newTimestamp.isBefore(currentMinTimestamp)) {
                minTimestamp.update(newTimestamp);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private LocalDateTime getLocalDateTimeValueState(ValueState<LocalDateTime> localDateTimeValueState) {
        try {
            return localDateTimeValueState.value();
        } catch (IOException e) {
            // keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException("Error grabbing LocalDateTime from value state", e);
        }
    }

    public interface Event {}

    public class SuccessfullyDeserializedEvent implements Event {
        private Long category;
        private JsonNode payload;
        private String txnId;
        private LocalDateTime timestamp;

        SuccessfullyDeserializedEvent() {}

        // getters
        Long getCategory() {
            return this.category;
        }

        JsonNode getPayload() {
            return this.payload;
        }

        String getTxnId() {
            return this.txnId;
        }

        LocalDateTime getTimestamp() {
            return this.timestamp;
        }

        // setters
        void setCategory(Long category) {
            this.category = category;
        }

        void setPayload(JsonNode payload) {
            this.payload = payload;
        }

        void setTxnId(String txnId) {
            this.txnId = txnId;
        }

        void setTimestamp(LocalDateTime timestamp) {
            this.timestamp = timestamp;
        }
    }

    public class UnsuccessfullyDeserializedEvent implements Event {
    }
}
```
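The part I'm least sure about is the TTL on the descriptor (2 hours, `OnCreateAndWrite`, `ReturnExpiredIfNotCleanedUp`). Below is a simplified, Flink-free model of how I understand processing-time TTL to behave. It assumes (not confirmed against the Flink source) that reading an expired entry clears it while still returning the stale value for that one read. `TtlValueState`, `Visibility` and `nowMillis` are hypothetical names, not Flink classes or fields. Under that assumption, the first read in `updateMinTimestamp` sees the stale value, skips the write, and the second read returns null:

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical, simplified model of a processing-time TTL'd ValueState.
// ASSUMPTION: an expired entry is cleared when read; with RETURN_EXPIRED_IF_NOT_CLEANED_UP
// the stale value is still returned for that single read.
public class TtlReadModel {

    enum Visibility { RETURN_EXPIRED_IF_NOT_CLEANED_UP, NEVER_RETURN_EXPIRED }

    static final class TtlValueState {
        private final Duration ttl;
        private final Visibility visibility;
        private LocalDateTime stored; // user value, null when absent
        private long lastWriteMillis; // OnCreateAndWrite: refreshed by update() only, not by reads
        long nowMillis;               // manually advanced "processing time"

        TtlValueState(Duration ttl, Visibility visibility) {
            this.ttl = ttl;
            this.visibility = visibility;
        }

        LocalDateTime value() {
            if (stored == null) {
                return null;
            }
            if (nowMillis - lastWriteMillis >= ttl.toMillis()) {
                LocalDateTime stale = stored;
                stored = null; // expired entry is cleared on access
                return visibility == Visibility.RETURN_EXPIRED_IF_NOT_CLEANED_UP ? stale : null;
            }
            return stored;
        }

        void update(LocalDateTime v) {
            stored = v;
            lastWriteMillis = nowMillis;
        }
    }

    // Mirrors the read / conditional write / read-again pattern in processElement().
    static LocalDateTime processElement(TtlValueState state, LocalDateTime eventTimestamp) {
        LocalDateTime current = state.value(); // first read (updateMinTimestamp)
        if (current == null || eventTimestamp.isBefore(current)) {
            state.update(eventTimestamp);
        }
        return state.value();                  // second read (getLocalDateTimeValueState)
    }

    public static void main(String[] args) {
        LocalDateTime t0 = LocalDateTime.of(2023, 1, 1, 12, 0);
        TtlValueState state = new TtlValueState(Duration.ofHours(2), Visibility.RETURN_EXPIRED_IF_NOT_CLEANED_UP);
        processElement(state, t0);                        // writes t0 at time 0
        state.nowMillis = Duration.ofHours(3).toMillis(); // 3 hours later, no writes in between
        System.out.println(processElement(state, t0.plusMinutes(1))); // prints null
    }
}
```

If this model is right, it would also fit the symptoms: expiry depends on wall-clock time since the last write rather than on the input data, so replaying the same data after a restart would not reproduce it, and a local run shorter than the 2-hour TTL would never hit it. Switching the model to `NEVER_RETURN_EXPIRED` makes the second read non-null, because the first read then returns null and triggers a fresh write.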
Any information on why this error occurs, and how to prevent it, would be much appreciated.