9

There are only a few serializers available, such as:

org.apache.kafka.common.serialization.StringSerializer

How can we create our own custom serializer?

OneCricketeer
Vimal Dhaduk

4 Answers

17

Here is an example of using your own serializer/deserializer for the Kafka message value. The same approach applies to the message key.

We want to send a serialized version of MyMessage as the Kafka value and deserialize it back into a MyMessage object on the consumer side.

Serializing MyMessage on the producer side.

You should create a serializer class that implements org.apache.kafka.common.serialization.Serializer.

The serialize() method does the work: it receives your object and returns a serialized version as a byte array.

public class MyValueSerializer implements Serializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        if (message == null) {
            return null;
        }

        try {

            // Serialize your MyMessage object into bytes here. A sketch using Jackson
            // (assumption: MyMessage is a plain POJO and com.fasterxml.jackson.databind.ObjectMapper is on the classpath):
            byte[] bytes = new ObjectMapper().writeValueAsBytes(message);

            return bytes;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}
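
Then use it like this: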

final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);

int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord<Integer, MyMessage> producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord); // optionally pass a Callback as a second argument

Deserializing MyMessage on the consumer side.

You should create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer.

The deserialize() method does the work: it receives the serialized value as a byte array and returns your object.

public class MyValueDeserializer implements Deserializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String s, byte[] value)
    {
        if (value == null) {
            return null;
        }

        try {

            // Deserialize the bytes back into your MyMessage object here. A sketch using Jackson
            // (assumption: MyMessage is a plain POJO and com.fasterxml.jackson.databind.ObjectMapper is on the classpath):
            MyMessage message = new ObjectMapper().readValue(value, MyMessage.class);
            return message;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

Then use it like this:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {

    int kafkaKey = record.key();
    MyMessage kafkaValue = record.value();

    ...
}
Luciano Afranllie
  • final KafkaConsumer consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer); – Vimal Dhaduk Oct 21 '16 at 06:04
  • The above is not valid syntax; so how does Kafka know about the deserializer? – Vimal Dhaduk Oct 21 '16 at 06:05
  • 2
    Deserializer is the third argument of the constructor: myValueDeserializer. All of this code was taken from working code, just changed some names. – Luciano Afranllie Oct 21 '16 at 15:11
  • Why do you save "isKey" in configure()? Can you explain when configure() and close() should not be empty methods? – Christopher Helck Aug 09 '19 at 15:30
  • @user1879313 For this code, there is no reason, but the Confluent Serializers, for example, use the boolean field to do different logic on the Schema Registry client and subsequently close the HTTP client in the close() method. – OneCricketeer Jul 23 '21 at 16:28
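
As an illustration of that comment, here is a minimal sketch (not from the original answer) of a serializer whose configure() reads a hypothetical custom property from the producer config and whose close() releases whatever was acquired during configuration:

import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MyConfigurableSerializer implements Serializer<MyMessage>
{
    private ObjectMapper mapper;
    private boolean prettyPrint;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        // "my.serializer.pretty" is a made-up property name passed through the producer config map
        Object pretty = configs.get("my.serializer.pretty");
        this.prettyPrint = pretty != null && Boolean.parseBoolean(pretty.toString());
        this.mapper = new ObjectMapper();
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        if (message == null) {
            return null;
        }
        try {
            return prettyPrint
                    ? mapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(message)
                    : mapper.writeValueAsBytes(message);
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {
        // Release anything acquired in configure(); the Confluent serializers, for example,
        // close their Schema Registry HTTP client here. Plain Jackson needs no cleanup.
    }
}
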
13

No words, only code

  1. Some object that you send to Kafka

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class TestDto {
    
        private String name;
        private String version;
    
    }
    
  2. Create a Serializer, which will be used by the Producer

    @Slf4j
    public class KafkaValueSerializer implements Serializer<TestDto> {
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public byte[] serialize(String topic, TestDto data) {
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (JsonProcessingException e) {
                log.error("Unable to serialize object {}", data, e);
                return null;
            }
        }
    
        @Override
        public void close() {
        }
    }
    
  3. Of course, don't forget the Deserializer for the Consumer

    @Slf4j
    public class KafkaValueDeserializer implements Deserializer<TestDto> {
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public TestDto deserialize(String topic, byte[] data) {
            try {
                return objectMapper.readValue(new String(data, "UTF-8"), TestDto.class);
            } catch (Exception e) {
                log.error("Unable to deserialize message {}", data, e);
                return null;
            }
        }
    
        @Override
        public void close() {
        }
    }
    
  4. Finally, add the serializer/deserializer to application.yml

    spring:
      kafka:
        bootstrap-servers: 192.168.192.168:9092
        producer:
          value-serializer: com.package.service.kafka.KafkaValueSerializer
        consumer:
          group-id: groupId
          value-deserializer: com.package.service.kafka.KafkaValueDeserializer
    

That's all. No extra configuration or dancing with a tambourine needed :)

  1. Send

    KafkaTemplate<String, TestDto> kafkaTemplate;
    
    TestDto testDto = new TestDto("test name", "test-version");
    kafkaTemplate.send(topic, testDto);
    
  2. Listen

    @KafkaListener(topics = "${ktp-agent.kafka.request-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(TestDto message) {
    
        log.info("Received message '{}' from Kafka.", message.toString());
    }
    
Vladimir
0

There is an easier way to do it. If you are converting your custom class to bytes in your own serializer, you are reinventing the wheel: Kafka already works with bytes.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Bytes;

// Here kafkaTemplate is a KafkaTemplate<String, Bytes>
Bytes bytes = new Bytes(objectMapper.writeValueAsBytes(<customClass>));
kafkaTemplate.send("topic", bytes);

Next, in your Producer and Consumer configuration:

@Bean
public ProducerFactory<String, Bytes> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "<kafka-server>");
    configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            BytesSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}


@Bean
public ConsumerFactory<String, Bytes> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host");
    props.put(
            ConsumerConfig.GROUP_ID_CONFIG,
            "group-id");
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            BytesDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

Finally, in your listener:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Bytes;


@KafkaListener(topics = "your.topic")
public void getInfoPersona(Bytes message) throws IOException {
    <your-custom-class> customClass = 
    objectMapper.readValue(message.get(), <your-custom-class>.class);
}
hcallejas
-2

You must create your own serializer that implements the Serializer interface (org.apache.kafka.common.serialization.Serializer) and then set the producer option key.serializer / value.serializer to it.
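
For example, a minimal sketch of the producer-side configuration, assuming a hypothetical MyMessageSerializer that implements Serializer<MyMessage> (the broker address is a placeholder):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// The key stays a plain String; the value uses the custom serializer
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyMessageSerializer.class.getName());

KafkaProducer<String, MyMessage> producer = new KafkaProducer<>(props);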

ThisaruG
amethystic