
I am using the code below to produce Avro records of a User class into a Kafka topic, and it is working fine:

Sender class

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import vo.User;

public class Sender8 {

    public static void main(String[] args) {

        User user = new User(10,"testName");
        Schema schema = ReflectData.get().getSchema(user.getClass());

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://127.0.0.1:8081");

        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);

        ReflectDatumWriter<Object> reflectDatumWriter = new ReflectDatumWriter<>(schema);
        GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        try {
            reflectDatumWriter.write(user, EncoderFactory.get().directBinaryEncoder(bytes, null));
            GenericRecord avroRecord2 = (GenericRecord) genericRecordReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("avrotesttopic1", avroRecord2);
            producer.send(record);
            producer.flush();

        } catch (IOException e1) {
            e1.printStackTrace();
        }

        producer.close();
    }
}

User class

public class User {
    int id;
    String name;

    public User(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Sometimes, I might need to send a collection of objects as an ArrayList, like:

ArrayList<User> users = new ArrayList<User>();

In this situation, what I do is create a loop to iterate through the list, pick individual records, and call the send() method, like:

Iterator<User> iter = users.iterator();
while (iter.hasNext()) {
   User user = iter.next();
   //all other stuff here
   producer.send(record);
}

This works fine. But the problem is, if my ArrayList has 50 records, producer.send(record) will be triggered 50 times. I would like to know whether there is a more efficient way to handle this, like calling send only once for all 50 records.

Alfred
  • EDIT: corrected serializer class. – Alfred Apr 24 '18 at 20:22
  • [What does your profiler tell you?](https://stackoverflow.com/questions/890222/analyzing-code-for-efficiency) because unless you have proof something is a problem it probably isn't. –  Apr 24 '18 at 22:48

1 Answer


It's unclear whether your topic expects one message containing an array of 50 records, or 50 individual User messages.

If individual messages, this is the expected behavior. There is very little overhead in calling producer.send repeatedly: send() is asynchronous, and the producer batches records internally (governed by batch.size and linger.ms) before writing them to the broker. It's like calling System.out.print, except you're writing data to Kafka instead of the console.
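To make that batching concrete, here is a minimal sketch of producer properties that trade a little latency for larger batches; the specific values are illustrative, not recommendations:

```java
import java.util.Properties;

public class BatchingConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        // Wait up to 50 ms so multiple send() calls are grouped
        // into one network request (the default is 0).
        props.put("linger.ms", "50");
        // Allow up to 32 KB of records per partition batch
        // (the default is 16384).
        props.put("batch.size", "32768");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        System.out.println(props.getProperty("linger.ms"));
    }
}
```

With settings like these, 50 send() calls in a loop will typically be grouped into far fewer network requests, so the loop itself is not the bottleneck.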

Note that even this example uses a while loop around send().

Look at its pom.xml as well as src/main/avro to see where the Avro Maven plugin is used and the LogLine class is defined.

If you want one record containing all 50 users, you need to create a schema for a List<User>, or define a wrapper class like

class UserList {
    List<User> users;
}

Also, as mentioned in the previous post, if you just use the Avro Maven plugin, these classes can be generated for you.

For example, in AVDL (see Getting Started with Avro in Java):

@namespace("com.example")
protocol DomainModels {
    record User {
      int id;
      string name;
    }
}

This will automatically generate an Avro schema (.avsc) and a Java class com.example.User with getters/setters, equals, hashCode, toString, etc.
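For reference, the .avsc generated from that AVDL record would look roughly like this:

```json
{
  "type": "record",
  "name": "User",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"}
  ]
}
```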

Then, you use a SpecificRecord type rather than a GenericRecord, like so:

Producer<String, User> producer = new KafkaProducer<>(props);
for (User u : list) {
    producer.send(new ProducerRecord<>("avrotesttopic1", u));
}

because the generated User class will extend SpecificRecord.


Again, if you want a list of objects in Avro, AVDL supports arrays:

@namespace("com.example")
protocol DomainModels {
    record User {
      int id;
      string name;
    }

    record UserList {
       array<User> users;
    }
}
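Roughly, that UserList record corresponds to the following standalone .avsc (with the User record inlined so the schema is self-contained):

```json
{
  "type": "record",
  "name": "UserList",
  "namespace": "com.example",
  "fields": [
    {
      "name": "users",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "User",
          "fields": [
            {"name": "id", "type": "int"},
            {"name": "name", "type": "string"}
          ]
        }
      }
    }
  ]
}
```

With this schema, all 50 users go into a single UserList value, and you call producer.send() once for the whole collection.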

The alternative to what you are doing at the moment is in-lining an AVSC string into the code (or, better, reading it from a file), but this is essentially the same schema that ReflectData is generating for you.

I personally don't see the need for the reflect-based Avro builder if you just have a simple Java object with no business logic. And if you did need business logic alongside the classes generated from the AVDL/AVSC files, you could more or less extract it into separate utility classes.

OneCricketeer