
In my consumer, I want to deserialize a Kafka protobuf message. The key is a string, but the message value is a protobuf object. I know I have to write a custom deserializer for the message value, but I have no idea how to create one. Here is my consumer implementation, where I need to replace the marked line:

using Confluent.Kafka;
using System;
using System.Threading;

namespace EventHubsForKafkaSample
{
    class Worker1
    {
        public static void Consumer(string brokerList, string connStr, string consumergroup, string topic, string cacertlocation)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = brokerList,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SocketTimeoutMs = 60000,                //this corresponds to the Consumer config `request.timeout.ms`
                SessionTimeoutMs = 30000,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "$ConnectionString",
                SaslPassword = connStr,
                SslCaLocation = cacertlocation,
                GroupId = consumergroup,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                BrokerVersionFallback = "1.0.0",        //Event Hubs for Kafka Ecosystems supports Kafka v1.0+, a fallback to an older API will fail
                //Debug = "security,broker,protocol"    //Uncomment for librdkafka debugging information
            };

            using (var consumer = new ConsumerBuilder<string, ProtobufMessage>(config)
                .SetKeyDeserializer(Deserializers.Utf8)
                .SetValueDeserializer(Deserializers.Utf8) //<<-----
                .Build())
            {
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

                consumer.Subscribe(topic);

                Console.WriteLine("Consuming messages from topic: " + topic + ", broker(s): " + brokerList);

                while (true)
                {
                    try
                    {
                        var msg = consumer.Consume(cts.Token);
                        Console.WriteLine($"Received: '{msg.Value}'");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"Error: {e.Message}");
                    }
                }
            }
        }
    }

    public class ProtobufMessage
    {
        public DateTime timestamp { get; set; }
        public int inputId { get; set; }
        public double? value { get; set; }
        public int sourceId { get; set; }
        public string inputGuid { get; set; }
    }
}

Protobuf message format:

syntax = "proto3";

package ileco.chimp.proto;

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option java_package = "ileco.chimp.proto";
option java_outer_classname = "FinalValueProtos";

message FinalValue {
  google.protobuf.Timestamp timestamp = 1;
  uint32 inputId = 2;
  google.protobuf.DoubleValue value = 3;
  uint32 sourceId = 4;
  string inputGuid = 5;
}
Ali Shahzad
  • Note that the schema definition needs to be correct; see https://protogen.marcgravell.com/ to generate C# from a .proto file with protobuf-net's tooling – Marc Gravell Sep 07 '21 at 16:04

3 Answers

  1. You need to use protoc to generate your C# class from the schema.

  2. You don't need your own deserializer if you are using the Schema Registry. See the example code:

     using (var consumer =
         new ConsumerBuilder<string, YourProtoMessage>(consumerConfig)
             .SetValueDeserializer(new ProtobufDeserializer<YourProtoMessage>().AsSyncOverAsync())
             .Build())
     {
         // Subscribe and consume here.
     }

If you aren't using the Schema Registry, then you will need to define your own deserializer by implementing IDeserializer<T>, as mentioned in the other answer.
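For the case without the Schema Registry, a minimal sketch of such a deserializer could look like the following. It assumes the value type was generated by protoc for the Google.Protobuf runtime (so it implements `IMessage<T>`); the class name `ProtobufValueDeserializer` is made up for illustration, and the span is copied to an array only to stay on the widely available `ParseFrom(byte[])` overload.

```csharp
using System;
using Confluent.Kafka;
using Google.Protobuf;

// Hypothetical helper: deserializes raw (unframed) protobuf bytes into a protoc-generated type.
public class ProtobufValueDeserializer<T> : IDeserializer<T>
    where T : class, IMessage<T>, new()
{
    // One parser per closed generic type; the factory creates empty messages to fill.
    private static readonly MessageParser<T> Parser = new MessageParser<T>(() => new T());

    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // A null Kafka value (tombstone) is passed through as null.
        if (isNull)
        {
            return null;
        }

        // Copy the span into an array and parse it as a plain protobuf message.
        return Parser.ParseFrom(data.ToArray());
    }
}
```

With that in place, the marked line in the question would become something like `.SetValueDeserializer(new ProtobufValueDeserializer<FinalValue>())`, with `FinalValue` being the protoc-generated class.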

OneCricketeer
  • I tried the same, but I am getting this error now: Expecting message Value with Confluent Schema Registry framing. Magic byte was 10, expecting 0 – Ali Shahzad Sep 08 '21 at 06:53
  • If the producer did not use the Schema Registry, then you cannot do it this way. You'll have to find that producer code/process before you can write a functional consumer deserializer – OneCricketeer Sep 08 '21 at 12:43
  • Calling `AsSyncOverAsync()` was the solution for me. – paolo Dec 20 '21 at 14:23
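
The "Magic byte was 10, expecting 0" error in the comments above is consistent with a payload that has no Schema Registry framing. The Confluent wire format starts with a zero magic byte followed by a 4-byte big-endian schema ID, whereas byte 10 (0x0A) is how a raw protobuf message begins when its first encoded field is field 1 with a length-delimited wire type, such as `timestamp` in the question's schema. A small probe like the one below can help check what a producer actually sends; `WireFormatProbe` and its members are hypothetical names, not part of any library.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical diagnostic helper for inspecting the first bytes of a Kafka message value.
public static class WireFormatProbe
{
    // Returns true when the payload starts with Confluent framing:
    // magic byte 0 followed by a 4-byte big-endian schema ID.
    public static bool TryReadSchemaId(ReadOnlySpan<byte> payload, out int schemaId)
    {
        schemaId = -1;
        if (payload.Length < 5 || payload[0] != 0)
        {
            return false;
        }

        schemaId = BinaryPrimitives.ReadInt32BigEndian(payload.Slice(1, 4));
        return true;
    }

    // Splits a single-byte protobuf tag into field number and wire type.
    // Only valid for field numbers 1 to 15, which fit in one byte.
    public static (int FieldNumber, int WireType) DecodeTag(byte tag)
    {
        return (tag >> 3, tag & 0x07);
    }
}
```

For example, `WireFormatProbe.DecodeTag(10)` yields field number 1 and wire type 2 (length-delimited), which points to unframed protobuf and therefore to a custom deserializer rather than `ProtobufDeserializer<T>`.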

You need a class that implements the IDeserializer<T> interface as defined in the Kafka API documentation. Then, your marked line will be something like:

.SetValueDeserializer(new MyCustomDeserializer())

  • What should be in the implementation of this custom deserializer? – Ali Shahzad Sep 07 '21 at 14:59
  • Assuming you have the `protobuf-net` NuGet package installed, your implementation of Deserialize would be something like: `public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context) { using (var stream = new MemoryStream(data.ToArray())) { return Serializer.Deserialize<T>(stream); } }` – Shane Rimmer Sep 07 '21 at 15:17
  • When I tried this, I got this error: Type is not expected, and no contract can be inferred: GrpcGreeter.FinalValue on this line: return Serializer.Deserialize(stream); – Ali Shahzad Sep 08 '21 at 06:56
  • @Ali I'd recommend that you write unit tests for your deserializer outside of the context of Kafka data since it seems like the data you are getting doesn't match the schema you've defined – OneCricketeer Sep 08 '21 at 12:47
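
As a sketch of the unit-test idea from the last comment: protobuf-net typically reports "no contract can be inferred" when the target class carries no `[ProtoContract]`/`[ProtoMember]` attributes, which would be the case if `GrpcGreeter.FinalValue` was generated by protoc for Google.Protobuf (an assumption here, not confirmed in the thread). The round trip below runs without Kafka; `SampleReading` is a hypothetical, simplified stand-in that leaves out the timestamp and wrapper fields.

```csharp
using System;
using System.IO;
using ProtoBuf;

// Hypothetical protobuf-net contract covering only the scalar fields of the question's schema.
[ProtoContract]
public class SampleReading
{
    [ProtoMember(2)] public uint InputId { get; set; }
    [ProtoMember(4)] public uint SourceId { get; set; }
    [ProtoMember(5)] public string InputGuid { get; set; }
}

public static class RoundTripCheck
{
    // Serializes the input to an in-memory stream and reads it back,
    // exercising the same protobuf-net calls a Kafka (de)serializer would use.
    public static SampleReading RoundTrip(SampleReading input)
    {
        using var ms = new MemoryStream();
        Serializer.Serialize(ms, input);
        ms.Position = 0;
        return Serializer.Deserialize<SampleReading>(ms);
    }
}
```

If the round trip works but messages from the topic still fail, the producer's schema or framing most likely differs from the local contract.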

Sample using a custom protobuf-net serializer and deserializer:

using System; // ReadOnlySpan<T>
using System.IO;
using Confluent.Kafka;
using SerializationContext = Confluent.Kafka.SerializationContext;

namespace Common;

/// <summary>
/// Kafka protobuf serializer.
/// </summary>
/// <typeparam name="T">Type to serialize. </typeparam>
public class KafkaProtobufSerializer<T> : ISerializer<T> where T : class
{
    /// <inheritdoc/>
    public byte[] Serialize(T data, SerializationContext context)
    {
        using var ms = new MemoryStream();
        ProtoBuf.Serializer.Serialize<T>(ms, data);
        return ms.ToArray();
    }
}
/// <summary>
/// Protobuf deserializer.
/// </summary>
/// <typeparam name="T">Type to deserialize.</typeparam>
public class KafkaProtobufDeserializer<T> : IDeserializer<T> where T : class, new()
{
    /// <inheritdoc/>
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull || data.IsEmpty)
        {
            return new T();
        }
        
        return ProtoBuf.Serializer.Deserialize<T>(data);
    }
}

Usage Example for the consumer

        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = true,
            EnableAutoOffsetStore = false,
        };

        var consumer = new ConsumerBuilder<TKey, TValue>(config)
            .SetValueDeserializer(new KafkaProtobufDeserializer<TValue>())
            .SetErrorHandler((_, e) => workerLogger.LogError("Consumer Builder:{reason}", e.Reason))
            .Build();
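
Since the configuration above sets `EnableAutoOffsetStore = false`, offsets are only committed if the application stores them itself. A consume loop along these lines is one way to do that; this is a sketch, with `ConsumeLoop.Run` being a hypothetical helper and the console output standing in for real message handling.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical helper showing a consume loop with manual offset storing.
public static class ConsumeLoop
{
    public static void Run<TKey, TValue>(
        IConsumer<TKey, TValue> consumer, string topic, CancellationToken token)
    {
        consumer.Subscribe(topic);
        try
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var result = consumer.Consume(token);
                    Console.WriteLine($"Received message at {result.TopicPartitionOffset}");

                    // Mark the message as processed; auto commit then commits stored offsets.
                    consumer.StoreOffset(result);
                }
                catch (ConsumeException e)
                {
                    // Deserialization failures also surface here.
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested; fall through to shutdown.
        }
        finally
        {
            // Leaves the consumer group cleanly.
            consumer.Close();
        }
    }
}
```

Storing the offset only after the message has been handled means a crash mid-processing leads to redelivery rather than a skipped message.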
Haroon