This is my first time using Kafka, and I am planning to use it with .NET.

I want to know whether I can send JSON as the message value when producing an event.

I am following the tutorial: https://developer.confluent.io/get-started/dotnet/#build-producer

Also, is there a way to map that value to a model, so that the JSON structure is always tied to that model?

For example, suppose I want my JSON value to be:

{
  "customerName":"anything",
  "eventType":"one-of-three-enums",
  "columnsChanged": "string value or something"
}

Most of the examples that I can find are like this:

using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;

class Producer {
    static void Main(string[] args)
    {
        if (args.Length != 1) {
            Console.WriteLine("Please provide the configuration file path as a command line argument");
            return; // stop here; args[0] below would throw without an argument
        }

        IConfiguration configuration = new ConfigurationBuilder()
            .AddIniFile(args[0])
            .Build();

        const string topic = "purchases";

        string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
        string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };

        using (var producer = new ProducerBuilder<string, string>(
            configuration.AsEnumerable()).Build())
        {
            var numProduced = 0;
            const int numMessages = 10;
            for (int i = 0; i < numMessages; ++i)
            {
                Random rnd = new Random();
                var user = users[rnd.Next(users.Length)];
                var item = items[rnd.Next(items.Length)];

                producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else {
                            Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
                            numProduced += 1;
                        }
                    });
            }

            producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
        }
    }
}

I would like `item` to be an instance of a class, serialized as JSON.

OneCricketeer
Learn AspNet
  • You simply would replace `var item` with a serialized JSON object as a string... You should be able to do this without knowledge of Kafka. Also, did you find the example code here? https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/JsonSerialization/Program.cs – OneCricketeer Nov 23 '21 at 22:05
  • @OneCricketeer Is it possible to reject the event if it does not have the same JSON structure? I would like it to be strongly typed. – Learn AspNet Nov 23 '21 at 22:10
  • Out of the box, no. That's what the Schema Registry is for; to store JSONSchema, which the message must match. Alternatively, you can use Protobuf or another strongly typed binary format – OneCricketeer Nov 23 '21 at 22:12
  • @OneCricketeer Just a follow-up question: do you know if I can limit the message size, and is there a way for the producer to check the size of the event before sending? – Learn AspNet Nov 23 '21 at 23:50

1 Answer

> I wanted to know if I can send JSON as a message when producing an event

Yes. Kafka stores bytes, and the client converts your objects to bytes using serializers. When building a producer, you have the option of calling SetValueSerializer.

Some of the built-in serializers can be found at https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Serializers.cs

You'd need to write your own serializer to handle arbitrary JSON model types generically.
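A minimal sketch of such a serializer, assuming System.Text.Json is available and camelCase property names are wanted. The names `JsonValueSerializer<T>` and `JsonProducerFactory` are hypothetical, not part of the Confluent library; only `ISerializer<T>`, `ProducerBuilder` and `SetValueSerializer` come from Confluent.Kafka.

```csharp
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical generic serializer: turns any model type T into UTF-8 JSON bytes.
public class JsonValueSerializer<T> : ISerializer<T>
{
    // Assumption: camelCase property names, to match the JSON shape in the question.
    private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    public byte[] Serialize(T data, SerializationContext context)
    {
        // Pass null through unchanged so a null value is still sent as null.
        if (data == null) return null;
        return JsonSerializer.SerializeToUtf8Bytes(data, Options);
    }
}

// Hypothetical helper showing where the serializer is plugged in.
public static class JsonProducerFactory
{
    public static IProducer<string, T> Create<T>(ProducerConfig config)
    {
        return new ProducerBuilder<string, T>(config)
            .SetValueSerializer(new JsonValueSerializer<T>())
            .Build();
    }
}
```

With this in place, the producer's value type becomes the model class itself, so `Message<string, TModel>.Value` only accepts instances of that class.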

When using the Utf8Serializer for strings, you'll need to pre-serialize the object from your model class, then send that as the value. In your example, you'd replace var item with some serialized object.

See also: How do I turn a C# object into a JSON string in .NET?
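As a rough illustration of the pre-serialization approach, here is a sketch using System.Text.Json. The `CustomerEvent` class, the `EventType` enum and its three members, and the `CustomerEventJson` helper are all hypothetical names chosen to mirror the JSON shape in the question.

```csharp
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical enum standing in for "one-of-three-enums" from the question.
public enum EventType { Created, Updated, Deleted }

// Hypothetical model class mirroring the desired JSON structure.
public class CustomerEvent
{
    public string CustomerName { get; set; }
    public EventType EventType { get; set; }
    public string ColumnsChanged { get; set; }
}

public static class CustomerEventJson
{
    private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
    {
        // Emit customerName instead of CustomerName.
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        // Write enum members as strings rather than integers.
        Converters = { new JsonStringEnumConverter() }
    };

    // Returns the JSON string to use as the message value.
    public static string ToJson(CustomerEvent evt)
    {
        return JsonSerializer.Serialize(evt, Options);
    }
}
```

In the producer loop from the question, the result of `CustomerEventJson.ToJson(...)` would take the place of `item` as the `Value` of the `Message<string, string>`.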

When using model classes, your data will typically be strongly typed until you start manually writing the JSON or use Dictionary types. If you want external message validation, the Confluent Schema Registry is one option: it supports JSON Schema, and the JsonSerializer from the confluent-kafka-dotnet project integrates with it.

OneCricketeer
  • Just a follow-up question: do you know if I can limit the message size, and is there a way for the producer to check the size of the event before sending, and refuse to send the message if it is over the limit? – Learn AspNet Nov 23 '21 at 23:51
  • Kafka has a default limit of 1MB message batches. If you get the size of the serialized byte array, that should be a close approximation of an individual record size (there's additional overhead such as record headers and timestamps, though) – OneCricketeer Nov 24 '21 at 15:10
  • Thank you. Can you please answer: https://stackoverflow.com/questions/70097676/sql-server-data-to-kafka-in-real-time – Learn AspNet Nov 24 '21 at 15:11
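On the message size question in the comments, a pre-send check could look roughly like the sketch below. `MessageSizeGuard` and its members are hypothetical names, and the 1,000,000-byte default is only an assumption based on the "about 1MB" figure mentioned above; pass whatever limit your own producer and broker are configured with. The check covers the serialized value only, so record overhead such as headers and timestamps is not counted.

```csharp
using System.Text;

// Hypothetical guard that checks a serialized value's size before producing.
public static class MessageSizeGuard
{
    // Assumed default of roughly 1 MB; replace with your configured limit.
    public const int DefaultMaxBytes = 1_000_000;

    // True if the UTF-8 encoding of the value is at most maxBytes long.
    public static bool FitsWithin(string serializedValue, int maxBytes = DefaultMaxBytes)
    {
        return Encoding.UTF8.GetByteCount(serializedValue) <= maxBytes;
    }

    // Same check for values that are already serialized to bytes.
    public static bool FitsWithin(byte[] serializedValue, int maxBytes = DefaultMaxBytes)
    {
        return serializedValue.Length <= maxBytes;
    }
}
```

The calling code would skip the `Produce` call, or log and reject the event, whenever `FitsWithin` returns false.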