2

I am seeing some exceptions while writing stream to a topic

Output:
    Exception in thread "StreamThread-1" 
    org.apache.kafka.streams.errors.StreamsException: Failed to deserialize 
    value for record. topic=input_topic, partition=4, offset=9048083
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 572

Here is the code . Key is null(String) and value is avroserde

    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

I am using specific Avro serde .So i gave the endpoint of schema registry

    final Map<String, String> serdeConfig = Collections.singletonMap(
            AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    final Serde<avroschema> avroserde = new SpecificAvroSerde<>();
    MasterSpinsSerde.configure(serdeConfig, false); // `false` for record values

Reading the source stream as below

    final KStreamBuilder builder = new KStreamBuilder();
    final KStream<String, avroschema> feeds = builder.stream("input_topic");

    feeds.to(Serdes.String(), avroserde,"output_topic");
    return new KafkaStreams(builder, streamsConfiguration);
user8617180
  • 267
  • 6
  • 20
  • 2
    Possible duplicate of [Handling bad messages using Kafka's Streams API](https://stackoverflow.com/questions/42666756/handling-bad-messages-using-kafkas-streams-api) – Dmitry Minkovsky Feb 02 '18 at 01:34
  • Side remark: don't think you need to provide the serde parameter in `to()` -- as you specify Serdes in `StreamsConfig` all operators use those Serdes by default (thus, you also don't need to configure the serde manually). – Matthias J. Sax Feb 02 '18 at 19:04
  • Thank you for your inputs, The issue was due to the Avro schema wasn't registered in the schema registry.It is fixed now. – user8617180 Feb 02 '18 at 20:26

0 Answers0