21

I'm looking for a way to convert a POJO to an Avro object in a generic way. The implementation should be robust to any changes to the POJO class. I have achieved it, but only by filling the Avro record explicitly (see the example below).

Is there a way to get rid of the hard-coded field names and just fill the avro record from the object? Is reflection the only way, or does avro provide this functionality out of the box?

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.reflect.ReflectData;

/**
 * Demonstrates deriving an Avro schema from a POJO class with {@link ReflectData} and then
 * populating a {@link Record} for that schema by copying each field value by name.
 */
public class PojoToAvroExample {

    /** Superclass contributing two map-valued fields to the example POJO. */
    static class PojoParent {
        public final Map<String, String> aMap = new HashMap<>();
        public final Map<String, Integer> anotherMap = new HashMap<>();
    }

    /** The POJO that is converted in this example. */
    static class Pojo extends PojoParent {
        public String uid;
        public Date eventTime;
    }

    /** Returns a {@link Pojo} filled with fixed sample values (and the current time). */
    static Pojo createPojo() {
        final Pojo sample = new Pojo();
        sample.uid = "123";
        sample.eventTime = new Date();
        sample.aMap.put("key", "val");
        sample.anotherMap.put("key", 42);
        return sample;
    }

    public static void main(String[] args) {
        // Let Avro reflect a schema from the Pojo class.
        final Schema pojoSchema = ReflectData.get().getSchema(Pojo.class);
        System.out.println("extracted avro schema: " + pojoSchema);

        // A new record for that schema, printed before any field has been set.
        final Record record = new Record(pojoSchema);
        System.out.println("corresponding empty avro record: " + record);

        final Pojo sample = createPojo();
        // TODO: replace the explicit per-field copies below with a generic variant,
        // something like record.importValuesFrom(sample);
        record.put("uid", sample.uid);
        record.put("eventTime", sample.eventTime);
        record.put("aMap", sample.aMap);
        record.put("anotherMap", sample.anotherMap);
        System.out.println("expected avro record: " + record);
    }
}
Fabian Braun
  • 3,612
  • 1
  • 27
  • 44
  • 2
    Why not use [Avro's ReflectDatumWriter](http://stackoverflow.com/questions/11866466/using-apache-avro-reflect) to serialize the POJO? – Chin Huang Jul 06 '15 at 18:16
  • I'm using Avro in a Hadoop context. For serialization I would like to use the AvroParquetOutputFormat – Fabian Braun Jul 13 '15 at 12:13
  • 1
    An inefficient approach would have [ReflectDatumWriter write a POJO to bytes then GenericDatumReader reads the bytes to GenericRecord](http://stackoverflow.com/questions/26435299/write-pojos-to-parquet-file-using-reflection). – Chin Huang Jul 13 '15 at 21:35

6 Answers

13

Are you using Spring?

I built a mapper for that using a Spring feature, but it is also possible to build such a mapper with plain reflection utilities:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.util.Assert;

public class GenericRecordMapper {

    public static GenericData.Record mapObjectToRecord(Object object) {
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        final GenericData.Record record = new GenericData.Record(schema);
        schema.getFields().forEach(r -> record.put(r.name(), PropertyAccessorFactory.forDirectFieldAccess(object).getPropertyValue(r.name())));
        return record;
    }

    public static <T> T mapRecordToObject(GenericData.Record record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");
        record.getSchema().getFields().forEach(d -> PropertyAccessorFactory.forDirectFieldAccess(object).setPropertyValue(d.name(), record.get(d.name()) == null ? record.get(d.name()) : record.get(d.name()).toString()));
        return object;
    }

}

With this mapper you can generate a GenericData.Record, which can easily be serialized to Avro. When you deserialize an Avro byte array, you can use the mapper to rebuild a POJO from the deserialized record:

Serialize

// Map the POJO to a GenericData.Record, then serialize that record. avroSerializer and the
// "topic" argument presumably belong to a Kafka Avro serializer — confirm against your setup.
byte[] serialized = avroSerializer.serialize("topic", GenericRecordMapper.mapObjectToRecord(yourPojo));

Deserialize

// Deserialize the bytes back into a record; the cast assumes the deserializer yields a GenericData.Record.
GenericData.Record deserialized = (GenericData.Record) avroDeserializer.deserialize("topic", serialized);

// Copy the record's field values into a fresh POJO instance.
YourPojo yourPojo = GenericRecordMapper.mapRecordToObject(deserialized, new YourPojo());
lucapette
  • 20,564
  • 6
  • 65
  • 59
TranceMaster
  • 147
  • 1
  • 4
  • 2
    Nice, but doesn't seem to handle Lists, Sets correctly, e.g. List gets translated to a List containing one string with all elements, i.e. toString of the list – Chris W. Aug 22 '19 at 10:01
  • Will this work with messages whose Avro namespace differs between the consumer and producer side, while everything below the namespace is the same? – Harsh Mishra Jan 24 '22 at 12:02
  • This also doesn't handle nested or recursive objects – OneCricketeer Jun 04 '22 at 14:34
8

Here is a generic way to convert a POJO to Avro binary:

/**
 * Serializes {@code v} to Avro binary using the schema that {@link ReflectData} derives from
 * {@code cls}; field values are read reflectively by a {@code ReflectDatumWriter}.
 *
 * @param v   the value to serialize
 * @param cls the class whose reflect schema is used for encoding
 * @param <V> the value type
 * @return the raw Avro binary encoding of {@code v} (no container-file header, no schema)
 * @throws java.io.UncheckedIOException if writing or flushing the encoder fails with an I/O error
 */
public static <V> byte[] toBytesGeneric(final V v, final Class<V> cls) {
    final Schema schema = ReflectData.get().getSchema(cls);
    final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema);
    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
    final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null);
    try {
        writer.write(v, binEncoder);
        // Flush the encoder so all written bytes reach the stream before it is read.
        binEncoder.flush();
    } catch (final java.io.IOException e) {
        // Only checked I/O failures are wrapped; Avro's runtime exceptions propagate unchanged.
        throw new java.io.UncheckedIOException("Failed to Avro-encode instance of " + cls.getName(), e);
    }
    return bout.toByteArray();
}

