
I need to consume data from a Kafka topic as a ZIO stream. The data there is in Google Protobuf format, and I also need to check the schema.

I use the following sample protobuf file, which generates the proto.Data Java class for me:

syntax = "proto3";
package proto;

import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_outer_classname = "Protos";

message Data {
  string id = 1;
  google.protobuf.Timestamp receiveTimestamp = 2;
}
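For reference, with `java_multiple_files` enabled, protoc generates a `proto.Data` class with a builder. A sample record can be constructed like this (a sketch, assuming the generated classes are on the classpath):

```scala
import com.google.protobuf.Timestamp

// Build a sample proto.Data message with the protoc-generated Java API.
// The proto3 field receiveTimestamp becomes setReceiveTimestamp/getReceiveTimestamp.
val sample: proto.Data =
  proto.Data.newBuilder()
    .setId("1")
    .setReceiveTimestamp(
      Timestamp.newBuilder()
        .setSeconds(System.currentTimeMillis() / 1000)
        .build())
    .build()
```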

If I use the following properties, I am able to get the data as a KStream[String, proto.Data] (so using the Kafka Streams API) for the proto.Data message class:

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, s"kstream-application-${java.util.UUID.randomUUID().toString}")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put("security.protocol", "SSL")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde")
    p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
    p.put("enable.auto.commit", "false")
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    p.put("specific.protobuf.value.type", classOf[proto.Data])
    p
  }

And here is an example of code using the KStream (I am able to print only the records whose id equals "1"):

  val builder: StreamsBuilder = new StreamsBuilder
  val risks: KStream[String, proto.Data] =
    builder
      .stream[String, proto.Data](topic)
      .filter((_, value) => value.getId == "1")

  val sysout = Printed
    .toSysOut[String, proto.Data]
    .withLabel("protoStream")
  risks.print(sysout)
  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  sys.ShutdownHookThread {
    streams.close(Duration.ofSeconds(10))
  }

Now, if I use zio-kafka with the same properties, I am somehow able to print out the whole stream:

  val props: Map[String, AnyRef] = Map(
    StreamsConfig.APPLICATION_ID_CONFIG -> s"kstream-application-${java.util.UUID.randomUUID().toString}",
    StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
    "security.protocol" -> "SSL",
    StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG -> Serdes.String.getClass.getName,
    StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG -> "io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde",
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081",
    "enable.auto.commit" -> "false",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
    "specific.protobuf.value.type" -> classOf[proto.Data]
  )

  val myStream = for {

    serdeProto <- Serde.fromKafkaSerde(new KafkaProtobufSerde[proto.Data](), props, true)
    _ <- stream
      .plainStream(Serde.string, serdeProto)
      .provideSomeLayer(consumer ++ Console.live)
      .tap(r => console.putStrLn(s"stream: $r"))
      .runDrain
  } yield ()


  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    myStream.exitCode
  }

But if I try to filter only the records whose id equals "1":

  val myStream = for {

    serdeProto <- Serde.fromKafkaSerde(new KafkaProtobufSerde[proto.Data](), props, true)
    _ <- stream
      .plainStream(Serde.string, serdeProto)
      .provideSomeLayer(consumer ++ Console.live)
      .filter(_.value.getId == "1")
      .tap(r => console.putStrLn(s"stream: $r"))
      .runDrain
  } yield ()

I get an error like:

Fiber failed.
An unchecked error was produced.
java.lang.ClassCastException: com.google.protobuf.DynamicMessage cannot be cast to proto.Data

I was wondering whether anybody has used zio-kafka together with Google Protobuf, and whether deserialization into the generated Java proto class was successful when reading data from the topic?
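One thing worth noting (a sketch, not verified): the third argument of zio-kafka's `Serde.fromKafkaSerde` is an `isKey` flag, and the snippet above passes `true` for what is a value serde. If that flag is what I think it is, then with `isKey = true` the Confluent serde is configured as a key serde and ignores `specific.protobuf.value.type`, which would explain the fallback to `DynamicMessage`. The variant with `false` would look like:

```scala
import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
import zio.kafka.serde.Serde

// Sketch (untested): configure the Confluent serde as a *value* serde.
// Assumption: the third argument of Serde.fromKafkaSerde is the isKey flag;
// with isKey = true the deserializer would look up the key-type config
// instead of "specific.protobuf.value.type" and return a DynamicMessage.
// `props`, `stream`, and `consumer` are the same values as in the question.
val myStream = for {
  serdeProto <- Serde.fromKafkaSerde(new KafkaProtobufSerde[proto.Data](), props, false)
  _ <- stream
    .plainStream(Serde.string, serdeProto)
    .provideSomeLayer(consumer ++ Console.live)
    .filter(_.value.getId == "1")
    .tap(r => console.putStrLn(s"stream: $r"))
    .runDrain
} yield ()
```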

  • I can't tell what difference you are trying to show. Properties or Map objects should not make a difference – OneCricketeer Jul 15 '22 at 18:12
  • I am saying that with these properties or map the Kafka API works - so I am able to get the KStream of that proto type - but when I use the zio-kafka API it doesn't work, so the question is whether it is a zio-kafka issue or perhaps I am not using it correctly? zio-kafka doesn't have examples or documentation on how to use it with Google Protobuf data – eprst2019 Jul 15 '22 at 18:56
  • Can you try ZIO with `Array[Byte]` (or the `Serde.Bytes` built-in Streams class)? If it is still not working, then Protobuf isn't the problem – OneCricketeer Jul 15 '22 at 22:46
  • 1) If you get a network disconnection exception then that is unrelated to your serialization. 2) If you can use Bytes, then you can pre-serialize your protobuf data into bytes externally from ZIO. – OneCricketeer Jul 18 '22 at 15:23
  • Sorry, Kafka was down. The actual error I get is java.lang.ClassCastException: com.google.protobuf.DynamicMessage cannot be cast to proto.Data, and it happens when I try to use the get methods of the proto.Data class; if I just print the record, it prints as a DynamicMessage - so I guess deserialization is not working somehow? Although the same set of properties makes deserialization work for the Kafka API streams – eprst2019 Jul 20 '22 at 16:50
  • Where are classes trying to be cast? Zio doesn't have a matching config for `specific.protobuf.value.type`, I assume? Can you edit the question with the full stacktrace, please? – OneCricketeer Jul 20 '22 at 17:05
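The `Array[Byte]` fallback suggested in the comments could be sketched as follows (untested; it assumes the producer wrote plain protobuf bytes - if the Confluent schema-registry serializer was used, its magic byte, schema id, and message-index prefix would have to be stripped before parsing):

```scala
import zio.kafka.serde.Serde

// Sketch (untested): read raw bytes with the built-in byteArray serde and
// parse the protobuf payload manually with the generated parseFrom.
// `stream` and `consumer` are the same values as in the question.
val bytesStream = stream
  .plainStream(Serde.string, Serde.byteArray)
  .provideSomeLayer(consumer ++ Console.live)
  .map(r => proto.Data.parseFrom(r.value)) // assumption: no registry framing
  .filter(_.getId == "1")
  .tap(d => console.putStrLn(s"stream: $d"))
  .runDrain
```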
