I tried to deploy my Flink application to AWS Kinesis Data Analytics. The application uses Apache Avro to serialize/deserialize incoming messages. It works fine on my local machine, but when I deploy it to AWS I get this exception (in CloudWatch Logs): Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795

Log details:

{
  "locationInformation": "org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:913)",
  "logger": "org.apache.flink.runtime.taskmanager.Task",
  "message": "Source: Custom Source -> Sink: Unnamed (1/1) (a72ff69f9dc0f9e56d1104ce21456a5d) switched from RUNNING to FAILED.",
  "throwableInformation": [
    "org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.",
    "\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:160)",
    "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:380)",
    "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)",
    "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)",
    "\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:275)",
    "\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)",
    "\tat java.lang.Thread.run(Thread.java:748)",
    "Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795",
    "\tat java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)",
    "\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
    "\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
    "\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
    "\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
    "\tat java.io.ObjectInputStream.readClass(ObjectInputStream.java:1716)",
    "\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1556)",
    "\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
    "\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readCurrentLayout(AvroSerializer.java:465)",
    "\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readObject(AvroSerializer.java:432)",
    "\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
    "\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
    "\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
    "\tat java.lang.reflect.Method.invoke(Method.java:498)",
    "\tat java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)",
    "\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)",
    "\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)",
    "\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)",
    "\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
    "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)",
    "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)",
    "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)",
    "\tat org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)",
    "\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:158)",
    "\t... 6 more"
  ],
  "threadName": "Source: Custom Source -> Sink: Unnamed (1/1)",
  "applicationARN": "arn:aws:kinesisanalytics:us-east-1:829044228870:application/poc-kda",
  "applicationVersionId": "8",
  "messageSchemaVersion": "1",
  "messageType": "INFO"
}

I use these library versions:

  • Apache Avro - 1.9.1
  • Apache Flink - 1.9.1
  • Kinesis producer library - 0.13.1
  • AWS Flink - 1.8

Note: I get the same issue with Apache Flink 1.8 and 1.6.

KDA Flink code:

public class KinesisExampleKDA {
   private static final String REGION = "us-east-1";

   public static void main(String[] args) throws Exception {
       Properties consumerConfig = new Properties();
       consumerConfig.put(AWSConfigConstants.AWS_REGION, REGION);
       consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.enableCheckpointing(50000);

       DataStream<EventAttributes> consumerStream = env.addSource(new FlinkKinesisConsumer<>(
               "dev-events", new KinesisSerializer(), consumerConfig));

       consumerStream
               .addSink(getProducer());
       env.execute("kinesis-example");
   }

   private static FlinkKinesisProducer<EventAttributes> getProducer(){
       Properties outputProperties = new Properties();
       outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, REGION);
       outputProperties.setProperty("AggregationEnabled", "false");

       FlinkKinesisProducer<EventAttributes> sink = new FlinkKinesisProducer<>(new KinesisSerializer(), outputProperties);
       sink.setDefaultStream("dev-result");
       sink.setDefaultPartition("0");
       return sink;
   }
}

class KinesisSerializer implements DeserializationSchema<EventAttributes>, SerializationSchema<EventAttributes> {
   @Override
   public EventAttributes deserialize(byte[] bytes) throws IOException {
       return EventAttributes.fromByteBuffer(ByteBuffer.wrap(bytes));
   }

   @Override
   public boolean isEndOfStream(EventAttributes eventAttributes) {
       return false;
   }

   @Override
   public byte[] serialize(EventAttributes eventAttributes) {
       try {
           return eventAttributes.toByteBuffer().array();
       } catch (IOException e) {
           e.printStackTrace();
       }
       return new byte[1];
   }

   @Override
   public TypeInformation<EventAttributes> getProducedType() {
       return TypeInformation.of(EventAttributes.class);
   }
}

Kinesis producer code:

public class KinesisProducer {

   private static String streamName = "dev-events";

   public static void main(String[] args) throws InterruptedException, JsonMappingException {

       AmazonKinesis kinesisClient = getAmazonKinesisClient("us-east-1");

       try {
           sendData(kinesisClient, streamName);
       } catch (IOException e) {
           e.printStackTrace();
       }
   }

   private static AmazonKinesis getAmazonKinesisClient(String regionName) {

       AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
       clientBuilder.setEndpointConfiguration(
               new AwsClientBuilder.EndpointConfiguration("kinesis.us-east-1.amazonaws.com",
                       regionName));
       clientBuilder.withCredentials(DefaultAWSCredentialsProviderChain.getInstance());
       clientBuilder.setClientConfiguration(new ClientConfiguration());

       return clientBuilder.build();
   }

   private static void sendData(AmazonKinesis kinesisClient, String streamName) throws IOException {

       PutRecordsRequest putRecordsRequest = new PutRecordsRequest();

       putRecordsRequest.setStreamName(streamName);
       List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
       for (int i = 0; i < 50; i++) {
           PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
           EventAttributes eventAttributes = EventAttributes.newBuilder().setName("Jon.Doe").build();
           putRecordsRequestEntry.setData(eventAttributes.toByteBuffer());
           putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
           putRecordsRequestEntryList.add(putRecordsRequestEntry);
       }

       putRecordsRequest.setRecords(putRecordsRequestEntryList);
       PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
       System.out.println("Put Result" + putRecordsResult);
   }

Avro schema in .avdl format:

@version("0.1.0")
@namespace("com.naya.avro")
protocol UBXEventProtocol {

  record EventAttributes {
    union { null, string } name = null;
  }
}

Entity class autogenerated by Avro:

@org.apache.avro.specific.AvroGenerated
public class EventAttributes extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 2780976157169751219L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"EventAttributes\",\"namespace\":\"com.naya.avro\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<EventAttributes> ENCODER =
      new BinaryMessageEncoder<EventAttributes>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<EventAttributes> DECODER =
      new BinaryMessageDecoder<EventAttributes>(MODEL$, SCHEMA$);
…

Github links:

Could someone please add some more detail on this? Why is it not working on AWS?

Thank you in advance

  • Looks like the serialized class you are receiving has a diverging serialization ID from your local class and cannot be deserialized. Looks very much like a version conflict with avro. – SteffenJacobs Jan 09 '20 at 11:30
  • Thank you for your answer, but I tried two versions of Apache Avro (1.9 and 1.9.1) and the result is the same; with version 1.8.2 the application doesn't compile – Pavel Moriakov Jan 09 '20 at 20:31

3 Answers

3

Looking at the stack trace, the failure is not happening when the job tries to read a message, but during the initialization phase of the operator itself.

The way Flink works is that it serializes (using Java serialization) every operator that needs to be executed and then distributes the operators in serialized form across the cluster. This means that the KinesisSerializer will itself be serialized (as a class) to be sent over the wire.

Now the problem is that the Kinesis serializer references the EventAttributes model, which means the reference to EventAttributes (the class itself, not a specific instance) will be serialized with it. Part of that serialized metadata is what the class is expected to extend/implement. In your case that is SpecificRecordBase, which is not part of your distributable but is part of the Avro library.

So the full serialization chain for the operator itself is KinesisConsumer -> KinesisSerializer -> EventAttributes -> SpecificRecordBase (part of the Avro lib).
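
To make that chain concrete, here is a rough, self-contained sketch of the mechanism (not code from the question; the class name is made up) using the same Flink utilities that appear in your stack trace. It assumes the generated com.naya.avro.EventAttributes class and flink-avro are on the classpath:

import org.apache.flink.formats.avro.typeutils.AvroSerializer;
import org.apache.flink.util.InstantiationUtil;

import com.naya.avro.EventAttributes;

// Hypothetical class, only to illustrate how the operator's type serializer is shipped.
public class SerializerShippingSketch {
    public static void main(String[] args) throws Exception {
        // Flink stores the operator's input type serializer in the StreamConfig,
        // Java-serialized together with the rest of the job graph.
        AvroSerializer<EventAttributes> serializer = new AvroSerializer<>(EventAttributes.class);
        byte[] shipped = InstantiationUtil.serializeObject(serializer);

        // On the task managers the bytes are read back (StreamConfig.getTypeSerializerIn1 ->
        // InstantiationUtil.deserializeObject in your log). The stream carries the class
        // descriptors of EventAttributes and its superclass SpecificRecordBase, so if the
        // Avro jar on the reading side has a different serialVersionUID for SpecificRecordBase,
        // this call fails with exactly the InvalidClassException you are seeing.
        Object restored = InstantiationUtil.deserializeObject(
                shipped, Thread.currentThread().getContextClassLoader());
        System.out.println("Round-trip succeeded on a single classpath: " + restored);
    }
}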

However, AWS uses Flink 1.8, which uses Avro 1.8.2, and all the base Avro classes are also from 1.8.2. You compile your application and link it against the Avro 1.9 binaries. So when Flink serializes your operators to send them to the cluster, it serializes a reference to SpecificRecordBase from version 1.9. But when Flink actually tries to deserialize it, it sees that the version does not match the class it actually has available (1.8.2), and linking fails.
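
One way to see which Avro version (and which SpecificRecordBase serialVersionUID) your build actually links against is a small check like the one below. It is just a sketch (the class name is made up) using standard JDK and Avro APIs; getImplementationVersion may return null depending on how the jar's manifest was built:

import java.io.ObjectStreamClass;

import org.apache.avro.specific.SpecificRecordBase;

public class AvroVersionCheck {
    public static void main(String[] args) {
        // serialVersionUID that Java serialization computes for the SpecificRecordBase
        // class found on the current classpath.
        long uid = ObjectStreamClass.lookup(SpecificRecordBase.class).getSerialVersionUID();

        // Avro version as reported by the jar's manifest (may be null).
        String version = SpecificRecordBase.class.getPackage().getImplementationVersion();

        System.out.println("Avro on classpath: " + version);
        System.out.println("SpecificRecordBase serialVersionUID: " + uid);
        // Compare the printed UID with the two values in the exception: the "stream classdesc"
        // UID comes from the classpath that wrote the serialized operator, and the "local class"
        // UID from the classpath that tried to read it.
    }
}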

You have 2 options here:

  1. Do not use KDA. Instead, go to EMR (which has Flink 1.9.1 packaged with it as of January 2020) or to a standalone Flink cluster (which you would need to deploy manually, on EMR or on bare metal).
  2. Write your application entirely against Flink 1.8. You mentioned that "with version 1.8.2 the application doesn't compile" – try to solve that issue instead.
bezmax
  • I'm having almost the exact same issue: `java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = 189988654766568477`. We're using sbt-avro and switched our `AvroConfig / version` to 1.8.2, but still see the same error. Our application doesn't have Avro as a direct dependency otherwise. Is there something else we should be doing? We are using Flink 1.8.0 – rsheldiii Feb 21 '20 at 21:40
  • `java.io.ObjectStreamClass.lookup(classOf[SpecificRecordBase]).getSerialVersionUID` returns 189988654766568477 when `classOf[SpecificRecordBase].getPackage.getImplementationVersion` returns 1.8.2, so I'm not sure what version Amazon is expecting – rsheldiii Feb 21 '20 at 21:57
  • I've found the answer but not the solution: 4445917349737100331 is the serialVersionUID of the SpecificRecordBase in Avro 1.8.1 – rsheldiii Feb 24 '20 at 20:07
  • @rsheldiii Are you also running it on KDA? I wonder if they actually have Avro 1.8.1 in there. I personally never used KDA, my assumption of Avro 1.8.2 came from analyzing maven Flink dependencies. You can try changing sbt-avro config to 1.8.1 and see if that helps. – bezmax Feb 26 '20 at 20:02
  • Yep, we're running on KDA. I'm new to KDA and Scala in general so I might be missing something, but spinning up a bare sbt project, importing Avro, and printing the UID of SpecificRecordBase v1.8.1 gets me the 4445 UID. We use flink-avro-confluent-registry, so due to Ivy's dependency resolution we can't use 1.8.1 unless we list it as a dependency and `force()`; `dependencyOverrides` only works for upgrades. When I do that I get a lot of errors around `cannot find symbol` with `class BinaryMessageEncoder`. We're looking into EMR with Avro, or using Protobuf instead, which works fine – rsheldiii Mar 02 '20 at 18:59
3

We solved this issue. In our application we used Avro 1.9.1, but AWS KDA uses Avro 1.8.1. Downgrading from 1.9.1 to 1.8.1 fixed it.

0

This can happen due to a class loading issue in KDA (we've encountered the same thing with Jackson), where the class provided by your jar doesn't get the right priority. But I believe this issue is now fixed in KDA (for IAD and DUB at least).

JasonL