I'm using Spark 2.1's Structured Streaming to read from a Kafka topic whose contents are binary Avro-encoded. After setting up the DataFrame:
val messages = spark
  .readStream
  .format("kafka")
  .options(kafkaConf)
  .option("subscribe", config.getString("kafka.topic"))
  .load()
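(For reference, kafkaConf is just the connection settings; assume something like the following, with a placeholder broker list:)

val kafkaConf = Map(
  "kafka.bootstrap.servers" -> "localhost:9092"  // placeholder brokers
)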
If I print the schema of this DataFrame (messages.printSchema()), I get the following:
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- timestampType: integer (nullable = true)
This question should be orthogonal to the problem of Avro decoding, but let's assume I want to somehow convert the value content of the messages DataFrame into a Dataset[BusinessObject], via a function Array[Byte] => BusinessObject. For the sake of completeness, the function may just be (using avro4s):
import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.AvroInputStream

case class BusinessObject(userId: String, eventId: String)

def fromAvro(bytes: Array[Byte]): BusinessObject =
  AvroInputStream.binary[BusinessObject](
    new ByteArrayInputStream(bytes)
  ).iterator.next
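As a sanity check, the decoder round-trips fine outside Spark. This is just a sketch assuming the avro4s 1.x API, where AvroOutputStream.binary writes to a plain OutputStream; the field values are made up:

import java.io.ByteArrayOutputStream
import com.sksamuel.avro4s.AvroOutputStream

val baos = new ByteArrayOutputStream()
val out = AvroOutputStream.binary[BusinessObject](baos)
out.write(BusinessObject("user-1", "event-1"))  // made-up sample values
out.close()

fromAvro(baos.toByteArray)  // returns BusinessObject(user-1,event-1)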
Of course, as miguno says in this related question, I cannot just apply the transformation with DataFrame.map(), because I need to provide an implicit Encoder for such a BusinessObject.
That can be defined as:
import org.apache.spark.sql.{Encoder, Encoders}
implicit val myEncoder: Encoder[BusinessObject] = Encoders.kryo[BusinessObject]
Now, perform the map:
val transformedMessages: Dataset[BusinessObject] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))
But if I query the new schema, I get the following:
root
 |-- value: binary (nullable = true)
That doesn't make sense to me: the Dataset should use the Product properties of the BusinessObject case class and expose one column per field, with the correct types.
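To illustrate: on a static Dataset (assuming a SparkSession in scope as spark), the Kryo encoder collapses the whole object into a single binary column, while the implicit product encoder yields exactly the per-field schema I'm after:

import org.apache.spark.sql.Encoders

// Kryo encoder: the whole object is serialized into one binary column
val kryoDS = spark.createDataset(Seq(BusinessObject("u1", "e1")))(Encoders.kryo[BusinessObject])
kryoDS.printSchema()
// root
//  |-- value: binary (nullable = true)

// Product encoder (derived via spark.implicits._): one column per field
import spark.implicits._
val productDS = Seq(BusinessObject("u1", "e1")).toDS()
productDS.printSchema()
// root
//  |-- userId: string (nullable = true)
//  |-- eventId: string (nullable = true)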
I've seen some Spark SQL examples that use .schema(StructType) in the reader, but I cannot do that here: not only because I'm using readStream, but also because I have to transform the column before I can operate on its fields.
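(For clarity, the pattern I mean is the one below for a static file source; the path and schema are only illustrative, and the Kafka source wouldn't accept a user-specified schema anyway:)

import org.apache.spark.sql.types._

val explicitSchema = StructType(Seq(
  StructField("userId", StringType),
  StructField("eventId", StringType)
))

// Works for file-based sources, but not for the fixed-schema Kafka source
val staticDF = spark.read.schema(explicitSchema).json("/path/to/events.json")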
I am hoping to tell the Spark SQL engine that the schema of the transformedMessages Dataset is a StructType with the case class's fields.