
I am new to Kafka and to serialization. Until now I only had to handle Kafka events in JSON format, serialized with simple code. Now additional events are being produced with an Avro encoder, so I want this single consumer to use StringDeserializer for the JSON events and the respective Avro deserializer for the Avro events. How can I map two deserializers in the same properties file?

private Properties getProps(){
    Properties props = new Properties();
    props.put("group.id", env.getProperty("group.id"));
    props.put("enable.auto.commit", env.getProperty("enable.auto.commit"));
    props.put("key.deserializer", env.getProperty("key.deserializer"));
    props.put("value.deserializer", env.getProperty("value.deserializer"));
    return props;
}
// Only one class name can be assigned to "value.deserializer", so is there any way to configure two?

In the main method:

KafkaConsumer<String, String> _consumer = new KafkaConsumer<>(getProps());
consumers.add(_consumer);
_consumer.subscribe(new ArrayList<>(topicConsumer.keySet()));
Jay.Doc
  • You can set serializers and deserializers in code. See this answer http://stackoverflow.com/questions/40154086/how-to-create-custom-serializer-in-kafka/40158971#40158971 – Luciano Afranllie Oct 24 '16 at 16:12

2 Answers


Just write a generic deserializer that delegates each record to the matching deserializer based on its topic.

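import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Routes each record to a topic-specific deserializer and falls back to the
// plain JsonDeserializer behaviour for any other topic. TopicOneDeserializer and
// TopicTwoDeserializer are application classes (assumed to extend JsonDeserializer);
// KafkaTopics must hold compile-time String constants so they can be case labels.
public class GenericDeserializer extends JsonDeserializer<Object>
{
    // Create the delegates once instead of on every record.
    private final TopicOneDeserializer topicOneDeserializer = new TopicOneDeserializer();
    private final TopicTwoDeserializer topicTwoDeserializer = new TopicTwoDeserializer();

    public GenericDeserializer()
    {
        topicOneDeserializer.addTrustedPackages("com.xyz");
        topicTwoDeserializer.addTrustedPackages("com.xyz");
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data)
    {
        switch (topic)
        {
        case KafkaTopics.TOPIC_ONE:
            return topicOneDeserializer.deserialize(topic, headers, data);
        case KafkaTopics.TOPIC_TWO:
            return topicTwoDeserializer.deserialize(topic, headers, data);
        default:
            // Any other topic: use the inherited JsonDeserializer behaviour.
            return super.deserialize(topic, data);
        }
    }
}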
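The same topic-based routing can be expressed without a hard-coded `switch` by keeping a map from topic name to delegate. The sketch below is an illustration under stated assumptions, not the answer's code: `Delegate` is a hypothetical stand-in that mirrors the two-argument `deserialize(String topic, byte[] data)` of Kafka's `Deserializer` interface (so the example compiles without kafka-clients), and the topic names and toy delegates are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Sketch: picks a delegate by topic name, with a fallback for unmapped topics.
public class TopicRoutingDeserializer {

    // Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer.
    public interface Delegate {
        Object deserialize(String topic, byte[] data);
    }

    private final Map<String, Delegate> byTopic = new HashMap<>();
    private final Delegate fallback;

    public TopicRoutingDeserializer(Delegate fallback) {
        this.fallback = fallback;
    }

    // Registers the delegate used for records from the given topic.
    public TopicRoutingDeserializer route(String topic, Delegate delegate) {
        byTopic.put(topic, delegate);
        return this;
    }

    public Object deserialize(String topic, byte[] data) {
        // Topics without a registered delegate go to the fallback.
        return byTopic.getOrDefault(topic, fallback).deserialize(topic, data);
    }

    public static void main(String[] args) {
        // Hypothetical topic name and toy delegates, for illustration only.
        TopicRoutingDeserializer router = new TopicRoutingDeserializer(
                (topic, data) -> new String(data, StandardCharsets.UTF_8))
            .route("avro-events", (topic, data) -> "avro:" + data.length + " bytes");

        System.out.println(router.deserialize("json-events", "{\"id\":1}".getBytes(StandardCharsets.UTF_8)));
        System.out.println(router.deserialize("avro-events", new byte[] {0, 0, 0, 0, 7}));
    }
}
```

Adding a topic then means one more `route(...)` call rather than another `case` branch; in a real consumer the class would implement Kafka's `Deserializer<Object>` and be registered as the single `value.deserializer`.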
Slohrsh

You need to provide a single hybrid deserializer that wraps both original deserializers. Internally, the wrapping deserializer must be able to distinguish between the two types of messages and forward the raw bytes to the deserializer that does the actual work.
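One way to tell the two message types apart from the raw bytes is sketched below. It assumes the Avro producer uses Confluent's Schema Registry serializer, whose wire format starts every payload with a magic byte `0` followed by a 4-byte big-endian schema ID; JSON text does not start with a zero byte. If the Avro events are written as plain Avro binary without that header, this check does not apply. `Delegate` is a hypothetical stand-in for Kafka's `Deserializer` (two-argument `deserialize`), and the delegates in `main` are toys.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch: inspects the first byte and forwards the unchanged bytes to the matching delegate.
public class MagicByteDeserializer {

    // Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer.
    public interface Delegate {
        Object deserialize(String topic, byte[] data);
    }

    // Assumed Confluent wire format: magic byte 0, then a 4-byte schema ID.
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5;

    private final Delegate avro;
    private final Delegate json;

    public MagicByteDeserializer(Delegate avro, Delegate json) {
        this.avro = avro;
        this.json = json;
    }

    static boolean looksLikeConfluentAvro(byte[] data) {
        return data.length >= HEADER_LENGTH && data[0] == MAGIC_BYTE;
    }

    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // Kafka deserializers conventionally map null payloads to null
        }
        return looksLikeConfluentAvro(data)
                ? avro.deserialize(topic, data)
                : json.deserialize(topic, data);
    }

    public static void main(String[] args) {
        MagicByteDeserializer d = new MagicByteDeserializer(
                // Toy Avro delegate: just reads the schema ID from bytes 1..4.
                (topic, data) -> "avro, schema id " + ByteBuffer.wrap(data, 1, 4).getInt(),
                // Toy JSON delegate: returns the text, as StringDeserializer would.
                (topic, data) -> "json " + new String(data, StandardCharsets.UTF_8));
        System.out.println(d.deserialize("events", new byte[] {0, 0, 0, 0, 42, 2, 6})); // avro, schema id 42
        System.out.println(d.deserialize("events", "{\"id\":1}".getBytes(StandardCharsets.UTF_8))); // json {"id":1}
    }
}
```

In a real consumer the wrapper would implement Kafka's `Deserializer<Object>`, hold an Avro deserializer and a `StringDeserializer` as its two delegates, and its class name would be the one value given to `value.deserializer`.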

If you cannot know in advance which type of message you have, you can also use a trial-and-error approach: hand the bytes to one deserializer by default, and if that one fails (i.e., throws an exception), try the second one.
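A minimal sketch of the trial-and-error wrapper, assuming both delegates signal failure with a `RuntimeException` (Kafka's `SerializationException` is one). `Delegate` is a hypothetical stand-in for Kafka's `Deserializer`, and the integer/string delegates are toys chosen only to make the fallback visible.

```java
import java.nio.charset.StandardCharsets;

// Sketch: tries the first delegate and falls back to the second if it throws.
public class TrialDeserializer {

    // Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer.
    public interface Delegate {
        Object deserialize(String topic, byte[] data);
    }

    private final Delegate first;
    private final Delegate second;

    public TrialDeserializer(Delegate first, Delegate second) {
        this.first = first;
        this.second = second;
    }

    public Object deserialize(String topic, byte[] data) {
        try {
            return first.deserialize(topic, data);
        } catch (RuntimeException firstFailure) {
            try {
                return second.deserialize(topic, data);
            } catch (RuntimeException secondFailure) {
                // Both attempts failed: keep the first error attached for debugging.
                secondFailure.addSuppressed(firstFailure);
                throw secondFailure;
            }
        }
    }

    public static void main(String[] args) {
        // Toy delegates: a strict integer parser first, a lenient string decoder second.
        TrialDeserializer d = new TrialDeserializer(
                (topic, data) -> Integer.parseInt(new String(data, StandardCharsets.UTF_8)),
                (topic, data) -> new String(data, StandardCharsets.UTF_8));
        System.out.println(d.deserialize("events", "42".getBytes(StandardCharsets.UTF_8)));    // 42
        System.out.println(d.deserialize("events", "hello".getBytes(StandardCharsets.UTF_8))); // hello
    }
}
```

The order matters: a lenient decoder may "succeed" on bytes of the other format and return garbage, so the stricter one (here the parser that rejects anything non-numeric) should be tried first. Exceptions on every fallback record also cost some throughput.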

Matthias J. Sax
  • I am new to deserialization, and Avro deserialization is proving difficult because it needs a POJO and returns that object, whereas StringDeserializer returns a String, which I turn into an object with Gson in the consumer. How can I create a common method that uses try and catch when the two return types are different? – Jay.Doc Oct 25 '16 at 12:23
  • You will also need a hybrid return type: basically a wrapper class with one member for each type you wrap. Each time you create a message of your hybrid type, all but one member will be `null`. How to write a serializer is shown in the link in the comment on your question. – Matthias J. Sax Oct 25 '16 at 19:53
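The wrapper described in the last comment could look roughly like this. It is a sketch with hypothetical names: `Object` stands in for the Avro type (in practice the generated Avro class or a `GenericRecord`), and the factory methods enforce that exactly one member is non-null.

```java
import java.util.Objects;

// Sketch of a hybrid return type: exactly one member is non-null.
public final class HybridValue {

    private final String json; // set when the record was JSON text
    private final Object avro; // set when the record was Avro (placeholder type)

    private HybridValue(String json, Object avro) {
        this.json = json;
        this.avro = avro;
    }

    public static HybridValue ofJson(String json) {
        return new HybridValue(Objects.requireNonNull(json), null);
    }

    public static HybridValue ofAvro(Object avro) {
        return new HybridValue(null, Objects.requireNonNull(avro));
    }

    public boolean isJson() {
        return json != null;
    }

    public String json() {
        if (json == null) throw new IllegalStateException("not a JSON value");
        return json;
    }

    public Object avro() {
        if (avro == null) throw new IllegalStateException("not an Avro value");
        return avro;
    }

    public static void main(String[] args) {
        HybridValue[] values = { HybridValue.ofJson("{\"id\":1}"), HybridValue.ofAvro(42) };
        for (HybridValue v : values) {
            // The consumer branches once on the wrapper instead of juggling two return types.
            if (v.isJson()) {
                System.out.println("JSON payload: " + v.json());
            } else {
                System.out.println("Avro payload: " + v.avro());
            }
        }
    }
}
```

With this type, the hybrid deserializer can be declared as returning `HybridValue`, and the consumer becomes `KafkaConsumer<String, HybridValue>`; the JSON branch can still hand `json()` to Gson as before.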