I am trying to build a POC with Kafka 0.10. I am using my own Scala domain class, which has a bunch of String fields, as the Kafka message. I cannot use the default serializer class or the String serializer class that comes with the Kafka library. I guess I need to write my own serializer and pass it in the producer properties. If you know of an example custom serializer for Kafka written in Scala, please share it. Much appreciated, thanks.

1 Answer

You can write a custom serializer easily in Scala. For the old Scala producer (kafka.producer.Producer), extend kafka.serializer.Encoder and override toBytes with your serialization logic. Example code:

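import java.nio.charset.StandardCharsets

import com.google.gson.Gson
import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties

// Encoder for the old Scala producer. Kafka instantiates it reflectively
// and passes the producer properties to the constructor.
class MessageSerializer(props: VerifiableProperties) extends Encoder[Message] {

  private val gson: Gson = new Gson()

  // Serialize the message to JSON and encode it as UTF-8,
  // rather than relying on the platform default charset.
  override def toBytes(t: Message): Array[Byte] =
    gson.toJson(t).getBytes(StandardCharsets.UTF_8)
}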

This code uses Google Gson to serialize Message to JSON, but any other serialization framework would work as well.

Then pass the fully qualified class name of this serializer in the properties when instantiating the producer, e.g. props.put("serializer.class", "codec.MessageSerializer"), where codec is the package that contains the class.
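
As an illustration of swapping Gson for something else, here is a minimal sketch that encodes the String fields by hand using only the JDK. The two-field Message class and the MessageCodec object are hypothetical stand-ins for your own domain class, and the body of toBytes here could replace the Gson call in the serializer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical domain class with String fields; substitute your own.
case class Message(id: String, body: String)

// Hypothetical helper, not part of Kafka.
object MessageCodec {
  // writeUTF stores a 2-byte length followed by the string in modified UTF-8,
  // so each field is limited to 65535 encoded bytes.
  def toBytes(m: Message): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeUTF(m.id)
    out.writeUTF(m.body)
    out.flush()
    buffer.toByteArray
  }

  // Read the fields back in the same order they were written.
  def fromBytes(bytes: Array[Byte]): Message = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val id = in.readUTF()
    val body = in.readUTF()
    Message(id, body)
  }
}
```

A hand-rolled format like this is compact, but the consumer side must read the fields in exactly the same order, so a self-describing format such as JSON is usually easier to evolve.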

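Edit: Alternatively, implement org.apache.kafka.common.serialization.Serializer directly. This is the interface that the new Java producer (org.apache.kafka.clients.producer.KafkaProducer) expects, configured through the key.serializer and value.serializer properties. Code: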

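import java.nio.charset.StandardCharsets
import java.util

import com.google.gson.Gson
import org.apache.kafka.common.serialization.Serializer

class MessageSerializer extends Serializer[Message] {
  private val gson: Gson = new Gson()

  // Nothing to configure. Keep the body empty rather than `???`,
  // which throws NotImplementedError when the producer is constructed.
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  // Pass null through unchanged; otherwise emit UTF-8 encoded JSON.
  override def serialize(topic: String, data: Message): Array[Byte] =
    if (data == null) null
    else gson.toJson(data).getBytes(StandardCharsets.UTF_8)

  // Nothing to release.
  override def close(): Unit = ()
}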
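
To use the Serializer version with the new producer, the class name goes into value.serializer rather than serializer.class. A minimal sketch, assuming the Message and MessageSerializer classes above are on the classpath and in scope; the broker address localhost:9092, the topic name messages, the String key type, and the hypothetical buildMessage() helper are placeholders, not something taken from the question.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ProducerExample {
  // Hypothetical helper: construct your own domain object here.
  def buildMessage(): Message = ???

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumed broker address; adjust for your cluster.
    props.put("bootstrap.servers", "localhost:9092")
    // Keys are plain strings in this sketch.
    props.put("key.serializer", classOf[StringSerializer].getName)
    // The producer instantiates this class reflectively, so it needs
    // a public no-argument constructor.
    props.put("value.serializer", classOf[MessageSerializer].getName)

    val producer = new KafkaProducer[String, Message](props)
    try {
      // "messages" is a placeholder topic name.
      producer.send(new ProducerRecord[String, Message]("messages", "key-1", buildMessage()))
    } finally {
      // close() blocks until previously sent records have completed.
      producer.close()
    }
  }
}
```

Using classOf[...].getName avoids hard-coding the package name, so the property stays correct if the serializer is moved.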
M Kumar
  • While trying to do that I got the error "is not an instance of org.apache.kafka.common.serialization.Serializer". – Drissi Yazami Apr 18 '17 at 13:33
  • Thanks a lot, but I still got some errors: Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer Caused by: scala.NotImplementedError: an implementation is missing at scala.Predef$.$qmark$qmark$qmark(Predef.scala:230) at com.leanovia.streaming.MessageSerializer.configure(MessageSerializer.scala:14) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:233). I defined my producer as below: val kafkaProducer: Producer[Nothing, Activity] = new KafkaProducer[Nothing, Activity](props), where Activity is my entity. – Drissi Yazami Apr 19 '17 at 12:32