2

Following on from my previous question, "C# Confluent.Kafka SetValueDeserializer object deserialization", I have tried creating a custom deserializer to deserialize a protobuf message, but I am getting this error:

System.InvalidOperationException: 'Type is not expected, and no contract can be inferred: Ileco.Chimp.Proto.FinalValue'

on line:

return Serializer.Deserialize<T>(stream);

Here are my consumer and deserializer:

    /// <summary>
    /// Hosts the Kafka consumer loop that reads <c>FinalValue</c> messages from a topic
    /// and prints each one to the console.
    /// </summary>
    class Worker
    {
        /// <summary>
        /// Subscribes to <paramref name="topic"/> and prints every received message until
        /// the user presses Ctrl+C.
        /// </summary>
        /// <param name="brokerList">Bootstrap server list passed to <c>BootstrapServers</c>.</param>
        /// <param name="connStr">Connection string, used as the SASL password.</param>
        /// <param name="consumergroup">Consumer group id.</param>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="cacertlocation">Path to the CA certificate passed to <c>SslCaLocation</c>.</param>
        public static void Consumer(string brokerList, string connStr, string consumergroup, string topic, string cacertlocation)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = brokerList,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SocketTimeoutMs = 60000,                //this corresponds to the Consumer config `request.timeout.ms`
                SessionTimeoutMs = 30000,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "$ConnectionString",
                SaslPassword = connStr,
                SslCaLocation = cacertlocation,
                GroupId = consumergroup,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                BrokerVersionFallback = "1.0.0",        //Event Hubs for Kafka Ecosystems supports Kafka v1.0+, a fallback to an older API will fail
                //Debug = "security,broker,protocol"    //Uncomment for librdkafka debugging information
            };

            using (var cts = new CancellationTokenSource())
            using (var consumer = new ConsumerBuilder<string, FinalValue>(config)
                .SetKeyDeserializer(Deserializers.Utf8)
                .SetValueDeserializer(new MyCustomDeserializer<FinalValue>())
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                .Build())
            {
                // Keep a reference to the handler so it can be removed before cts is disposed.
                ConsoleCancelEventHandler onCancelKeyPress = (_, e) => { e.Cancel = true; cts.Cancel(); };
                Console.CancelKeyPress += onCancelKeyPress;

                consumer.Subscribe(topic);

                Console.WriteLine("Consuming messages from topic: " + topic + ", broker(s): " + brokerList);

                try
                {
                    while (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            var msg = consumer.Consume(cts.Token);
                            Console.WriteLine($"Received: '{msg.Message.Value}'");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                        catch (OperationCanceledException)
                        {
                            // Ctrl+C: leave the loop. Without this, the generic handler below
                            // would swallow the cancellation and the loop would spin forever.
                            break;
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine($"Error: {e.Message}");
                        }
                    }
                }
                finally
                {
                    Console.CancelKeyPress -= onCancelKeyPress;
                    // Leave the consumer group cleanly instead of waiting for the session timeout.
                    consumer.Close();
                }
            }
        }
    }

    /// <summary>
    /// Kafka value deserializer that decodes the raw message bytes into <typeparamref name="T"/>
    /// using <c>Serializer.Deserialize&lt;T&gt;</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>Serializer</c> here appears to be protobuf-net's. If so, <typeparamref name="T"/>
    /// must be a protobuf-net contract type (e.g. generated by protobuf-net's tooling); a class
    /// generated by Google's protoc will fail with "no contract can be inferred". Confirm that the
    /// code generator and the runtime library match.
    /// </remarks>
    public class MyCustomDeserializer<T> : IDeserializer<T>
    {
        /// <summary>Deserializes one Kafka message value.</summary>
        /// <param name="data">Raw message bytes.</param>
        /// <param name="isNull">True when the message value is null.</param>
        /// <param name="context">Serialization context supplied by the client; not used.</param>
        /// <returns>The decoded value, or <c>default(T)</c> when <paramref name="isNull"/> is true.</returns>
        public T Deserialize(ReadOnlySpan<byte> data, bool isNull, Confluent.Kafka.SerializationContext context)
        {
            // A null value carries no payload, so there is nothing to hand to the serializer.
            if (isNull)
            {
                return default(T);
            }

            // The span cannot be wrapped by a stream directly, so copy it into an array first.
            using (var stream = new MemoryStream(data.ToArray()))
            {
                return Serializer.Deserialize<T>(stream);
            }
        }
    }