public static void main(String[] args) {
    // Encode a default-constructed PojoClass; the resulting bytes are discarded in this demo.
    final PojoClass sample = new PojoClass();
    toBytesGeneric(sample, PojoClass.class);
}
big
  • 1,888
  • 7
  • 28
  • 48
4

With jackson/avro, it's very easy to convert pojo to byte[], similar to jackson/json:

// Encode the POJO as Avro binary. Assumes avroMapper is Jackson's AvroMapper and schema the
// matching com.fasterxml.jackson.dataformat.avro.AvroSchema — TODO confirm.
byte[] avroData = avroMapper.writer(schema).writeValueAsBytes(pojo);

p.s.
jackson handles not only JSON, but also XML/Avro/Protobuf/YAML etc, with very similar classes and APIs.

Leon
  • 3,124
  • 31
  • 36
  • After sending it as byte[] how does the client convert the byte[] back to the POJO? Does the client need to know POJO information in advance or is it possible to construct the POJO from the AvroSchema of jackson/avro? – user2441441 Jun 17 '22 at 21:20
2

Two steps to convert any POJO to an Avro generic record:

  1. Using jackson/avro, to convert the pojo into bytes with Avro Mapper.

  2. Using Avro GenericDatumReader to read it as Generic Record.

/**
 * Converts a POJO into an Avro {@code GenericRecord} in two steps:
 * Jackson's {@code AvroMapper} encodes the object to Avro binary with the given schema, and
 * Avro's {@code GenericDatumReader} then decodes those bytes as a generic record.
 *
 * <p>Requires jackson-dataformat-avro, Apache Avro, and Spring (for {@code ClassPathResource}).
 */
public class AvroConverter {

    /**
     * Converts {@code someObject} to a {@code GenericRecord} conforming to the schema at
     * {@code schemaPath}.
     *
     * @param schemaPath classpath location of the Avro schema (.avsc) file
     * @param someObject the object to convert; its properties must match the schema
     * @return the object's data as a generic record
     * @throws java.io.UncheckedIOException if the schema cannot be read or the object cannot be
     *                                      encoded or decoded
     */
    public static GenericRecord convertToGenericRecord(String schemaPath, Object someObject) {
        try {
            final Schema schema;
            // Read through a stream: ClassPathResource.getFile() fails for resources inside a JAR.
            try (java.io.InputStream in = new ClassPathResource(schemaPath).getInputStream()) {
                schema = new Schema.Parser().setValidate(true).parse(in);
            }
            final ObjectWriter writer = new AvroMapper().writer(new AvroSchema(schema));
            final byte[] bytes = writer.writeValueAsBytes(someObject);
            // GenericDatumReader needs the plain Avro Schema, not Jackson's AvroSchema wrapper.
            final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(
                    "Failed to convert object to GenericRecord using schema " + schemaPath, e);
        }
    }

}

