I have been trying to serialize an Avro generic record and generate Avro-serialized data to send to Kafka. The main goal is to avoid using the Confluent Schema Registry for storing the schema, and instead send the schema along with the serialized data so that it can be extracted from the Kafka topic and used for deserialization.

Below is the part of the AvroSerializer that generates the Avro data.


  @Override
  public byte[] serialize(String topic, T data) {
    try {
      byte[] result = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);


        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
            EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.setSchema(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close(); 
        result = byteArrayOutputStream.toByteArray();


      }

      return result;
    } catch (IOException ex) {
      throw new SerializationException(
          "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }

The serialized data present in kafka looks like this.

[Image: snapshot of a consumer running in cmd, showing the serialized data]

The AvroDeserializer part looks like this.

  @Override
  public T deserialize(String topic, byte[] data) {

    GenericRecord person = null;

    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

        Schema schema = Schema.parse(schemaString);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

 
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);
        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);
      }

      return result;

    } catch (Exception ex) {
      throw new SerializationException(
          "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }


The producer is shown below

public class KafkaAvroProducerUtil {


    public  Future<RecordMetadata> produceTokafka(GenericRecord object) throws IOException {


        Properties properties = new Properties();
        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");
        // avro part

        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", AvroSerializer.class.getName());
 


        String topic = "avro";

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties);
        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<String, GenericRecord>(
                topic, object
        );

        Future<RecordMetadata> data = producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata);
                } else {
                    exception.printStackTrace();
                }
            }
        });

        producer.flush();
        producer.close();


        return data;
    }
}

When I try to deserialize this, it says a schema is needed. As I understand it, the problem is that the schema is not sent along with the data, as you can see in the image above (a snapshot of a consumer running in cmd). How can I send the schema along with the data so that I can deserialize using that schema?

pacman
  • Please show a [mcve] that includes your Kafka producer and the `data` definition – OneCricketeer Mar 23 '21 at 13:08
  • Note: the code I linked to before never calls `datumWriter.setSchema` – OneCricketeer Mar 23 '21 at 13:10
  • @OneCricketeer I have edited the question to add more details. Yes the code is previously I had. I have tried the code you suggested for the same but it was producing the same output. – pacman Mar 23 '21 at 13:34
  • _When I try to deserialize this it says schema is needed_ - 1) What is saying this? 2) Yes, as answered before, Avro **requires** a "reader schema" , despite having a "writer schema" part of the data (or not) – OneCricketeer Mar 23 '21 at 13:51
  • Also, why are you using Object here? `KafkaProducer` and where is the object you're sending defined? – OneCricketeer Mar 23 '21 at 13:53
  • @OneCricketeer It is `GenericRecord` and not `Object` mistake made while removing unnecessary code. – pacman Mar 23 '21 at 13:57
  • @OneCricketeer So are you saying the schema registry is inevitable? Is it not possible to send the schema along with the data? "When I try to deserialize this it says schema is needed" - this error is thrown during deserialization in AvroDeserializer if I am not providing the schema in the deserializer as shown above. The thing is, I hoped to get the schema from the data itself. – pacman Mar 23 '21 at 13:58
  • 2
    Schema Registry is not "inevitable". I'm saying the deserializer will always need to use a schema (but there's no easy way I'm aware of to extract it from the incoming byte array)... You might have better luck with the Jackson Avro library – OneCricketeer Mar 23 '21 at 14:09
  • @OneCricketeer " I'm aware of to extract it from the incoming byte array" - any resource you could point to me regarding this? – pacman Mar 23 '21 at 14:12
  • I said there is no easy way, meaning, I've looked at [the Avro source code](https://github.com/apache/avro/tree/master/lang/java/avro/src/main/java/org/apache/Avro), and I couldn't find anything of use. Essentially, you'd have to follow the same logic that Confluent does and know exactly what offsets of the byte array contain the schema, decode that as UTF8, then pass it to the Schema.parse method – OneCricketeer Mar 23 '21 at 14:19
  • @OneCricketeer Thank you so much for your time. I really appreciate it. :) – pacman Mar 23 '21 at 14:21
  • One more thing - if you [look at this post](https://martin.kleppmann.com/2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html), you can see how Avro bytes are actually represented, and notice no Schema/field information is actually encoded into the binary, thus why a reader schema is necessary – OneCricketeer Mar 23 '21 at 14:31
  • @OneCricketeer Can you share your views on the solution below, and any thoughts on how to improve it? – pacman Mar 25 '21 at 14:04
  • What's the point of sending the schema along with the data? Are you sure you are doing the right thing? Schema is expected to be something that doesn't change too often. And one of its purposes is to bear structure. By extracting structure from an object and sending only the bare data, you achieve significant compression. Even if you're going to serialize totally random objects, I'm sure there are much better ways to do it more efficiently. Anyway, I suggest you take a look at the [thread](https://stackoverflow.com/q/51468694/355438). – Ilya Serbis Apr 13 '23 at 21:12
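
To make the point above about the binary layout concrete, here is a minimal sketch of how Avro's binary encoding lays out a record. It is hand-rolled with only the JDK and is not the Avro library; the class name `AvroBytesSketch` and the two-field record (a string `name`, then an int `age`) are assumptions made up for illustration. Per the Avro specification, ints and longs are zigzag varints, strings are a length followed by UTF-8 bytes, and a record is just its fields concatenated in schema order.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-rolled illustration of Avro's binary encoding rules; NOT the Avro library.
class AvroBytesSketch {

    // Avro int/long: zigzag-encode, then emit 7 bits per byte, lowest group first.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80)); // high bit set: more bytes follow
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    // Avro string: byte length (as a long), followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Hypothetical record {name: string, age: int}: field values only, in schema order.
    static byte[] encodePerson(String name, int age) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);
        writeLong(out, age);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodePerson("Al", 30)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "04 41 6c 3c"
    }
}
```

The four bytes are the string length, the two characters, and the age. Nothing in them says "name", "age", or even that this is a record, which is why a reader cannot decode the value without being handed a schema from somewhere else.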

1 Answer

EDIT: Following the suggestions of @OneCricketeer and @ChinHuang, I have approached this in two ways.

Both approaches are explained below, but code is shown only for the header approach.

APPROACH 1: Sending schema along with data

In this approach I serialized the Avro schema as a string, appended a delimiter followed by the serialized data, and sent the result to the Kafka topic.

When deserializing, after reading the value from the Kafka topic, I split the byte array into schema and data at the delimiter. I then converted the schema bytes back into a schema and used that schema to deserialize the data.

Cons of the approach, as @OneCricketeer said:

  1. It is definitely not performant.
  2. The whole approach breaks if the delimiter appears in the schema.
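
The second problem can be avoided by framing the value with a length prefix instead of a delimiter. The sketch below is not the code used for this approach; it swaps the delimiter for a 4-byte length, uses only the JDK, and treats the Avro payload as an opaque byte array. The class name `SchemaFraming` and its method names are hypothetical. It does nothing about the first problem: the full schema still travels with every message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: packs schema JSON and Avro bytes into one Kafka value.
class SchemaFraming {

    // Layout: [4-byte big-endian schema length][schema JSON as UTF-8][Avro payload].
    static byte[] frame(String schemaJson, byte[] avroPayload) {
        byte[] schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + schemaBytes.length + avroPayload.length)
                .putInt(schemaBytes.length)
                .put(schemaBytes)
                .put(avroPayload)
                .array();
    }

    // Reads the length prefix, then decodes exactly that many bytes as the schema.
    static String schemaOf(byte[] framed) {
        int schemaLength = ByteBuffer.wrap(framed).getInt();
        return new String(framed, 4, schemaLength, StandardCharsets.UTF_8);
    }

    // Everything after the schema is the Avro-encoded record.
    static byte[] payloadOf(byte[] framed) {
        int schemaLength = ByteBuffer.wrap(framed).getInt();
        return Arrays.copyOfRange(framed, 4 + schemaLength, framed.length);
    }

    public static void main(String[] args) {
        // The doc string contains '|', which would break a '|' delimiter.
        String schemaJson = "{\"type\":\"string\",\"doc\":\"a|b\"}";
        byte[] framed = frame(schemaJson, new byte[] {4, 'A', 'l'});
        System.out.println(schemaOf(framed).equals(schemaJson));
        System.out.println(payloadOf(framed).length);
    }
}
```

In a deserializer, the string returned by `schemaOf` would be parsed into a `Schema` and the bytes from `payloadOf` passed to the binary decoder.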

APPROACH 2: Sending schema in the header

Here, rather than sending the schema along with the data, the schema is sent in the header.

The methods in the Serializer class are shown below.


  // The two-argument overload cannot attach headers; Kafka calls the overload below instead.
  @Override
  public byte[] serialize(String topic, T data) {
    return null;
  }

  // Requires java.nio.charset.StandardCharsets in addition to the Avro and Kafka imports.
  @Override
  public byte[] serialize(String topic, Headers headers, T data) {
    try {
      byte[] payload = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
                EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);

        // Write the record using its own (writer) schema.
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.write(data, binaryEncoder);
        binaryEncoder.flush();
        payload = byteArrayOutputStream.toByteArray();

        // Ship the schema JSON in a record header instead of in the value.
        byte[] schemaBytes = data.getSchema().toString().getBytes(StandardCharsets.UTF_8);
        headers.add("schema", schemaBytes);
        LOGGER.info("headers added");
      }

      return payload;
    } catch (IOException ex) {
      throw new SerializationException(
              "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }

The Deserializer methods are shown below.



  // The two-argument overload has no access to headers; Kafka calls the overload below instead.
  @Override
  public T deserialize(String topic, byte[] data) {
    return null;
  }

  // Requires java.nio.charset.StandardCharsets in addition to the Avro and Kafka imports.
  @Override
  @SuppressWarnings("unchecked")
  public T deserialize(String topic, Headers headers, byte[] data) {
    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

        // Recover the writer schema from the "schema" header set by the serializer.
        Header header = headers.lastHeader("schema");
        String schemaString = new String(header.value(), StandardCharsets.UTF_8);
        Schema schema = new Schema.Parser().parse(schemaString);

        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        result = (T) datumReader.read(null, decoder);

        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);
      }

      return result;
    } catch (Exception ex) {
      throw new SerializationException(
              "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }

pacman
  • Yeah, that'll work, however definitely not performant over the wire, and will fail when your schema includes the delimiter (in a field comment, for example) – OneCricketeer Mar 25 '21 at 14:26
  • @OneCricketeer Is there any way to make this approach performant? I was thinking about the same problem with the delimiter. Is there any character set that can never occur in an Avro schema? – pacman Mar 25 '21 at 15:29
  • 2
    You can send the Avro schema in a [Kafka message header](https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers), with the Avro data separately in the Kafka message value. – Chin Huang Mar 25 '21 at 16:58
  • @ChinHuang I saw that option, but the method that takes headers is defined as a default method in the `org.apache.kafka.common.serialization.Serializer` interface: `default byte[] serialize(String topic, Headers headers, T data) { return serialize(topic, data); }`. How can I make the serializer call this method instead of the one I am using now, both while serializing and deserializing? Would it be fine to call this method from the current method in the Serializer, passing headers as null while serializing? Even then, while deserializing, would the default method be used rather than the one currently used in the code? – pacman Mar 25 '21 at 17:53
  • 1
    The Kafka consumer [invokes the deserialize method with the Headers parameter](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1387). Your custom Deserializer implementation simply needs to override this method. – Chin Huang Mar 25 '21 at 18:10
  • @ChinHuang So even for the Serializer, would overriding the default method cause it to be invoked automatically, or would it have to be called explicitly? – pacman Mar 26 '21 at 03:36
  • @ChinHuang I have changed the code to send the schema in the header, but I am not clear on how the code works. As you can see, the header is only set inside the method and is not passed or returned anywhere. Any idea how Kafka picks it up even though I am not sending it or passing it to any other method? Also, during serialization, when I override the default method, why is that one suddenly used rather than the method that has to be overridden? – pacman Mar 26 '21 at 04:51
  • 1
    The method signature with the headers is always the one that is **actually** called. The default signature is a passthrough https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L925 – OneCricketeer Mar 26 '21 at 22:17
  • You should ideally throw a SerializationException in the serialize method without headers rather than explicitly return null, with a message saying that headers are required. And no, headers are not returned in the method, unfortunately they're passed by reference and modified/used in the calling send method of the producer – OneCricketeer Mar 26 '21 at 22:25
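
The two points in the last comments (the headers overload is the one the client calls, and headers are mutated in place rather than returned) can be sketched without Kafka on the classpath. Everything below is a stand-in: `FakeHeaders` and `MiniSerializer` are hypothetical types that only mirror the shape of Kafka's `Headers` and `Serializer`, `send` plays the role of the producer, and `IllegalStateException` stands in for `SerializationException`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class HeaderPassthroughSketch {

    // Stand-in for Kafka's Headers; records only the keys that were added.
    static class FakeHeaders {
        final List<String> keys = new ArrayList<>();
        void add(String key, byte[] value) { keys.add(key); }
    }

    // Stand-in mirroring Serializer<T>: the 3-arg default delegates to the 2-arg method.
    interface MiniSerializer<T> {
        byte[] serialize(String topic, T data);

        default byte[] serialize(String topic, FakeHeaders headers, T data) {
            return serialize(topic, data);
        }
    }

    static class HeaderWritingSerializer implements MiniSerializer<String> {
        // Fail loudly instead of returning null when headers are unavailable.
        @Override
        public byte[] serialize(String topic, String data) {
            throw new IllegalStateException("headers are required; use the headers overload");
        }

        // Overriding the default means callers of the 3-arg method land here.
        @Override
        public byte[] serialize(String topic, FakeHeaders headers, String data) {
            headers.add("schema", new byte[0]); // mutates the caller's object
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Plays the producer's role: it owns the headers and always calls the 3-arg overload.
    static byte[] send(MiniSerializer<String> serializer, FakeHeaders headers, String value) {
        return serializer.serialize("avro", headers, value);
    }

    public static void main(String[] args) {
        FakeHeaders headers = new FakeHeaders();
        send(new HeaderWritingSerializer(), headers, "x");
        System.out.println(headers.keys); // prints "[schema]"
    }
}
```

Because the caller keeps its own reference to the headers object, whatever the serializer adds is visible to it after the call returns, which is why nothing needs to be returned for the header to end up on the record.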