FinalValue.proto

syntax = "proto3";

package ileco.chimp.proto;

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option java_package = "ileco.chimp.proto";
option java_outer_classname = "FinalValueProtos";

// FinalValue: the message type carried as the Kafka message value.
message FinalValue {
  // Well-known Timestamp message, imported from google/protobuf/timestamp.proto.
  google.protobuf.Timestamp timestamp = 1;
  uint32 inputId = 2;
  // Wrapper message (google/protobuf/wrappers.proto) rather than a bare double;
  // the protoc-generated C# exposes it as a nullable double.
  google.protobuf.DoubleValue value = 3;
  uint32 sourceId = 4;
  // Presumably a GUID in text form, judging by the name; confirm with the producer.
  string inputGuid = 5;
}

FinalValue.cs

// Generated by the protocol buffer compiler.  DO NOT EDIT!
// source: proto/FinalValue.proto
#pragma warning disable 1591, 0612, 3021
#region Designer generated code

using pb = global::Google.Protobuf;
using pbc = global::Google.Protobuf.Collections;
using pbr = global::Google.Protobuf.Reflection;
using scg = global::System.Collections.Generic;
namespace Ileco.Chimp.Proto {

  /// <summary>Holder for reflection information generated from proto/FinalValue.proto</summary>
  /// <remarks>
  /// Generated by protoc; do not edit by hand. The static constructor builds the file
  /// descriptor once and exposes it through <see cref="Descriptor"/>.
  /// </remarks>
  [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
  public static partial class FinalValueReflection {

    #region Descriptor
    /// <summary>File descriptor for proto/FinalValue.proto</summary>
    public static pbr::FileDescriptor Descriptor {
      get { return descriptor; }
    }
    private static pbr::FileDescriptor descriptor;

    static FinalValueReflection() {
      // Base64 payload handed to FileDescriptor.FromGeneratedCode below; presumably the
      // serialized descriptor of proto/FinalValue.proto. It must stay exactly as emitted.
      byte[] descriptorData = global::System.Convert.FromBase64String(
          string.Concat(
            "ChZwcm90by9GaW5hbFZhbHVlLnByb3RvEhFpbGVjby5jaGltcC5wcm90bxof",
            "Z29vZ2xlL3Byb3RvYnVmL3RpbWVzdGFtcC5wcm90bxoeZ29vZ2xlL3Byb3Rv",
            "YnVmL3dyYXBwZXJzLnByb3RvIp4BCgpGaW5hbFZhbHVlEi0KCXRpbWVzdGFt",
            "cBgBIAEoCzIaLmdvb2dsZS5wcm90b2J1Zi5UaW1lc3RhbXASDwoHaW5wdXRJ",
            "ZBgCIAEoDRIrCgV2YWx1ZRgDIAEoCzIcLmdvb2dsZS5wcm90b2J1Zi5Eb3Vi",
            "bGVWYWx1ZRIQCghzb3VyY2VJZBgEIAEoDRIRCglpbnB1dEd1aWQYBSABKAlC",
            "JQoRaWxlY28uY2hpbXAucHJvdG9CEEZpbmFsVmFsdWVQcm90b3NiBnByb3Rv",
            "Mw=="));
      // The second argument lists the descriptors of the two imported well-known files
      // (timestamp.proto and wrappers.proto); the third maps the FinalValue message to its
      // CLR type, its parser and its property names.
      descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
          new pbr::FileDescriptor[] { global::Google.Protobuf.WellKnownTypes.TimestampReflection.Descriptor, global::Google.Protobuf.WellKnownTypes.WrappersReflection.Descriptor, },
          new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
            new pbr::GeneratedClrTypeInfo(typeof(global::Ileco.Chimp.Proto.FinalValue), global::Ileco.Chimp.Proto.FinalValue.Parser, new[]{ "Timestamp", "InputId", "Value", "SourceId", "InputGuid" }, null, null, null)
          }));
    }
    #endregion

  }
  #region Messages
  /// <summary>
  /// Generated message class for the FinalValue message of proto/FinalValue.proto.
  /// Implements Google.Protobuf's <c>IMessage&lt;FinalValue&gt;</c>; it is meant to be parsed
  /// through <see cref="Parser"/> / <c>MergeFrom</c>, and carries no protobuf-net attributes.
  /// </summary>
  [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
  public sealed partial class FinalValue : pb::IMessage<FinalValue> {
    // Single shared parser; it creates a fresh, empty FinalValue for every message it parses.
    private static readonly pb::MessageParser<FinalValue> _parser = new pb::MessageParser<FinalValue>(() => new FinalValue());
    /// <summary>Parser for Google.Protobuf-encoded FinalValue messages.</summary>
    public static pb::MessageParser<FinalValue> Parser { get { return _parser; } }

    /// <summary>Descriptor of this message: the first message type of the file descriptor.</summary>
    public static pbr::MessageDescriptor Descriptor {
      get { return global::Ileco.Chimp.Proto.FinalValueReflection.Descriptor.MessageTypes[0]; }
    }

    // Explicit interface implementation; forwards to the static Descriptor above.
    pbr::MessageDescriptor pb::IMessage.Descriptor {
      get { return Descriptor; }
    }

    /// <summary>Creates an empty message and invokes the OnConstruction partial hook.</summary>
    public FinalValue() {
      OnConstruction();
    }

    // Optional hook that another part of this partial class may implement.
    partial void OnConstruction();

    /// <summary>Copy constructor: clones the timestamp and copies the remaining fields.</summary>
    public FinalValue(FinalValue other) : this() {
      Timestamp = other.timestamp_ != null ? other.Timestamp.Clone() : null;
      inputId_ = other.inputId_;
      Value = other.Value;
      sourceId_ = other.sourceId_;
      inputGuid_ = other.inputGuid_;
    }

    /// <summary>Returns a copy of this message created through the copy constructor.</summary>
    public FinalValue Clone() {
      return new FinalValue(this);
    }

    /// <summary>Field number for the "timestamp" field.</summary>
    public const int TimestampFieldNumber = 1;
    private global::Google.Protobuf.WellKnownTypes.Timestamp timestamp_;
    public global::Google.Protobuf.WellKnownTypes.Timestamp Timestamp {
      get { return timestamp_; }
      set {
        timestamp_ = value;
      }
    }

    /// <summary>Field number for the "inputId" field.</summary>
    public const int InputIdFieldNumber = 2;
    private uint inputId_;
    public uint InputId {
      get { return inputId_; }
      set {
        inputId_ = value;
      }
    }

    /// <summary>Field number for the "value" field.</summary>
    public const int ValueFieldNumber = 3;
    // Codec for the wrapped double; 26 is the same tag that MergeFrom switches on for this field.
    private static readonly pb::FieldCodec<double?> _single_value_codec = pb::FieldCodec.ForStructWrapper<double>(26);
    private double? value_;
    public double? Value {
      get { return value_; }
      set {
        value_ = value;
      }
    }

    /// <summary>Field number for the "sourceId" field.</summary>
    public const int SourceIdFieldNumber = 4;
    private uint sourceId_;
    public uint SourceId {
      get { return sourceId_; }
      set {
        sourceId_ = value;
      }
    }

    /// <summary>Field number for the "inputGuid" field.</summary>
    public const int InputGuidFieldNumber = 5;
    private string inputGuid_ = "";
    public string InputGuid {
      get { return inputGuid_; }
      set {
        // The value is passed through CheckNotNull; the field itself starts as "".
        inputGuid_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
      }
    }

    public override bool Equals(object other) {
      return Equals(other as FinalValue);
    }

    /// <summary>Field-by-field equality; false for null, true for the same reference.</summary>
    public bool Equals(FinalValue other) {
      if (ReferenceEquals(other, null)) {
        return false;
      }
      if (ReferenceEquals(other, this)) {
        return true;
      }
      if (!object.Equals(Timestamp, other.Timestamp)) return false;
      if (InputId != other.InputId) return false;
      if (Value != other.Value) return false;
      if (SourceId != other.SourceId) return false;
      if (InputGuid != other.InputGuid) return false;
      return true;
    }

    /// <summary>XORs together the hash codes of the fields that are set / non-default.</summary>
    public override int GetHashCode() {
      int hash = 1;
      if (timestamp_ != null) hash ^= Timestamp.GetHashCode();
      if (InputId != 0) hash ^= InputId.GetHashCode();
      if (value_ != null) hash ^= Value.GetHashCode();
      if (SourceId != 0) hash ^= SourceId.GetHashCode();
      if (InputGuid.Length != 0) hash ^= InputGuid.GetHashCode();
      return hash;
    }

    /// <summary>Diagnostic string produced by the protobuf JSON formatter.</summary>
    public override string ToString() {
      return pb::JsonFormatter.ToDiagnosticString(this);
    }

    /// <summary>
    /// Writes the message to <paramref name="output"/>. Fields that are null, zero or empty
    /// are skipped entirely.
    /// </summary>
    public void WriteTo(pb::CodedOutputStream output) {
      // The raw tags (10, 16, 32, 42) are the same values MergeFrom switches on when reading.
      if (timestamp_ != null) {
        output.WriteRawTag(10);
        output.WriteMessage(Timestamp);
      }
      if (InputId != 0) {
        output.WriteRawTag(16);
        output.WriteUInt32(InputId);
      }
      if (value_ != null) {
        _single_value_codec.WriteTagAndValue(output, Value);
      }
      if (SourceId != 0) {
        output.WriteRawTag(32);
        output.WriteUInt32(SourceId);
      }
      if (InputGuid.Length != 0) {
        output.WriteRawTag(42);
        output.WriteString(InputGuid);
      }
    }

    /// <summary>
    /// Computes the encoded size, using the same field-presence conditions as WriteTo.
    /// </summary>
    public int CalculateSize() {
      int size = 0;
      // Each "1 +" accounts for the single tag byte that WriteTo emits via WriteRawTag.
      if (timestamp_ != null) {
        size += 1 + pb::CodedOutputStream.ComputeMessageSize(Timestamp);
      }
      if (InputId != 0) {
        size += 1 + pb::CodedOutputStream.ComputeUInt32Size(InputId);
      }
      if (value_ != null) {
        size += _single_value_codec.CalculateSizeWithTag(Value);
      }
      if (SourceId != 0) {
        size += 1 + pb::CodedOutputStream.ComputeUInt32Size(SourceId);
      }
      if (InputGuid.Length != 0) {
        size += 1 + pb::CodedOutputStream.ComputeStringSize(InputGuid);
      }
      return size;
    }

    /// <summary>
    /// Merges the set / non-default fields of <paramref name="other"/> into this message.
    /// A null <paramref name="other"/> is ignored.
    /// </summary>
    public void MergeFrom(FinalValue other) {
      if (other == null) {
        return;
      }
      if (other.timestamp_ != null) {
        if (timestamp_ == null) {
          timestamp_ = new global::Google.Protobuf.WellKnownTypes.Timestamp();
        }
        Timestamp.MergeFrom(other.Timestamp);
      }
      if (other.InputId != 0) {
        InputId = other.InputId;
      }
      if (other.value_ != null) {
        // An incoming 0 only replaces the current value when this message has none yet.
        if (value_ == null || other.Value != 0D) {
          Value = other.Value;
        }
      }
      if (other.SourceId != 0) {
        SourceId = other.SourceId;
      }
      if (other.InputGuid.Length != 0) {
        InputGuid = other.InputGuid;
      }
    }

    /// <summary>
    /// Reads fields from <paramref name="input"/> until ReadTag returns 0, merging each
    /// recognized field into this message and skipping any unrecognized tag.
    /// </summary>
    public void MergeFrom(pb::CodedInputStream input) {
      uint tag;
      while ((tag = input.ReadTag()) != 0) {
        switch(tag) {
          // Unrecognized tags are skipped rather than treated as errors.
          default:
            input.SkipLastField();
            break;
          // timestamp: merged into the existing instance, creating one if needed.
          case 10: {
            if (timestamp_ == null) {
              timestamp_ = new global::Google.Protobuf.WellKnownTypes.Timestamp();
            }
            input.ReadMessage(timestamp_);
            break;
          }
          // inputId
          case 16: {
            InputId = input.ReadUInt32();
            break;
          }
          // value (wrapped double), read through the wrapper codec.
          case 26: {
            double? value = _single_value_codec.Read(input);
            if (value_ == null || value != 0D) {
              Value = value;
            }
            break;
          }
          // sourceId
          case 32: {
            SourceId = input.ReadUInt32();
            break;
          }
          // inputGuid
          case 42: {
            InputGuid = input.ReadString();
            break;
          }
        }
      }
    }

  }

  #endregion

}

