I'm new to Apache Avro, so let me describe the problem. I'm sending messages from a producer application to a consumer application using Apache Kafka, and the two applications do not use the same message schema.
Producer schema (User.avsc):
{
  "name": "User",
  "namespace": "avro",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    }
  ]
}
Consumer schema (User.avsc):
{
  "name": "User",
  "namespace": "avro",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    },
    {
      "name": "favorite_color",
      "type": "string",
      "default": "green"
    }
  ]
}
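From what I understand of Avro schema resolution, the default for favorite_color is only applied when the decoder knows both the writer's schema and the reader's schema. Here is a minimal sketch of what I expect to happen, outside Kafka, using the generic API (the .avsc paths are just where the files happen to live in my projects):

Schema writerSchema = new Schema.Parser().parse(new File("producer/src/main/avro/User.avsc"));
Schema readerSchema = new Schema.Parser().parse(new File("consumer/src/main/avro/User.avsc"));

// encode a user with the two-field producer schema
GenericRecord user = new GenericData.Record(writerSchema);
user.put("name", "Bob");
user.put("favorite_number", 666);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(writerSchema).write(user, encoder);
encoder.flush();

// decode with BOTH schemas: favorite_color should come back as "green"
DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
GenericRecord decoded = reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));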
Classes:
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

    @Override
    public byte[] serialize(String topic, T data) {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            if (data != null) {
                BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
                // writes only the raw datum bytes; the writer schema itself is not part of the payload
                DatumWriter<T> datumWriter = new SpecificDatumWriter<>(data.getSchema());
                datumWriter.write(data, binaryEncoder);
                binaryEncoder.flush();
                return byteArrayOutputStream.toByteArray();
            }
        } catch (Exception e) {
            throw new RuntimeException("An exception occurred during serialization", e);
        }
        return null;
    }
}
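As far as I can tell, this serializer means the writer schema never travels with the message. I also came across Avro's single-object encoding, which prefixes the payload with a schema fingerprint so the reader can look the writer schema up. I haven't tried it yet, but with the classes generated by avro-maven-plugin 1.9.x I believe it would look roughly like this (the SchemaStore setup and producerUserSchema are my assumptions):

// producer side: single-object encoding, schema fingerprint included in the payload
ByteBuffer payload = User.getEncoder().encode(user);

// consumer side: register known writer schemas so the fingerprint can be resolved
SchemaStore.Cache schemaStore = new SchemaStore.Cache();
schemaStore.addSchema(producerUserSchema); // assumption: producer schema obtained out of band
BinaryMessageDecoder<User> decoder = User.createDecoder(schemaStore);
User decoded = decoder.decode(payload);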
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    protected final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        try {
            if (bytes != null) {
                // only the consumer's (reader) schema is passed to the datum reader
                DatumReader<T> datumReader =
                        new SpecificDatumReader<>(targetType.getDeclaredConstructor().newInstance().getSchema());
                Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
                return datumReader.read(null, decoder);
            }
        } catch (Exception e) {
            throw new RuntimeException("An exception occurred during deserialization", e);
        }
        return null;
    }
}
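If I read the Javadoc correctly, the single-argument SpecificDatumReader constructor uses the given schema as both the writer's and the reader's schema. I suspect I would need the two-argument constructor instead, but I don't see how to get the writer's schema on the consumer side (writerSchema below is hypothetical):

// what I suspect the deserializer would need: both schemas, so resolution can apply defaults
Schema readerSchema = targetType.getDeclaredConstructor().newInstance().getSchema();
DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);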
public class UserProducer {

    public static void main(String[] args) {
        UserProducer userProducer = new UserProducer();
        userProducer.writeUser();
    }

    public void writeUser() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

        Producer<String, SpecificRecord> recordProducer = new KafkaProducer<>(properties);

        User user = User.newBuilder()
                .setName("Bob")
                .setFavoriteNumber(666)
                .build();

        ProducerRecord<String, SpecificRecord> record = new ProducerRecord<>("avro.User", null, user);

        recordProducer.send(record);
        recordProducer.flush();
        recordProducer.close();
    }
}
public class Consumer {

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.readMessages();
    }

    public void readMessages() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "specific-record-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, User> consumer = new KafkaConsumer<>(properties,
                new StringDeserializer(), new AvroDeserializer<>(User.class));
        consumer.subscribe(Collections.singletonList("avro.User"));

        while (true) {
            consumer.poll(Duration.ofMillis(100)).forEach(System.out::println);
            consumer.commitAsync();
        }
    }
}
Of course, with identical schemas on both sides it works fine. The problem starts with schema evolution: the consumer's schema has a new field with a default value, which I expected to be filled in on the receiver side, but instead I get an exception:
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition avro.User-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.RuntimeException: An exception occurred during deserialization
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:28)
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:10)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1306)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1537)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1373)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:679)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1313)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at consumer.Consumer.readMessages(Consumer.java:34)
    at consumer.Consumer.main(Consumer.java:18)
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
    at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:181)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:279)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:298)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:220)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:456)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:25)
    ... 13 more
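If I read the stack trace correctly, the decoder treats my three-field consumer schema as the writer's schema as well, so it tries to read favorite_color from the byte stream and runs off the end. Reusing out and readerSchema from the generic sketch above, this is the situation I believe my AvroDeserializer ends up in:

// decoding the two-field payload with ONLY the three-field consumer schema,
// as AvroDeserializer does -> java.io.EOFException while reading favorite_color
DatumReader<GenericRecord> badReader = new GenericDatumReader<>(readerSchema);
GenericRecord fails = badReader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));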
The pom.xml in both applications is more or less the same:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-avro-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-compiler</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.9.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.9.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
What am I doing wrong? With equal schemas it works, but I cannot figure out why the consumer side does not handle the missing optional field by falling back to its default value.