
Our Flink application (version 1.13.2) is deployed on AWS Kinesis Data Analytics (KDA). Our strategy is to never stop the application, so whenever we update the JAR with new changes, we always restore the application from a snapshot.

Recently, we found a problem: a lower-level (nested) POJO class has incorrectly named getters and setters. This early mistake prevents us from adding new fields to the POJO class, so we decided to rename the getters/setters directly. However, after updating the application, this led to the following exception:

org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.ListSerializer@46c65a77) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.ListSerializer@30c9146c).
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.getOrCreateKeyedState(StreamOperatorStateHandler.java:315)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:494)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:243)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)

As far as we can tell, the failure happens specifically in the two CoGroup functions we implemented. Both consume the problematic POJO class, which is nested inside another POJO class, Session. A snippet of one of the CoGroup functions is shown below. We are also using Google Guava's Lists here; we are not sure whether that causes the ListSerializer problem.

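import com.google.common.collect.Lists;
import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.util.Collector;

// CoGroupFunction is an interface, so the class implements it rather than extending it
public class OutputCoGroup implements CoGroupFunction<Session, Event, OutputSession> {

    @Override
    public void coGroup(Iterable<Session> sessions, Iterable<Event> events,
            Collector<OutputSession> collector) throws Exception {

        // we are using Google Guava's Lists here, not sure if it causes the ListSerializer problem
        if (Lists.newArrayList(sessions).size() > 0) {
            ...

            if (events.iterator().hasNext()) {

                List<Event> eventList = Lists.newArrayList(events);
                ...
            }
        }
    }
}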

As the method signature shows, the input Session is the POJO class that contains the problematic POJO class.

public class Session {

    private problematicPojo problematicpojo;
    ...
}

The problematic POJO class has two Boolean fields whose getters/setters are named incorrectly (literally missing the "Is" :´<). The other fields in the class are omitted here; they do not have any issues.

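public class problematicPojo {

    private Boolean isA;
    private Boolean isB;
    ...

    // accessor names are missing "Is": getA/setA instead of getIsA/setIsA
    public Boolean getA() { ... }
    public void setA(Boolean a) { ... }
    public Boolean getB() { ... }
    public void setB(Boolean b) { ... }
    ...

}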
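A possible explanation, unverified here: as far as we understand Flink's POJO rules, a non-public field only counts as a POJO field if the class has a getter and setter whose names match the field name, so a field named isA would need something like getIsA()/setIsA() rather than getA()/setA(). If that holds, the original class may have been serialized as a generic (Kryo) type and the renamed class as a POJO, which could explain why the restored ListSerializer is reported as incompatible. The plain-Java sketch below approximates that naming check with reflection. PojoFieldCheck, fieldsWithoutAccessors, ProblematicPojo and RenamedPojo are hypothetical names, and the check is a simplification of Flink's type extraction, not Flink's API.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoFieldCheck {

    // Hypothetical stand-in for the class in the question:
    // fields are named isA/isB, but the accessors are getA/setA and getB/setB.
    public static class ProblematicPojo {
        private Boolean isA;
        private Boolean isB;

        public ProblematicPojo() {}

        public Boolean getA() { return isA; }
        public void setA(Boolean a) { this.isA = a; }
        public Boolean getB() { return isB; }
        public void setB(Boolean b) { this.isB = b; }
    }

    // Same fields, with accessors renamed to match the field names.
    public static class RenamedPojo {
        private Boolean isA;
        private Boolean isB;

        public RenamedPojo() {}

        public Boolean getIsA() { return isA; }
        public void setIsA(Boolean isA) { this.isA = isA; }
        public Boolean getIsB() { return isB; }
        public void setIsB(Boolean isB) { this.isB = isB; }
    }

    /**
     * Returns, sorted, the names of non-public fields that lack a matching getter/setter.
     * Simplified approximation (assumption) of Flink's rule: getter named get<field>,
     * is<field> or <field> with no parameters, setter named set<field> with one parameter,
     * names compared case-insensitively with underscores removed.
     */
    static List<String> fieldsWithoutAccessors(Class<?> clazz) {
        List<String> missing = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(mod) || Modifier.isTransient(mod)
                    || Modifier.isPublic(mod)) {
                continue; // not serialized, or public (accessible without accessors)
            }
            String low = f.getName().toLowerCase().replace("_", "");
            boolean hasGetter = false;
            boolean hasSetter = false;
            for (Method m : clazz.getMethods()) {
                String name = m.getName().toLowerCase().replace("_", "");
                if ((name.equals("get" + low) || name.equals("is" + low) || name.equals(low))
                        && m.getParameterCount() == 0
                        && m.getReturnType().equals(f.getType())) {
                    hasGetter = true;
                }
                if (name.equals("set" + low)
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].equals(f.getType())
                        && m.getReturnType().equals(Void.TYPE)) {
                    hasSetter = true;
                }
            }
            if (!hasGetter || !hasSetter) {
                missing.add(f.getName());
            }
        }
        missing.sort(null); // natural ordering, so the output does not depend on reflection order
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(fieldsWithoutAccessors(ProblematicPojo.class)); // prints [isA, isB]
        System.out.println(fieldsWithoutAccessors(RenamedPojo.class));     // prints []
    }
}
```

With the original naming, neither Boolean field finds a matching accessor, whereas after the rename both do; that difference is what would move the class from one serializer to another if the assumption above is correct.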

We have looked into some possible solutions:

  1. Using the State Processor API: AWS does not provide access to KDA snapshots, so we cannot modify them.
  2. Providing TypeInformation for the problematic POJO class: this did not seem to work.

We are considering declaring a ListStateDescriptor in the CoGroup function (switching to RichCoGroupFunction) so that we can manually migrate the state when restoring from a snapshot. However, the official docs did not give us much insight. Is anyone here familiar with this approach and able to help us out?

Thank you!

Charles fu