6

I am fetching data from Kafka and then deserializing the Array[Byte] with the default decoder. After that my RDD elements look like (null,[B@406fa9b2), (null,[B@21a9fe0), but I want my original data, which has a schema. How can I achieve this?

I serialize messages in Avro format.

Jacek Laskowski
JSR29

2 Answers

5

You have to decode the bytes using proper deserializers, say to strings or to your custom objects.

If you don't do the decoding, you get [B@406fa9b2, which is simply the default text representation of a byte array in Java.

Kafka knows nothing about the content of your messages and so it passes byte arrays from producers to consumers.

In Spark Streaming you have to use proper serializers for keys and values (quoting the KafkaWordCount example, which configures them on the producer side):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

With the above serializers (and matching String deserializers on the consumer side) you get a DStream[String], so you work with RDD[String].
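For completeness, a minimal consumer-side sketch with the spark-streaming-kafka-0-10 connector could look like this (the topic, group id and bootstrap servers are placeholders, and an existing SparkContext named sc is assumed):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(sc, Seconds(5))

// consumer-side deserializers mirror the producer-side serializers above
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Set("topic1"), kafkaParams))

// each value is already a String -- no more [B@... byte arrays
val lines = stream.map(_.value)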

If, however, you want to deserialize the byte arrays to your custom class directly, you'd have to write a custom Kafka Deserializer (which is Kafka-specific and has nothing to do with Spark).
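A rough sketch of such a custom Deserializer (the Event case class and the toy comma-separated wire format are made up for illustration; you'd plug in your own Avro decoding instead):

import java.util
import org.apache.kafka.common.serialization.Deserializer

// hypothetical domain class; in your case it would be the Avro-generated class
case class Event(id: String, payload: String)

class EventDeserializer extends Deserializer[Event] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): Event = {
    // decode the raw bytes here, e.g. with an Avro DatumReader instead of this toy format
    val Array(id, payload) = new String(data, "UTF-8").split(",", 2)
    Event(id, payload)
  }

  override def close(): Unit = ()
}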

What I'd recommend is to use JSON with a fixed schema or Avro (with a solution described in Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages).
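For plain Avro (i.e. without the Confluent schema-registry framing), the byte-array-to-record step could be sketched as follows, assuming you know the writer schema (the schema below is a made-up example):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// made-up schema; use the schema your producer actually writes with
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}""")

val reader = new GenericDatumReader[GenericRecord](schema)

def decode(bytes: Array[Byte]): GenericRecord = {
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder)
}

// e.g. dstream.map { case (_, bytes) => decode(bytes) }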


In Structured Streaming, however, the pipeline could look as follows:

import spark.implicits._ // for the 'value symbol-to-Column syntax

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string") // <-- conversion here
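To complete the picture, the streaming query still has to be started on top of fromKafka; a minimal sketch with a console sink, purely for inspection:

import org.apache.spark.sql.streaming.Trigger

val toConsole = fromKafka.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime("10 seconds")).
  start

toConsole.awaitTermination()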
zero323
Jacek Laskowski
  • Then, how do you convert an Avro Kafka message (with or without a schema registry) into the original object in Spark Structured Streaming? – Casel Chen Jan 14 '18 at 08:57
  • You have to know the original type and map the bytes back yourself, using the `map` operator for example. No `from_avro` yet (if ever) as we have for JSON with `from_json`. – Jacek Laskowski Jan 14 '18 at 09:41
  • I used KafkaAvroDeserializer to map Array[Byte] to my Avro object, but it said "Unable to find encoder for type stored in a Dataset". Then I provided encoders as implicit def toEncoded(o: Zhima): Array[Byte] = o.toByteBuffer.array() and implicit def fromEncoded(e: Array[Byte]): Zhima = valueDeserializer.deserialize(kafkaConsumeTopicName, e).asInstanceOf[Zhima], but it complained with the same error. How do I resolve it then? – Casel Chen Jan 17 '18 at 04:06
  • Then I used a custom UDF to parse the Avro message, and now it reports "Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type org.apache.avro.generic.GenericRecord is not supported". Do I need to convert the Avro object to a Scala case class or Java POJO? spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) => MyDeserializerWrapper.deser.deserialize(topic, bytes).asInstanceOf[GenericRecord] ) – Casel Chen Jan 17 '18 at 04:30
0

Jacek has provided a great answer here.

The answer below is an extension of his answer.

If you are using Spark's Structured Streaming to consume the data, then you can do something like this:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


// let us say your SparkSession is defined as 'spark'

val readDF = spark.readStream
        .format("kafka")
        .option("subscribe", "topic1")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("groupIdPrefix","whatever-group-id")
        .option("startingOffsets","latest")
        .load
        .select(col("value").cast(StringType))

You can write whatever data type you want on the producer side; I took String for example purposes.
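If, for instance, the producer writes JSON (as suggested in Jacek's answer), you can then apply a schema on top of the string value with from_json. A sketch with a made-up schema and field names:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// made-up schema; replace it with the actual schema of your messages
val schema = new StructType()
        .add("id", LongType)
        .add("name", StringType)

val parsedDF = spark.readStream
        .format("kafka")
        .option("subscribe", "topic1")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .load
        .select(from_json(col("value").cast(StringType), schema).as("data"))
        .select("data.*")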

nomadSK25