#endregion Designer generated code
Ali Shahzad
  • 5,163
  • 7
  • 36
  • 64

1 Answer

2

As I noted yesterday, you appear to have used the Google .proto processing tools (protoc), but are using protobuf-net; if you want to use protobuf-net, similar command-line/IDE/build/etc tools exist that are compatible with the protobuf-net library, or you can use https://protogen.marcgravell.com/ for ad-hoc usage (to avoid having to install anything). Alternatively: continue using the Google schema tools, but use the Google library. Basically: they need to match.

The only minor gotcha here is that protobuf-net does not currently have explicit inbuilt support for DoubleValue; for reference: this can be considered as simply:

namespace Google.Protobuf.WellKnownTypes
{
    [ProtoContract]
    public sealed class DoubleValue
    {
        [ProtoMember(1)]
        public double Value {get;set;}
    }
}

I should probably find time to take all the types from wrappers.proto and allow them as double?, float?, long? etc - but it will need an additional marker, as Nullable<T> is already handled but with a different meaning (i.e. optional in .proto terms)

Marc Gravell
  • 1,026,079
  • 266
  • 2,566
  • 2,900
  • Hi Marc, I tried generating class from your mentioned tool. Now, what about datetime datatype? I am getting this error: System.ArgumentOutOfRangeException: 'The added or subtracted value results in an un-representable DateTime. Parameter name: value'. The property for datetime timestamp in generated class is: [global::ProtoBuf.ProtoMember(1, Name = @"timestamp"/*, DataFormat = global::ProtoBuf.DataFormat.WellKnown*/)] public global::System.DateTime? Timestamp { get; set; } – Ali Shahzad Sep 08 '21 at 10:26
  • @AliShahzad well, why did you comment out the `DataFormat` part? that was needed – Marc Gravell Sep 08 '21 at 10:33
  • it gives compile time error, the enum has only these values public enum DataFormat { Default = 0, ZigZag = 1, TwosComplement = 2, FixedSize = 3, Group = 4 } – Ali Shahzad Sep 08 '21 at 11:24
  • Hi @Marc, I have upgraded to protobuf-net v3.0.0 and uncommented the ```DataFormat``` but now getting this exception while deserialize: ProtoBuf.ProtoException: 'Invalid wire-type (String); this usually means you have over-written a file without truncating or setting the length; see https://stackoverflow.com/q/2152978/23354'. What am I missing now? – Ali Shahzad Sep 08 '21 at 11:45
  • @AliShahzad do you have a base-64 dump of an inbound payload that I can check? (i.e.: `Convert.ToBase64String(yourBytes)`) – Marc Gravell Sep 08 '21 at 11:47
  • @AliShahzad FWIW: `DataFormat.WellKnown` was in v2 as well; it is *preferable* to use v3, but I'm curious - what version *were* you using? it must have been very far from any recent v2 or v3 – Marc Gravell Sep 08 '21 at 11:49
  • here is the protobuf message in string form which I am trying to deserialize: "\n\u0006\b��݉\u0006\u0010��\v\u001a\t\t\0\0\0\0\0\0\0@ f*$353776ed-588c-40f8-82e0-a67efe821b67" and this is the base-64 dump: "CgYIPz8/BhA/PwsaCQkAAAAAAAAAQCBmKiQzNTM3NzZlZC01ODhjLTQwZjgtODJlMC1hNjdlZmU4MjFiNjc=" – Ali Shahzad Sep 08 '21 at 11:54
  • according to my nuget manager, I was using v2.2.1 before where I hadn't the wellknown in enum. – Ali Shahzad Sep 08 '21 at 11:55
  • @AliShahzad ah, k; that's 5 years old - there are plenty more recent v2 builds, such as 2.4.6 - but: if you can use v3: that's great too; checking the base-64, sec... – Marc Gravell Sep 08 '21 at 11:59
  • FYI, here is the proto: syntax = "proto3"; package ileco.chimp.proto; import "google/protobuf/timestamp.proto"; import "google/protobuf/wrappers.proto"; option java_package = "ileco.chimp.proto"; option java_outer_classname = "FinalValueProtos"; message FinalValue { google.protobuf.Timestamp timestamp = 1; uint32 inputId = 2; google.protobuf.DoubleValue value = 3; uint32 sourceId = 4; string inputGuid = 5; } – Ali Shahzad Sep 08 '21 at 12:07
  • @AliShahzad on analysis of that payload: I think you've corrupted it, almost certainly by treating it as text at some point; the payload starts `0A-06` which means "field 1, length prefixed, 6 bytes" - so; let's just skip the next 6 bytes a moment (which is valid) - that gives our next field header as `3F` - and `3F` is **not** a valid protobuf header; that would be wire-type 7, and there **is no wire-type 7**; no protobuf decoder - not Google's, not mine, nobody's - can decipher that until the payload arrives intact – Marc Gravell Sep 08 '21 at 12:09
  • where's the issue on which line? – Ali Shahzad Sep 08 '21 at 12:11
  • @AliShahzad as I told you: if we look at the bytes of that payload, they are *absolutely, without doubt, 100%* invalid protobuf data, at least from some of the `3F` onwards; now: `3F` happens to be unicode `?`, which is a common output if you run protobuf data through a text encoding, and it freaks out on some of the binary data, replacing parts with `?`; as I say: I expect you've treated the binary payload as text at some point, and irretrievably corrupted it – Marc Gravell Sep 08 '21 at 12:14
  • I used this piece of code for base-64: using (var consumer = new ConsumerBuilder(config).SetKeyDeserializer(Deserializers.Utf8).SetValueDeserializer(Deserializers.Utf8).Build()) { CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; consumer.Subscribe(topic); while (true) { try { var msg = consumer.Consume(cts.Token); var byt = Encoding.ASCII.GetBytes(msg.Message.Value); var b64 = Convert.ToBase64String(byt); – Ali Shahzad Sep 08 '21 at 12:16
  • @AliShahzad OK; and what is the producer/consumer API that you're using here, and what is `msg.Message`? basically, if `msg.Message.Value` is a `string` (and isn't hex/base-64): **you're already toast** - the problem is right there; you need to access the *bytes* of the message, not some text interpretation of the bytes, because: **it isn't text**. Is there perhaps a `msg.Message.ValueBytes` or something? without knowing what API you're using here, it is hard to be more specific – Marc Gravell Sep 08 '21 at 12:20
  • note: if you absolutely *must* pass protobuf over a text protocol, i.e. there is no binary API on your pub/sub processor: use base-64 *at both ends* - i.e. at the publisher, convert the protobuf bytes to a base-64 string, and publish *that*, then at the consumer: take the base-64 string and reverse the base-64 step to get the protobuf bytes and decode *that*. But if you *can* access the raw bytes, that is more efficient than base-64 – Marc Gravell Sep 08 '21 at 12:23
  • here is implementation with deserializer: using (var consumer = new ConsumerBuilder(config) .SetKeyDeserializer(Deserializers.Utf8) .SetValueDeserializer(new MyCustomDeserializer()) .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")) .Build()) { CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; consumer.Subscribe(topic); while (true) { try { var msg = consumer.Consume(cts.Token); Console.WriteLine($"Received: '{msg.Message.Value}'"); ... – Ali Shahzad Sep 08 '21 at 12:24
  • the message is in protobuf format, it can't be changed - I just need to consume it as decoded – Ali Shahzad Sep 08 '21 at 12:26
  • if you check deserializer implementation I am not converting it to string/text format, that was only to convert the string in base-64 – Ali Shahzad Sep 08 '21 at 12:27
  • @AliShahzad I understand that the message is in protobuf format, but: if `msg.Message.Value` is a string, then: that is the point where it has become corrupted; you *cannot* interpret protobuf binary as a `string`, and any attempt to do so *will* break the data; so: if your pub/sub API lacks some kind of `msg.Message.ValueBytes` API (or something similar) - then you have a big problem. Once again, if you tell me what pub/sub API you're using here, I might be able to advise further. To clarify: protocols, encodings, pub/sub etc: these are all things I know intimately; I'm not speculating here. – Marc Gravell Sep 08 '21 at 12:36
  • I am using Confluent.Kafka, Version=1.7.0.0 in my .Net app – Ali Shahzad Sep 08 '21 at 12:39
  • The code msg.Message.Value runs after the deserialization, but the code breaks while deserialization before reaching this line. – Ali Shahzad Sep 08 '21 at 12:41
  • Also can you please analyze this base-64 if its corrupted or not? CgYI3KzdiQYQidgLGgAgZiokNzk4M2Y0YjAtYjQ3My00NTY1LTkyMWUtMzI5NDk3YjAwMzQy – Ali Shahzad Sep 08 '21 at 12:41
  • @AliShahzad yes, that's valid: https://i.stack.imgur.com/ZAHkS.png, and it works fine: https://gist.github.com/mgravell/982a49250aa1b50a4706db3bbda40420 – Marc Gravell Sep 08 '21 at 13:26