I am developing a C Sharp sdk which takes in IEnumerable data and then send it to a restful API. The restful API will then push these records to Kafka.
I already had the schemaString (this.schemaString) and here is my implementation for the SDK serialization part:
public string ValidateAvroSchema<T>(IEnumerable<T> value) {
using(var ms = new MemoryStream()){
try{
Avro.IO.Encoder e = new BinaryEncoder(ms);
var schema = Schema.Parse(this.schemaString) as RecordSchema;
var writer = new GenericDatumWriter<GenericRecord>(schema);
foreach(T item in value) {
GenericRecord record = new GenericRecord(schema);
FieldInfo[] fieldsInfo;
Type typeParameterType = typeof(T);
var type = item.GetType();
fieldsInfo = typeParameterType.GetFields();
for (int i = 0; i < fieldsInfo.Length; i++)
{
record.Add(fieldsInfo[i].Name, GetFieldValue(item, fieldsInfo[i].Name));
}
writer.Write(record, e);
}
// I am passing this string to Restful API so the Java side can parse it
return Convert.ToBase64String(ms.ToArray());;
} catch (AvroException e) {
// handle exception
}
}
}
On API sied I did something like :
byte[] input = Base64.decodeBase64(payloadInJson.toString());
List<GenericRecord> listOfRecords = new ArrayList<>();
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
InputStream inputStream = new ByteArrayInputStream(input);
BinaryDecoder decoder = new DecoderFactory().get().binaryDecoder(inputStream, null);
while(true){
try {
GenericRecord record = reader.read(null, decoder);
listOfRecords.add(record);
} catch (EOFException eof) {
break;
}
}
It's working now. Thanks for you guys.
Only one question left.
Question 1 : Is it proper to use reflection to get all properties of and then add them to GenericRecord? Seems it is very expensive.
Thanks a lot.