
I am trying to create a Kafka Streams Application which processes Avro records, but I am getting the following error:

Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

I am not sure what is causing this error. I am just trying to get Avro records into the application, where they will then be processed and output to another topic, but it doesn't seem to be working. I have included the code from the application below. Can anyone see why it is not working?

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    Serde<String> stringSerde = Serdes.String();
    Serde<trackingReport> specificAvroTrackingReportSerde = new SpecificAvroSerde<trackingReport>();

    specificAvroTrackingReportSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);


    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, trackingReport> inputreports = builder.stream("intesttopic", Consumed.with(stringSerde, specificAvroTrackingReportSerde));


    KStream<String, trackingReport> outputreports = inputreports;

    String outputTopic = "outtesttopic";
    outputreports.to(outputTopic, Produced.with(stringSerde, specificAvroTrackingReportSerde));

    Topology topology = builder.build();

    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
OneCricketeer
R. B

1 Answer


Unknown magic byte!

Means your data does not adhere to the wire format that's expected for the Schema Registry.

Or, in other words, the data you're trying to read is not Avro, as expected by the Confluent Avro deserializer.
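In the Confluent wire format, every serialized message starts with a magic byte (`0x00`) followed by a 4-byte big-endian schema ID, and only then the Avro payload. A minimal sketch of that framing check in plain Java (hypothetical class and method names, no Kafka dependencies):

```java
import java.nio.ByteBuffer;

public class WireFormatCheck {
    // Confluent wire format: 1 magic byte (0x00) + 4-byte big-endian
    // schema ID + the Avro-encoded payload
    static int schemaId(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte magic = buf.get();
        if (magic != 0x00) {
            // This is the condition behind "Unknown magic byte!"
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt(); // the ID registered in Schema Registry
    }

    public static void main(String[] args) {
        // A correctly framed record with schema ID 42
        byte[] framed = {0x00, 0, 0, 0, 42, /* payload would follow */ 1, 2};
        System.out.println(schemaId(framed)); // prints 42

        // A plain-string record, as a plain StringSerializer would produce it
        try {
            schemaId("hello".getBytes());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If your producer wrote raw Avro (or plain strings) without that 5-byte prefix, the first byte of the payload is interpreted as the magic byte and the deserializer fails exactly as in the stack trace above.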

You can expect the same error by running kafka-avro-console-consumer, by the way, so you may want to debug using that too.
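For example, a debugging run against the topic and ports from your config (assuming a local broker and registry on the default ports; `print.key=true` also surfaces a non-Avro key, which can fail the same way):

```shell
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic intesttopic \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true
```

If this command throws "Unknown magic byte!" as well, the problem is in how the data was produced, not in your Streams application.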

If you are sure your data is indeed Avro, and the schema is actually sent as part of the message (would need to see your producer code), then you shouldn't use the Confluent Avro deserializers, which expect a specific byte format in the message. Instead, you could use ByteArrayDeserializer and read the Avro record yourself, then pass it to the Apache Avro BinaryDecoder class. As a bonus, you can extract that logic into your own Deserializer class.
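A sketch of what such a custom Deserializer could look like (hypothetical class name; assumes the writer schema is known ahead of time, e.g. compiled into the app, and uses the Apache Avro and Kafka client libraries):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch: deserializes "plain" Avro records written WITHOUT the
// Confluent 5-byte wire-format prefix, so no Schema Registry lookup.
public class PlainAvroDeserializer implements Deserializer<GenericRecord> {
    private final GenericDatumReader<GenericRecord> reader;

    public PlainAvroDeserializer(Schema writerSchema) {
        this.reader = new GenericDatumReader<>(writerSchema);
    }

    @Override
    public GenericRecord deserialize(String topic, byte[] data) {
        if (data == null) return null;
        try {
            // Decode the raw bytes directly with Avro's BinaryDecoder
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new SerializationException(
                    "Failed to decode Avro record from topic " + topic, e);
        }
    }
}
```

Paired with a matching Serializer, `Serdes.serdeFrom(serializer, deserializer)` would give you a Serde usable in `Consumed.with(...)` and `Produced.with(...)`.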

Also, if the input topic is Avro, I don't think you should be using this property for reading strings:

DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
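If every value in the topology is Avro, a sketch of setting the Confluent serde as the default instead (note the explicit `Consumed.with(...)` in the question already overrides the default for that one topic):

```java
// Hypothetical alternative: make Avro the default value serde so topics
// read without an explicit Consumed.with(...) still decode correctly
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        SpecificAvroSerde.class.getName());
// The default serde also needs the registry URL to look up schemas
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");
```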
OneCricketeer
  • Yeah I just checked running the command and it did not work either. My producer is the same as the one at: https://stackoverflow.com/questions/53781639/why-wont-my-java-consumer-read-the-data-that-i-have-created – R. B Dec 18 '18 at 15:00
  • Yeah I understand that property but thought it was okay to override as I have done with Consumed.with – R. B Dec 18 '18 at 15:01
  • `intesttopic` is not the same topic as the one being sent to in the previous post – OneCricketeer Dec 18 '18 at 15:03
  • By the way, `outputreports` is an unnecessary variable. There's no need to copy the KStream variable to a new name – OneCricketeer Dec 18 '18 at 15:05
  • If I am not using the Confluent Avro deserializer should I be creating a custom one – R. B Dec 18 '18 at 15:45
  • Your deserializer needs to invert whatever serializer you used in the producer. In Kafka Streams, you have a Serde class that combines the two... I'm not sure if that answers your question – OneCricketeer Dec 18 '18 at 17:50
  • Re "Or if it is, and the schema is sent as part of the message (would need to see your producer code), then you shouldn't use the Confluent Avro deserializers" - what Deserializer should you use in this case? We're trying to read Avro messages from Oracle Golden Gate and getting the same error when attempting to deserialize with KafkaAvroDeserializer – Kevin Hooke Dec 27 '19 at 22:14
  • @Kevin What serializers / converters did you configure GoldenGate with? If you used the Confluent ones, then those work with KafkaAvroDeserializer. If you used JSON converters, then you wouldn't use Avro. Also make sure you check both the key and value of the converter settings – OneCricketeer Dec 27 '19 at 23:23
  • @cricket_007 KafkaAvroSerializer/KafkaAvroDeserializer with messages sent with GG Kafka Handler get the Unknown Magic Byte error, but the same KafkaAvroSerializer/KafkaAvroDeserializer for messages sent by GG using Kafka Connect Handler work as expected. Not sure if we had Kafka Handler misconfigured, but Kafka Connect Handler is working for us – Kevin Hooke Jan 20 '20 at 22:54
  • @Kevin, I'm actually not familiar with GoldenGate, but happy to look into it, if you create a full post with the issue rather than just a comment. The "Kafka Connect Handler" would use AvroConverter, which *wraps* those serializers you mentioned. Confluent only offers one combo of those classes, so I'm not sure what package names of those classes you're referring to – OneCricketeer Jan 20 '20 at 23:02
  • If you test deserialization using the kafka-avro-console-consumer, be sure to add "--property print.key=true". The problem may be that the record key is not valid Avro, even if the value is. – Kristiaan Aug 19 '22 at 10:48