
I am developing a C# SDK which takes in IEnumerable data and then sends it to a RESTful API. The RESTful API then pushes these records to Kafka.

I already have the schemaString (this.schemaString), and here is my implementation of the SDK serialization part:

        public string ValidateAvroSchema<T>(IEnumerable<T> value) {
            using (var ms = new MemoryStream()) {
                try {
                    Avro.IO.Encoder encoder = new BinaryEncoder(ms);
                    var schema = Schema.Parse(this.schemaString) as RecordSchema;
                    var writer = new GenericDatumWriter<GenericRecord>(schema);

                    // Reflect over T once, outside the loop, rather than per item.
                    FieldInfo[] fields = typeof(T).GetFields();

                    foreach (T item in value) {
                        var record = new GenericRecord(schema);
                        foreach (FieldInfo field in fields) {
                            record.Add(field.Name, GetFieldValue(item, field.Name));
                        }
                        writer.Write(record, encoder);
                    }

                    // I am passing this string to the RESTful API so the Java side can parse it.
                    return Convert.ToBase64String(ms.ToArray());
                } catch (AvroException ex) {
                    // handle/log the exception, then rethrow so the caller
                    // knows validation failed (the method must not fall through
                    // without returning a value)
                    throw;
                }
            }
        }

On the API side I did something like:

    byte[] input = Base64.decodeBase64(payloadInJson.toString());
    List<GenericRecord> listOfRecords = new ArrayList<>();
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
    InputStream inputStream = new ByteArrayInputStream(input);
    // DecoderFactory.get() is the static accessor; no need to construct a new factory.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    while (true) {
        try {
            GenericRecord record = reader.read(null, decoder);
            listOfRecords.add(record);
        } catch (EOFException eof) {
            // End of the byte stream: all records have been read.
            break;
        }
    }
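As a side note, the Commons Codec `Base64.decodeBase64` call can be replaced with the JDK's built-in `java.util.Base64` (available since Java 8), which round-trips the C# `Convert.ToBase64String` output without an extra dependency. A minimal sketch (the byte payload is a made-up example, not real Avro data):

```java
import java.util.Arrays;
import java.util.Base64;

public class Base64RoundTrip {
    public static void main(String[] args) {
        // Simulated payload; in the real flow these bytes come from the SDK's MemoryStream.
        byte[] avroBytes = {0x02, 0x0A, 0x48, 0x65, 0x6C};

        // What the C# side produces with Convert.ToBase64String(ms.ToArray())
        String payload = Base64.getEncoder().encodeToString(avroBytes);

        // JDK equivalent of Commons Codec's Base64.decodeBase64(...)
        byte[] decoded = Base64.getDecoder().decode(payload);

        System.out.println(Arrays.equals(avroBytes, decoded)); // prints "true"
    }
}
```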

It's working now. Thanks, everyone.

Only one question left.

Question 1: Is it proper to use reflection to get all the fields of `T` and then add them to the GenericRecord? It seems very expensive.

Thanks a lot.

Shawn
  • If you are trying to serialize an `IEnumerable value` to JSON, why not just use one of the many JSON serializers available for .Net? See e.g. [This answer to *How do I turn a C# object into a JSON string in .NET?*](https://stackoverflow.com/a/19137100/3744182) – dbc Apr 16 '20 at 17:13
  • Add NuGet reference to [JSON.NET](https://www.newtonsoft.com/json)? Should be able to handle it. No need to write this yourself. But yes, it is proper to use reflection, get all properties, and serialize them. That's how most serializers work. – Sean Skelly Apr 16 '20 at 17:13
  • lol @dbc. If I hadn't reviewed for spelling errors, we might have posted within 1 second. – Sean Skelly Apr 16 '20 at 17:14
  • @dbc I totally got what you mean. However, I would like to validate this IEnumerable against our schemaString first and then transform it to a JSON string. The way I validate it is by using a writer and reader, looping through the whole item list. Not sure if there is a better way. The other question is : – Shawn Apr 16 '20 at 17:29
  • So basically you're using the schema to prune away invalid or unwanted properties, and then serialize the ones that are left to JSON? Is the portion of your code that converts to `GenericRecord`, so you only need help serializing `GenericRecord` to JSON? Is this the `GenericRecord` record you are using? https://avro.apache.org/docs/1.8.2/api/csharp/html/classAvro_1_1Generic_1_1GenericRecord.html – dbc Apr 16 '20 at 17:38
  • @dbc Yes, you are correct. I am using this Apache.Avro package. I was trying to use reflect package since it seems to be the best shot but it is not exposed. https://github.com/confluentinc/confluent-kafka-dotnet/issues/518 https://github.com/confluentinc/avro/issues/8 – Shawn Apr 16 '20 at 17:40
  • This regarding speed. Have a look at IAsyncEnumerable – maxspan Apr 16 '20 at 20:43
  • Why are you making Avro yourself instead of using existing solutions? https://github.com/confluentinc/confluent-kafka-dotnet – OneCricketeer Apr 17 '20 at 05:38
  • @cricket_007 Sorry that I am not sure what you are referring to. My problem here is the correct way to transform a list of generic items to Avro format. Thanks – Shawn Apr 17 '20 at 14:07
  • What do you mean by "generic items"? What is `T`? Does `T` extend from [`GenericRecord`](https://avro.apache.org/docs/1.8.2/api/csharp/html/classAvro_1_1Generic_1_1GenericRecord.html)? Or if you used `schemagen` to create these "generic items", then you would have a list of `SpecificRecord` subclasses, not a generic `T`... Producing `List` is no different than producing just one `T`. You either must serialize the list in one Kafka record, or you loop over that list... Do **NOT** convert to Avro while looping over the list and producing to Kafka. Use LINQ to map the original into Avro – OneCricketeer Apr 17 '20 at 21:59
  • @cricket_007 1. T doesn't extend from either GenericRecord or SpecificRecord. 2. SDK side doesn't communicate to Kafka. It will need to serialize these records first and then send them to API, it's API's responsibility to send these records to Kafka. Makes sense? – Shawn Apr 20 '20 at 01:49
  • So, your API takes what type of model? `T`? Avro can work over HTTP or gRPC, too, btw, so you don't really need to do too much translation should you choose to do that – OneCricketeer Apr 20 '20 at 03:18
  • @cricket_007 For now, I want my API to take two types of input. 1. JSON string for users who call the API directly. 2. Base64-formatted string for users who use the SDK to call the API. Makes sense? – Shawn Apr 20 '20 at 13:02
  • Sure. Does that work? Can you show a [mcve]? Why base64, though? The SDK should be able to send the JSON string as well – OneCricketeer Apr 20 '20 at 23:23

1 Answer


In my opinion, the most convenient way would be to use:

        public string ValidateAvroSchema<T>(IEnumerable<T> value) {
            byte[] result = AvroConvert.SerializeHeadless(value.ToList(), this.schemaString);
            return Convert.ToBase64String(result);
        }

Just keep in mind that in this case the schema is an array schema of T. To generate it you can use:

        AvroConvert.GenerateSchema(typeof(List<T>));

And on the API side:

        var deserialized = AvroConvert.DeserializeHeadless<List<T>>(result, schema);

from https://github.com/AdrianStrugala/AvroConvert

Adrian