
I'm new to Apache Avro. Let me describe the problem. I'm trying to send messages using Apache Kafka from a producer application to a consumer application, and the two applications do not use the same message schema.

Producer schema (User.avsc):

{
  "name": "User",
  "namespace": "avro",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    }
  ]
}

Consumer schema (User.avsc):

{
  "name": "User",
  "namespace": "avro",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    },
    {
      "name": "favorite_color",
      "type": "string",
      "default": "green"
    }
  ]
}

Classes:

public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

    @Override
    public byte[] serialize(String topic, T data) {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            if (data != null) {
                BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
                DatumWriter<T> datumWriter = new SpecificDatumWriter<>(data.getSchema());
                datumWriter.write(data, binaryEncoder);
                binaryEncoder.flush();
                return byteArrayOutputStream.toByteArray();
            }
        } catch (Exception e) {
            throw new RuntimeException("An exception occurred during serialization", e);
        }
        return null;
    }
}
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    protected final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        try {
            if (bytes != null) {
                DatumReader<T> datumReader =
                        new SpecificDatumReader<>(targetType.getDeclaredConstructor().newInstance().getSchema());
                Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
                return datumReader.read(null, decoder);
            }
        } catch (Exception e) {
            throw new RuntimeException("An exception occurred during deserialization", e);
        }
        return null;
    }
}
public class UserProducer {

    public static void main(String[] args) {
        UserProducer userProducer = new UserProducer();
        userProducer.writeUser();
    }

    public void writeUser() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

        Producer<String, SpecificRecord> recordProducer = new KafkaProducer<>(properties);

        User user = User.newBuilder()
                .setName("Bob")
                .setFavoriteNumber(666)
                .build();

        ProducerRecord<String, SpecificRecord> record = new ProducerRecord<>("avro.User", null, user);
        recordProducer.send(record);
        recordProducer.flush();
        recordProducer.close();
    }
}
public class Consumer {

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.readMessages();
    }

    public void readMessages() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "specific-record-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, User> consumer = new KafkaConsumer<>(properties,
                new StringDeserializer(), new AvroDeserializer<>(User.class));

        consumer.subscribe(Collections.singletonList("avro.User"));

        while (true) {
            consumer.poll(Duration.ofMillis(100)).forEach(System.out::println);
            consumer.commitAsync();
        }
    }
}

Of course, with identical schemas it works fine. The problem is with schema evolution. On the receiver side there is a new field with a default value that should be applied, but instead I get an exception:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition avro.User-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.RuntimeException: An exception occurred during deserialization
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:28)
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:10)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1306)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1537)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1373)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:679)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1313)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at consumer.Consumer.readMessages(Consumer.java:34)
    at consumer.Consumer.main(Consumer.java:18)
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
    at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:181)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:279)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:298)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:220)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:456)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:25)
    ... 13 more

The pom.xml in both applications is more or less the same:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-avro-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-compiler</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.9.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.9.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

What am I doing wrong?

I tried with identical schemas and it works, but I cannot figure out why the receiver side does not handle the missing optional field.

Cinas

2 Answers


If you have to handle an evolving schema it is a little bit tricky; refer to the question here. One way you can probably handle it is by adding the writer schema to a message header and using the schema from the header while deserializing.
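A rough sketch of that header approach, building on the AvroSerializer/AvroDeserializer from the question (the header name "avro.schema" is an arbitrary choice for this example, and the Headers overloads of serialize/deserialize have been available on the kafka-clients interfaces since 2.1):

// Inside AvroSerializer: attach the writer schema as a record header (sketch only).
@Override
public byte[] serialize(String topic, Headers headers, T data) {
    if (data != null) {
        // "avro.schema" is just the header name chosen for this example
        headers.add("avro.schema", data.getSchema().toString().getBytes(StandardCharsets.UTF_8));
    }
    return serialize(topic, data);
}

// Inside AvroDeserializer: resolve the writer schema (from the header) against the
// reader schema, so that defaults such as favorite_color get filled in.
@Override
public T deserialize(String topic, Headers headers, byte[] bytes) {
    if (bytes == null) {
        return null;
    }
    try {
        Header schemaHeader = headers.lastHeader("avro.schema");
        Schema writerSchema = new Schema.Parser()
                .parse(new String(schemaHeader.value(), StandardCharsets.UTF_8));
        Schema readerSchema = targetType.getDeclaredConstructor().newInstance().getSchema();
        DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return datumReader.read(null, decoder);
    } catch (Exception e) {
        throw new RuntimeException("An exception occurred during deserialization", e);
    }
}

This keeps the consumer's new schema as the reader schema while honouring whatever schema the record was actually written with, at the cost of shipping the schema JSON with every record.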

But the problem with that approach is that you are still sending a lot of extra data, which defeats the purpose of using something like Avro: Avro is used to reduce the size of the data being transmitted.

The best way to handle this is to use the Confluent Schema Registry. It is open source, and you can run it locally in a Docker container and configure your applications against it.
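For reference, a minimal sketch of the producer configuration with the Confluent serializers (the io.confluent:kafka-avro-serializer dependency and a registry at http://localhost:8081 are assumptions here, not part of the question's setup):

// Producer config using the Confluent Avro serializer and Schema Registry (sketch).
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put("schema.registry.url", "http://localhost:8081");

// On the consumer side, the matching deserializer, told to return the generated class:
// properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
//         io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
// properties.put("specific.avro.reader", "true");

The serializer registers the writer schema with the registry and only puts a small schema id in each record, so the consumer can always resolve the writer schema against its own reader schema.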

pacman
  • Hmm, but in my opinion the old schema is inside the serialized object (according to https://en.wikipedia.org/wiki/Apache_Avro), and the new schema is on the client side (new SpecificDatumReader<>(targetType.getDeclaredConstructor().newInstance().getSchema()) gets the new schema) – Cinas Dec 23 '22 at 11:12
  • @Cinas The schema is not part of the serialized object; the data is serialized according to the schema. If the schema were part of the serialized object you wouldn't need a schema to deserialize the data, and tools like the schema registry wouldn't exist. – pacman Dec 23 '22 at 11:21
  • You are right. But either way the SpecificDatumReader constructor gets the new schema. In my opinion the problem lies somewhere else; it looks like the client side reads more than it should, expecting the third optional field. – Cinas Dec 23 '22 at 11:34
  • @Cinas The problem is that you are trying to deserialize with a different schema. Even though you are providing default values it will not work. It needs exactly the same schema it was serialized with. – pacman Dec 23 '22 at 11:37
  • You are right. And I see where the problem lies: somewhere in the reader the actual and expected schemas are the same, both are the new schema. – Cinas Dec 23 '22 at 11:41
  • _needs exactly the same schema it was serialized with_ - This isn't true, since it goes against the whole idea of schema evolution. _If the schema were part of the serialized object you wouldn't need a schema to deserialize the data, and tools like the schema registry wouldn't exist_ - Even with the Registry, you can provide a reader schema that doesn't match the producer's. – OneCricketeer Dec 23 '22 at 14:53
  • Avro deserialization is purely based on field names. Defaults are only applied to the reader schema. This allows you to provide any subset of fields to deserialize into a new object while adding others with a defined value (a forward projection) – OneCricketeer Dec 23 '22 at 15:04

Avro needs both the reader and the writer schema when deserializing binary data. This can be done with BinaryMessageDecoder.addSchema.

Here's a simple unit test that shows the concept.

@Test
void avroRoundTrip() throws IOException {
    User u = User.newBuilder()
        .setName("foobar")
        .setFavoriteNumber(0)
        .build();
    ByteBuffer bb = u.toByteBuffer();

    ColoredUser.getDecoder().addSchema(User.SCHEMA$);
    ColoredUser cu = ColoredUser.fromByteBuffer(bb);
    System.out.println(cu);
    // {"name": "foobar", "favorite_number": 0, "favorite_color": "green"}
}

You already know what types you have at runtime, so just make specific deserializers (e.g. one that implements Deserializer<ColoredUser>), and don't try to use generics unless you're trying to build a shared library.
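For example, a rough sketch of such a deserializer (ColoredUser being the consumer-side generated class from the test above; this assumes the producer writes the single-object encoding via User.toByteBuffer() rather than a bare BinaryEncoder, and that the producer's User schema is available on the consumer classpath):

public class ColoredUserDeserializer implements Deserializer<ColoredUser> {

    public ColoredUserDeserializer() {
        // Register the producer's schema as a known writer schema once, up front.
        ColoredUser.getDecoder().addSchema(User.SCHEMA$);
    }

    @Override
    public ColoredUser deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try {
            // fromByteBuffer looks up the writer schema by its fingerprint and resolves
            // it against ColoredUser's own reader schema, applying the default color.
            return ColoredUser.fromByteBuffer(ByteBuffer.wrap(bytes));
        } catch (IOException e) {
            throw new RuntimeException("An exception occurred during deserialization", e);
        }
    }
}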

OneCricketeer