Gradle Dependency

 // https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-avro
    // No version is given here, so one must come from a platform/BOM (e.g. jackson-bom) or another
    // dependency constraint; otherwise append ':<version>' to the coordinate.
    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-avro'
Sivaram Rasathurai
  • 5,533
  • 3
  • 22
  • 45
  • What is this code supposed to do? `ObjectWritter writter` is not even being used (and is misspelled), and `GenericDatumReader` takes a `Schema` argument and not an `AvroSchema` argument (which is what is being provided here). This code does not even compile. – filpa Jul 01 '22 at 16:31
  • @filpa thanks for pointing that out. The AvroMapper is used to write the object into bytes. I had some typos; I hope it is clear now. – Sivaram Rasathurai Jul 04 '22 at 00:18
0

In addition to my comment on @TranceMaster's answer, the modified version below works for me with primitive types and Java sets:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.util.Assert;

public class GenericRecordMapper {

    public static GenericData.Record mapObjectToRecord(Object object) {
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        System.out.println(schema);
        final GenericData.Record record = new GenericData.Record(schema);
        schema.getFields().forEach(r -> record.put(r.name(), PropertyAccessorFactory.forDirectFieldAccess(object).getPropertyValue(r.name())));
        return record;
    }

    public static <T> T mapRecordToObject(GenericData.Record record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");

        final Schema schema = ReflectData.get().getSchema(object.getClass());
        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");

        record
                .getSchema()
                .getFields()
                .forEach(field ->
                    PropertyAccessorFactory
                            .forDirectFieldAccess(object)
                            .setPropertyValue(field.name(), record.get(field.name()))
                );
        return object;
    }
}
Chris W.
  • 2,266
  • 20
  • 40
-1

I needed exactly such a thing myself. The functionality you need is in the Avro-related JAR files (the code below uses jackson-dataformat-avro), but strangely there doesn't seem to be a way to invoke it from the avro-tools command line.

Invoke it as: java GenerateSchemaFromPOJO com.example.pojo.Person Person.json (the second argument is the output schema file)

import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.avro.Schema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.avro.AvroFactory;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
import com.fasterxml.jackson.dataformat.avro.schema.VisitorFormatWrapperImpl;
import com.fasterxml.jackson.dataformat.avro.schema.VisitorFormatWrapperImpl;

/**
 * Command-line tool that derives an Avro schema from a Java class using Jackson's Avro schema
 * generator and writes it, pretty-printed as JSON, to a file.
 *
 * <p>Usage: {@code java GenerateSchemaFromPOJO <fully.qualified.ClassName> <output-schema-file.json>}.
 * The class must be on the classpath. Exits with status 1 on wrong arguments or on any failure,
 * including a failure to flush/close the output file.
 */
public class GenerateSchemaFromPOJO {

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("Usage: java " + GenerateSchemaFromPOJO.class.getCanonicalName() + " classname output-schema-file.json");
            System.exit(1);
            return;
        }
        String className = args[0];
        String outputFile = args[1];
        try {
            String asJson = generateSchemaJson(className);
            // try-with-resources: a failing flush/close is now reported as an error (exit 1)
            // instead of being logged while the tool still succeeds. UTF-8 explicitly, since
            // FileWriter would use the platform default charset.
            try (Writer outputWriter = Files.newBufferedWriter(Paths.get(outputFile), StandardCharsets.UTF_8)) {
                outputWriter.write(asJson);
            }
        } catch (Exception ex) {
            System.err.println("caught " + ex);
            ex.printStackTrace();
            System.exit(1);
        }
    }

    /** Loads {@code className} and returns its Jackson-generated Avro schema as pretty-printed JSON. */
    private static String generateSchemaJson(String className) throws ClassNotFoundException, IOException {
        Class<?> clazz = Class.forName(className);

        ObjectMapper mapper = new ObjectMapper(new AvroFactory());
        AvroSchemaGenerator gen = new AvroSchemaGenerator();
        mapper.acceptJsonFormatVisitor(clazz, gen);
        AvroSchema schemaWrapper = gen.getGeneratedSchema();

        Schema avroSchema = schemaWrapper.getAvroSchema();
        return avroSchema.toString(true);
    }
}
  • As I understand from your answer, your code generates the avro schema for the given `clazz`. This is not, what I was asking for in the question. I do the same in line `ReflectData.get().getSchema(Pojo.class);`. I was looking for a way to replace `avroRecord.put(..., ...);` with a generic variant – Fabian Braun Apr 25 '16 at 09:05