
I'm trying to deserialize the value of a Kafka message to restore its original content, but I cannot find a suitable deserializer. I receive the following value:

AAAAAAKWBQACDkdMIE5hbWUgc3QuIEhlcm9pdiBQcmFjaRpiLjI4LCBhcHAuMTUwDktoYXJraXYES0gKODU0MzQSMDk4NDMyMzIzAK61xYSfXgIKMTE6MzICDFRSMTIzMghLeWl2

I tried to get a string representation of the decoded message with:

        var bytes = Convert.FromBase64String("AAAAAAKWBQACDkdMIE5hbWUgc3QuIEhlcm9pdiBQcmFjaRpiLjI4LCBhcHAuMTUwDktoYXJraXYES0gKODU0MzQSMDk4NDMyMzIzAK61xYSfXgIKMTE6MzICDFRSMTIzMghLeWl2");
        var jsonBack = Encoding.UTF8.GetString(bytes);

This only partially decodes it: the readable text is mixed with extra symbols. I can't find any documentation on how the value is serialized, and I can't see its structure. Has anyone seen the same thing when working with an AWS Lambda MSK trigger in .NET?

Below is an example message from the AWS docs, but it says nothing about what the value contains (the key and value of the message, and in what format).

MSK Event Payload

Tie Kapch

1 Answer


I created a class that I use for this. I ran into a similar issue when I started working with MSK triggers, because AWS only provides a Node.js example. The classes below work fine for me, so give them a try. Use 'MSKEvent' as the payload type of your Lambda function, so that your handler signature looks like the one below. Note that you need to Base64-decode the 'Body' property of the 'KafkaMessage' class below (it is mapped from the JSON 'value' field).

public async Task FunctionHandler(MSKEvent evnt, ILambdaContext context)
{
    // Do some magic.
}

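using System.Collections.Generic; // Dictionary, IEnumerable
using Newtonsoft.Json;            // JsonProperty

/// <summary>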
/// Represents an MSK event. MSK event is the event when lambda is triggered via a kafka topic.
/// </summary>
public class MSKEvent
{
    /// <summary>
    /// The source of the event.
    /// </summary>
    public string EventSource { get; set; }

    /// <summary>
    /// The AWS arn of the event source.
    /// </summary>
    public string EventSourceArn { get; set; }

    /// <summary>
    /// The collection of records.
    /// </summary>
    public Dictionary<string, IEnumerable<KafkaMessage>> Records { get; set; }
}

/// <summary>
/// Represents a kafka message.
/// </summary>
public class KafkaMessage
{
    /// <summary>
    /// The kafka topic name this message belongs to.
    /// </summary>
    public string Topic { get; set; }

    /// <summary>
    /// The kafka partition this message belongs to.
    /// </summary>
    public int Partition { get; set; }

    /// <summary>
    /// The offset of this message.
    /// </summary>
    public long Offset { get; set; }

    /// <summary>
    /// The created timestamp in unix ms of this message.
    /// </summary>
    public long Timestamp { get; set; }

    /// <summary>
    /// The timestamp type.
    /// </summary>
    public string TimestampType { get; set; }

    /// <summary>
    /// Base64 encoded message body. Base64 decode this field to obtain the actual message.
    /// </summary>
    [JsonProperty("value")]
    public string Body { get; set; }
}
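
The Base64 step mentioned above gives you the raw bytes of the message, which are readable as text only if the producer wrote plain text. The value in the question starts with the bytes 00 00 00 00 02, which looks like the Confluent Schema Registry framing (a zero "magic" byte followed by a 4-byte big-endian schema ID), so the rest is probably Avro or another binary format rather than JSON. That is an assumption about the producer, not something the MSK event itself tells you. A minimal sketch of how to check for it, where 'KafkaValueInspector' and its methods are hypothetical names of mine:

```csharp
using System;
using System.Buffers.Binary;

public static class KafkaValueInspector
{
    // Hypothetical helper: Base64-decodes a record value into raw bytes.
    public static byte[] DecodeBytes(string base64Body) =>
        Convert.FromBase64String(base64Body);

    // Returns true and the schema ID if the bytes start with what looks like
    // the Confluent framing: magic byte 0x00 plus a 4-byte big-endian schema ID.
    // A leading zero byte is only a hint, not proof of that format.
    public static bool TryReadSchemaId(byte[] bytes, out int schemaId)
    {
        schemaId = 0;
        if (bytes.Length < 5 || bytes[0] != 0x00)
        {
            return false;
        }

        schemaId = BinaryPrimitives.ReadInt32BigEndian(bytes.AsSpan(1, 4));
        return true;
    }
}
```

Inside the handler you would call KafkaValueInspector.DecodeBytes(message.Body) for each message in evnt.Records. If TryReadSchemaId succeeds, the bytes from index 5 onward are the serialized payload and need the deserializer that matches the producer's schema. Otherwise Encoding.UTF8.GetString on the bytes is enough for plain-text or JSON values.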