I've written a class to custom encode objects of UUID type to bytes to be transported across kafka and avro.
To use this class, I put an @AvroEncode(using=UUIDAsBytesEncoding.class)
above the uuid variable in my target object. (This is being implemented by the apache avro reflect library)
I'm having difficulty figuring out how to have my consumer automatically use the custom decoder. (or do I have to go in and manually decode it?).
Here is my UUIDAsBytesEncoder extends CustomEncoding class:
public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {
public UUIDAsBytesEncoding() {
List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");
schema = Schema.createUnion(union);
}
@Override
protected void write(Object datum, Encoder out) throws IOException {
if(datum != null) {
// encode the position of the data in the union
out.writeLong(1);
// convert uuid to bytes
byte[] bytes = new byte[16];
Conversion.uuidToByteArray(((UUID) datum),bytes,0,16);
// encode length of data
out.writeLong(16);
// write the data
out.writeBytes(bytes);
} else {
// position of null in union
out.writeLong(0);
}
}
@Override
protected UUID read(Object reuse, Decoder in) throws IOException {
System.out.println("READING");
Long size = in.readLong();
Long leastSig = in.readLong();
Long mostSig = in.readLong();
return new UUID(mostSig, leastSig);
}
}
The write method and encoding work well, but the read method is never getting called on deserialization. How would I implement this in a consumer?
The schema on the registry looks like:
{"type":"record","name":"Request","namespace":"xxxxxxx.xxx.xxx","fields":[{"name":"password","type":"string"},{"name":"email","type":"string"},{"name":"id","type":["null",{"type":"bytes","CustomEncoding":"UUIDAsBytesEncoding"}],"default":null}]} `
If the consumer can't automatically use that information to use the UUIDAsBytesEncoding read method, then how would I find the data marked with that tag in my consumer?
I am using the confluent schema-registry as well.
Any help would be appreciated!