We have our Flink application (version 1.13.2) deployed on AWS Kinesis Data Analytics (KDA). Our strategy is to never stop the application, so whenever we update the jar with new changes, we always restore the application from a snapshot.
Recently, we found a problem with a low-level POJO class: a few of its getters and setters are misnamed. This early mistake effectively prevents us from extending the POJO class with new fields, so we decided to rename the getters/setters directly. However, after updating the application, this led to the following exception:
org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.ListSerializer@46c65a77) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.ListSerializer@30c9146c).
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.getOrCreateKeyedState(StreamOperatorStateHandler.java:315)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:494)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:243)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)
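The frames above fail in WindowOperator.open rather than in our own function. As far as we understand the DataStream API, a windowed coGroup is executed by tagging both inputs with a union wrapper, buffering them in a single window ListState, and splitting them back into two Iterables right before coGroup(...) is called. If so, the ListSerializer in the message is the serializer of that window buffer, and its element serializer covers Session. The following stdlib-only Java model of the tag-and-split step is a sketch under that assumption: TaggedUnion, CoGroup and fireWindow are simplified, hypothetical stand-ins (loosely named after Flink's internal CoGroupedStreams.TaggedUnion), not Flink's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CoGroupUnionSketch {

    // Simplified, hypothetical stand-in for a tagged union: exactly one side is non-null.
    static final class TaggedUnion<T1, T2> {
        final T1 one;
        final T2 two;

        private TaggedUnion(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        static <T1, T2> TaggedUnion<T1, T2> one(T1 value) {
            return new TaggedUnion<>(value, null);
        }

        static <T1, T2> TaggedUnion<T1, T2> two(T2 value) {
            return new TaggedUnion<>(null, value);
        }

        // Assumes values themselves are never null.
        boolean isOne() {
            return one != null;
        }
    }

    // Hypothetical user-function shape, mirroring the two-Iterable signature of coGroup.
    interface CoGroup<A, B, O> {
        O coGroup(List<A> first, List<B> second);
    }

    // Splits the single buffered window list back into the two inputs, then calls the user function.
    static <A, B, O> O fireWindow(List<TaggedUnion<A, B>> windowContents, CoGroup<A, B, O> fn) {
        List<A> firstInput = new ArrayList<>();
        List<B> secondInput = new ArrayList<>();
        for (TaggedUnion<A, B> value : windowContents) {
            if (value.isOne()) {
                firstInput.add(value.one);
            } else {
                secondInput.add(value.two);
            }
        }
        return fn.coGroup(firstInput, secondInput);
    }

    public static void main(String[] args) {
        // One list holds both "sessions" (Strings here) and "events" (Integers here).
        List<TaggedUnion<String, Integer>> windowContents = Arrays.asList(
                TaggedUnion.<String, Integer>one("session-1"),
                TaggedUnion.<String, Integer>two(42),
                TaggedUnion.<String, Integer>two(7));

        String summary = fireWindow(windowContents,
                (sessions, events) -> sessions.size() + " session(s), " + events.size() + " event(s)");
        System.out.println(summary); // 1 session(s), 2 event(s)
    }
}
```

If this model is right, any change to how Session (or a class nested inside it) is serialized also changes the element serializer of that one buffered list, which Flink would then report as an incompatible ListSerializer.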
As far as we understand, the failure happens specifically in the two CoGroup functions we implemented. Both consume the problematic POJO class, which is nested in another POJO class, Session. A snippet of one of the CoGroup functions is shown below. BTW, we are using Guava's Lists here; not sure whether that causes the ListSerializer problem.
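import com.google.common.collect.Lists;
import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.util.Collector;

// CoGroupFunction is an interface, so it is implemented rather than extended
public class OutputCoGroup implements CoGroupFunction<Session, Event, OutputSession> {
    @Override
    public void coGroup(Iterable<Session> sessions, Iterable<Event> events,
                        Collector<OutputSession> collector) throws Exception {
        // we are using Guava's Lists here; not sure if it causes the ListSerializer problem
        if (Lists.newArrayList(sessions).size() > 0) {
            ...
            if (events.iterator().hasNext()) {
                List<Event> eventList = Lists.newArrayList(events);
                ...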
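For reference, Guava's Lists.newArrayList(iterable) just copies the grouped elements into a plain java.util.ArrayList that lives only inside the coGroup call. A stdlib-only sketch of what that copy amounts to, plus an emptiness check that skips the copy (copyToList and hasAny are hypothetical helper names, not Guava API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class IterableChecks {

    // Hypothetical helper, roughly what Lists.newArrayList(Iterable) does:
    // copy every element into a new java.util.ArrayList.
    static <T> List<T> copyToList(Iterable<T> values) {
        List<T> copy = new ArrayList<>();
        for (T value : values) {
            copy.add(value);
        }
        return copy;
    }

    // Hypothetical helper: emptiness check without materialising a copy of the group.
    static boolean hasAny(Iterable<?> values) {
        return values.iterator().hasNext();
    }

    public static void main(String[] args) {
        Iterable<String> sessions = Arrays.asList("s1", "s2");
        Iterable<String> events = Collections.emptyList();

        // Both checks agree; only the first allocates a new list.
        System.out.println(copyToList(sessions).size() > 0); // true
        System.out.println(hasAny(sessions));                 // true
        System.out.println(hasAny(events));                   // false
    }
}
```

Since the copy is an ordinary heap object created per call, it is not registered anywhere as Flink state.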
As the function's input types show, Session is the POJO class that contains the problematic POJO class:
public class Session {
    private problematicPojo problematicpojo;
    ...
}
The problematic POJO class has two Boolean fields whose getters/setters are misnamed (literally missing the "Is" :´<). The other fields in the class are left out here; they do not have any issues.
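public class problematicPojo {
    private Boolean isA;
    private Boolean isB;
    ...
    // misnamed: for the field isA these should be getIsA()/setIsA(...)
    public Boolean getA() { ... }
    public void setA(Boolean a) { ... }
    // misnamed: for the field isB these should be getIsB()/setIsB(...)
    public Boolean getB() { ... }
    public void setB(Boolean b) { ... }
    ...
}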
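As far as we can tell from Flink's "Data Types & Serialization" docs, a class is only treated as a POJO if every non-static, non-transient field is public or has a getter and setter that Flink can match by name; otherwise the class falls back to the generic (Kryo) serializer. The stdlib-only sketch below is a rough, hypothetical approximation of that name-matching rule (Flink's real TypeExtractor check also handles generics, Scala-style setters, inherited fields and the no-argument constructor), applied to simplified copies of our class before and after the rename. PojoFieldCheck, BrokenPojo and FixedPojo are made-up names.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class PojoFieldCheck {

    // Hypothetical copy of the class before the rename.
    public static class BrokenPojo {
        private Boolean isA;
        public Boolean getA() { return isA; }
        public void setA(Boolean a) { this.isA = a; }
    }

    // Hypothetical copy of the class after the rename.
    public static class FixedPojo {
        private Boolean isA;
        public Boolean getIsA() { return isA; }
        public void setIsA(Boolean isA) { this.isA = isA; }
    }

    // Approximation: a non-public field needs a getter named get<field>, is<field> or <field>
    // and a setter named set<field>, compared case-insensitively with underscores removed.
    static boolean isValidPojoField(Field f, Class<?> clazz) {
        if (Modifier.isPublic(f.getModifiers())) {
            return true;
        }
        String fieldLow = f.getName().toLowerCase().replace("_", "");
        boolean hasGetter = false;
        boolean hasSetter = false;
        for (Method m : clazz.getMethods()) {
            String nameLow = m.getName().toLowerCase().replace("_", "");
            if ((nameLow.equals("get" + fieldLow) || nameLow.equals("is" + fieldLow) || nameLow.equals(fieldLow))
                    && m.getParameterCount() == 0
                    && m.getReturnType().equals(f.getType())) {
                hasGetter = true;
            }
            if (nameLow.equals("set" + fieldLow)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].equals(f.getType())
                    && (m.getReturnType().equals(Void.TYPE) || m.getReturnType().equals(clazz))) {
                hasSetter = true;
            }
        }
        return hasGetter && hasSetter;
    }

    // Checks only the class's own fields, skipping static and transient ones.
    static boolean allFieldsValid(Class<?> clazz) {
        for (Field f : clazz.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue;
            }
            if (!isValidPojoField(f, clazz)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("BrokenPojo fields valid: " + allFieldsValid(BrokenPojo.class)); // false
        System.out.println("FixedPojo fields valid: " + allFieldsValid(FixedPojo.class));   // true
    }
}
```

If this approximation holds, the rename turned problematicPojo from a Kryo-serialized generic type into a POJO type, so the serializer of that field inside Session changed, which would explain why the restored ListSerializer is reported as incompatible.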
We have looked into some possible solutions:
- Using the State Processor API -> AWS does not give us access to KDA snapshots, so we are not able to modify them.
- Providing TypeInformation for the problematic POJO class -> did not seem to work.
We are thinking of declaring a ListStateDescriptor in the CoGroup function (i.e. switching to a RichCoGroupFunction) so that we can update the state manually when restoring from a snapshot. However, we could not find much guidance in the official docs. Is anyone here familiar with this approach and able to help us out?
Thank you!