I am seeing an exception while writing a stream to a topic.
Output:
Exception in thread "StreamThread-1"
org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=input_topic, partition=4, offset=9048083
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 572
Here is the code. The key is null (String serde) and the value uses an Avro serde:
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
I am using the specific Avro serde, so I provided the Schema Registry endpoint:
final Map<String, String> serdeConfig = Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
final Serde<avroschema> avroserde = new SpecificAvroSerde<>();
avroserde.configure(serdeConfig, false); // `false` for record values
I read the source stream as follows:
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, avroschema> feeds = builder.stream("input_topic");
feeds.to(Serdes.String(), avroserde,"output_topic");
return new KafkaStreams(builder, streamsConfiguration);
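One thing worth noting about this setup: builder.stream("input_topic") is called without explicit serdes, so as far as I understand the topic is read with the default serdes from streamsConfiguration, and the separately configured avroserde is only applied in to(...). A default SpecificAvroSerde is instantiated and configured by Kafka Streams from the Streams configuration itself, so the Schema Registry URL would need to be present there as well. The sketch below shows that configuration using only java.util.Properties and plain string keys, so it runs without Kafka on the classpath. The class name, application id, and localhost addresses are hypothetical, and the "key.serde" / "value.serde" keys are my assumption of what the StreamsConfig constants used above resolve to in this API version. This may or may not be the cause of the exception.

```java
import java.util.Properties;

public class StreamsConfigSketch {

    // Builds a Streams configuration in which the default value serde
    // can locate the Schema Registry. All values are illustrative.
    static Properties buildConfig(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        // Hypothetical application id, not taken from the original code.
        props.put("application.id", "avro-passthrough-sketch");
        props.put("bootstrap.servers", bootstrapServers);
        // Assumed string values of StreamsConfig.KEY_SERDE_CLASS_CONFIG
        // and StreamsConfig.VALUE_SERDE_CLASS_CONFIG.
        props.put("key.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("value.serde",
                "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde");
        // Same key as AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
        // a serde created from the defaults reads it from this configuration.
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical local endpoints for broker and Schema Registry.
        Properties props = buildConfig("localhost:9092", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

An alternative with the same intent would be to pass Serdes.String() and the already configured avroserde to builder.stream(...) explicitly, so that reading and writing share one serde instance.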