0

What are good ways to publish and consume case classes via Kafka using Scala (or Java)?

I found various partial examples including answers here, but no full examples.

I managed to get this to work, using custom Kafka serialization, as follows, but I'd like feedback on if there is a better way to do it:

Generic serializer class:

package mypackage

import java.nio.charset.Charset
import java.util

import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Serializer
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

class ObjectSerializer[T <: AnyRef] extends Serializer[T] {

  override def configure(config: util.Map[String, _], isKey: Boolean): Unit = {
  }

  override def serialize(topic: String, data: T): Array[Byte] = {

    if (data == null) {
      return Array.empty[Byte]
    } else {
      try {
        implicit val formats = DefaultFormats
        val json = Serialization.write[T](data)
        val bytes = json.getBytes(Charset.forName("utf-8"))
        return bytes
      } catch {
        case ex: Exception =>
          throw new SerializationException(ex)
      }
    }
  }

  override def close(): Unit = {
  }
}

Generic deserializer base class:

package mypackage

import java.nio.charset.Charset
import java.util
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

class ObjectDeserializer[T <: AnyRef](tType: Class[T]) extends Deserializer[T] {

  private val actualType = tType

  override def configure(props: util.Map[String, _], isKey: Boolean): Unit = {
  }

  override def deserialize(s: String, bytes: Array[Byte]): T = {

    if (bytes == null || bytes.length == 0) {
      return null.asInstanceOf[T]
    } else {
      try {
        val json = new String(bytes, Charset.forName("utf-8"))
        implicit val formats = DefaultFormats
        implicit val mf = Manifest.classType[T](actualType)
        val data = Serialization.read[T](json)(formats, mf)
        return data
      } catch {
        case ex: Exception =>
          throw new SerializationException(ex)
      }
    }
  }

  override def close(): Unit = {
  }
}

Specific deserializer class:

package mypackage

import payoneer.labs.techExamples.dataModel.MyDataA

class MyDataADeserializer extends ObjectDeserializer[MyDataA](classOf[MyDataA]) {
}

Configuration:

ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ObjectSerializer[MyDataA]]

ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[MyDataADeserializer]

Then use KafkaProducer[String, MyDataA] and KafkaConsumer[String, MyDataA], assuming the key I want to use is a String and the value I want to use is a MyDataA.

Then I can publishing using:

producer.send(new ProducerRecord[String, MyDataA](topic, key, value))

and consume using:

while (true) {

    val messages = consumer.poll(1000).iterator().asScala

    for (msg <- messages) {
        val key : String = msg.key()
        val value : MyDataA = msg.value()
    }
}

Related questions:

Danny Varod
  • 17,324
  • 5
  • 69
  • 111
  • Avro4s has a GenericSerde which should work for Scala Case classes , but I have been unable to get that to work at runtime - it does compile. Most examples you find use the built in Serdes for String, Int or Long which is a bit frustrating ! https://github.com/sksamuel/avro4s – Chris W Nov 25 '20 at 09:58
  • @ChrisW this works pretty easily using a custom serializer/deserializer to/from JSON, at the time I was puzzled why such a basic pair of classes weren't included in the client. – Danny Varod Nov 25 '20 at 11:22

1 Answers1

0

From my Kafka experience i can sugest you to use Reactor-Kafka (https://github.com/reactor/reactor-kafka). It also have a pretty documentation (http://projectreactor.io/docs/kafka/snapshot/reference/)

dorintufar
  • 660
  • 9
  • 22