EDIT
In newer Kafka clients, implement Serializer rather than Encoder.
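As a rough illustration of the newer style, here is a minimal sketch. To keep it runnable without the kafka-clients jar, the Serializer interface below is a local stand-in that only mirrors the serialize(topic, data) method; in a real project you would import org.apache.kafka.common.serialization.Serializer instead. CustomMessage and its body field are hypothetical names used only for this example.

```java
import java.nio.charset.StandardCharsets;

class SerializerSketch {
    // Local stand-in (assumption): mirrors only the serialize(topic, data) method of
    // org.apache.kafka.common.serialization.Serializer so the sketch runs on its own.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical message type, used only for illustration.
    static final class CustomMessage {
        final String body;

        CustomMessage(String body) {
            this.body = body;
        }
    }

    static final class CustomMessageSerializer implements Serializer<CustomMessage> {
        @Override
        public byte[] serialize(String topic, CustomMessage data) {
            // Pass null through unchanged rather than failing on a missing value.
            if (data == null) {
                return null;
            }
            // Always name the charset; the platform default can differ between machines.
            return data.body.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = new CustomMessageSerializer()
                .serialize("demo-topic", new CustomMessage("hello"));
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

With the real client library, the class is typically registered through the producer's value.serializer property instead of serializer.class.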
Writing a custom serializer requires the following:
- Implement Encoder with your message type as the generic parameter
- Supply a constructor that takes VerifiableProperties
- Override the toBytes(...) method so that it returns a byte array
- Register the serializer class in ProducerConfig
Declaring a custom serializer for a producer
As you noted in your question, Kafka supplies a means to declare a specific serializer for a producer. The serializer class is set in a ProducerConfig instance, and that instance is used to construct the desired Producer class.
If you follow Kafka's Producer Example, you will construct ProducerConfig via a Properties object. When building your properties, be sure to include:
props.put("serializer.class", "path.to.your.CustomSerializer");
The value is the fully qualified name of the class you want Kafka to use to serialize messages before appending them to the log.
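A minimal sketch of building those properties, assuming the old 0.8-era producer that the Encoder approach belongs to. The broker address and the serializer class name are placeholders, and metadata.broker.list is the broker setting used by that producer generation, not something taken from your question.

```java
import java.util.Properties;

class ProducerPropsSketch {
    // Builds the properties an old-style ProducerConfig is constructed from.
    // Both arguments are supplied by the caller; nothing here talks to Kafka.
    static Properties buildProps(String brokerList, String serializerClass) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // Fully qualified class name of the custom Encoder implementation.
        props.put("serializer.class", serializerClass);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = buildProps("localhost:9092", "path.to.your.CustomSerializer");
        System.out.println(props.getProperty("serializer.class"));
        // With the Kafka jar on the classpath, these properties would then be
        // passed to new ProducerConfig(props), and that config to the Producer.
    }
}
```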
Creating a custom serializer that Kafka understands
Writing a custom serializer that Kafka can properly interpret requires implementing the Encoder[T] Scala trait that Kafka provides. Implementing Scala traits in Java is awkward, but the following approach worked for serializing JSON in my project:
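import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.core.JsonProcessingException; // assuming Jackson 2.x
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger; // assuming log4j 1.x

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // Instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* Kafka instantiates the encoder with a VerifiableProperties argument, so this constructor must be present. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            // Name the charset explicitly so the output does not depend on the platform default.
            return objectMapper.writeValueAsString(object).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        // Fall back to an empty payload when serialization fails.
        return new byte[0];
    }
}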
Your question makes it sound like you are using one object (let's call it CustomMessage) for all messages appended to your log. If that's the case, your serializer could look more like this:
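package com.project.serializer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* Kafka instantiates the encoder with a VerifiableProperties argument, so this constructor must be present. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        // Delegate to the message's own byte representation.
        return customMessage.toBytes();
    }
}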
With that encoder, your property config would look like this:
props.put("serializer.class", "com.project.serializer.CustomMessageEncoder");
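The CustomMessageEncoder shown earlier delegates to customMessage.toBytes(), which is not defined anywhere in this answer. Here is one hypothetical shape for it, as a sketch only: the id and payload fields and the "id|payload" layout are invented for illustration, so substitute whatever wire format your consumers expect.

```java
import java.nio.charset.StandardCharsets;

class CustomMessageSketch {
    // Hypothetical message type; the fields and the wire layout are assumptions.
    static final class CustomMessage {
        private final String id;
        private final String payload;

        CustomMessage(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }

        // Encodes the message as "id|payload" in UTF-8.
        byte[] toBytes() {
            return (id + "|" + payload).getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = new CustomMessage("42", "hello").toBytes();
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // prints 42|hello
    }
}
```

Keeping the byte layout inside the message class means the encoder stays a one-line adapter, and the consumer side can reuse the same class to decode.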