
I am using my own class in a Kafka message which has a bunch of String data types.

I therefore cannot use the default serializer class or the StringSerializer that comes with Kafka library.

I guess I need to write my own serializer and feed it to the producer properties?

OneCricketeer
Paaji

3 Answers


EDIT

In newer Kafka Clients, implement Serializer rather than Encoder.
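For reference, a sketch against the newer org.apache.kafka.common.serialization.Serializer interface might look like the following. The CustomMessage type and the Jackson-based mapping are assumptions for illustration, not part of the original question:

```java
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class CustomMessageSerializer implements Serializer<CustomMessage> {
    // ObjectMapper is thread-safe and expensive to build, so share one instance
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, CustomMessage data) {
        if (data == null) {
            return null;
        }
        try {
            return MAPPER.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize " + data, e);
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}

// Hypothetical message class standing in for your own type
class CustomMessage {
    private final String field1;
    private final String field2;

    CustomMessage(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    public String getField1() { return field1; }
    public String getField2() { return field2; }
}
```

With the new API you register it via the `value.serializer` (or `key.serializer`) producer property instead of `serializer.class`.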


The things required for writing a custom serializer are:

  1. Implement Encoder with your object type supplied for the generic
    • Supplying a constructor that takes VerifiableProperties is required
  2. Override the toBytes(...) method, making sure a byte array is returned
  3. Inject the serializer class into ProducerConfig

Declaring a custom serializer for a producer

As you noted in your question, Kafka supplies a means to declare a specific serializer for a producer. The serializer class is set in a ProducerConfig instance and that instance is used to construct the desired Producer class.

If you follow Kafka's Producer Example you will construct ProducerConfig via a Properties object. When building your properties file be sure to include:

props.put("serializer.class", "path.to.your.CustomSerializer");

With the path to the class you want Kafka to use to serialize messages before appending them to the log.

Creating a custom serializer that Kafka understands

Writing a custom serializer that Kafka can properly interpret requires implementing the Encoder[T] Scala trait that Kafka provides. Implementing Scala traits in Java is awkward, but the following method worked for serializing JSON in my project:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}

Your question makes it sound like you are using one object (let's call it CustomMessage) for all messages appended to your log. If that's the case, your serializer could look more like this:

package com.project.serializer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        return customMessage.toBytes();
    }
}

Which would leave your property config to look like this:

props.put("serializer.class", "com.project.serializer.CustomMessageEncoder");
Yassin Hajaj
Sam Berry
  • How do you deserialize the byte array in kafka consumer now that we've serialized the object? – Hongyi Li Aug 30 '14 at 01:16
  • One suggestion: since construction of `ObjectMapper` is a heavy-weight operation, and since they are thread-safe after construction, it makes sense to create a static final instance of it for encoder/decoder. Otherwise construction will take 10x longer than actual reading/writing. – StaxMan Jun 04 '15 at 20:59
  • @StaxMan good point. I'm sure for most applications the best approach would be to inject the `ObjectMapper`, but like you said, for this example showing a single instantiation is an improvement. – Sam Berry Jun 04 '15 at 21:11
  • @SamB. Yes, injecting a properly configured instance makes sense in many cases. Just wanted to mention this because performance effect is significant, and because code often gets cut-n-pasted verbatim – StaxMan Jun 04 '15 at 23:14
  • Does this method work with the 0.8.2 version of the Kafka producer? – Siva Gnanam Mar 10 '16 at 13:28
  • I don't think you need this for 0.8.2 because Kafka 0.8.2 has its own Serializer and Deserializer interface. – user2441441 Jun 09 '16 at 06:20

You need to implement both an encoder and a decoder.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;

public class JsonEncoder implements Encoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
    // ObjectMapper is thread-safe and expensive to create, so share one instance
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            return OBJECT_MAPPER.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}

The decoder code

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;

public class JsonDecoder implements Decoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonDecoder.class);
    // ObjectMapper is thread-safe and expensive to create, so share one instance
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public JsonDecoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public Object fromBytes(byte[] bytes) {
        try {
            return OBJECT_MAPPER.readValue(bytes, Map.class);
        } catch (IOException e) {
            LOGGER.error(String.format("Json processing failed for payload: %s", new String(bytes, StandardCharsets.UTF_8)), e);
        }
        return null;
    }
}

The pom entry

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1.3</version>
</dependency>

Set the default encoder in the Kafka properties (the payload handed to the producer is already a byte array):

properties.put("serializer.class","kafka.serializer.DefaultEncoder");

The writer and reader code is as follows

JsonEncoder encoder = new JsonEncoder(null);
byte[] bytes = encoder.toBytes(map);
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(this.topic, bytes);

JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());
Harvinder Singh

I therefore cannot use the default serializer class or the StringSerializer that comes with Kafka library.

Sure you can.

For example, use Jackson or Gson: convert your instance into a JSON string or (preferably) a binary byte array, then use one of the built-in Kafka serializers.
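A sketch of the JSON-string route might look like this (the broker address, topic name, and the Map standing in for your class are assumptions):

```java
import java.util.Collections;
import java.util.Properties;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class JsonStringProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Convert the object to a JSON string first; StringSerializer handles the rest
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(Collections.singletonMap("field", "value"));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", json)); // topic name is an assumption
        }
    }
}
```

The same mapper can be reused on the consumer side with StringDeserializer plus `readValue`.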

Other options

Recommended

Use the Confluent versions of Avro or Protobuf serializers along with the Schema Registry for your class.
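The producer config for that route is a few properties (the serializer class names come from Confluent's documentation; the registry URL is an assumption for your environment):

```java
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081"); // assumption: local registry
```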


You could also just use ByteArraySerializer and write your class to an ObjectOutputStream (however, that is not recommended because Java serialization is not supported across languages).
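A minimal sketch of that approach, assuming your class implements java.io.Serializable (the helper names here are mine, not a Kafka API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JavaSerDe {
    // Serialize any Serializable object to byte[]; the result can be sent
    // with ByteArraySerializer (JVM-only wire format, not cross-language)
    public static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Reverse step for the consumer side, paired with ByteArrayDeserializer
    public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }
}
```

On the producer you would then set `value.serializer` to org.apache.kafka.common.serialization.ByteArraySerializer and send the resulting byte array directly.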

OneCricketeer