I am working with a Spark Streaming context that consumes Avro-serialized data from a Kafka topic, using the Kafka parameters below.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "group.id" -> "1"
)
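For reference, the ssc used below is an ordinary StreamingContext; a minimal sketch of how I set it up (the app name, master, and 10-second batch interval are just placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal setup sketch; app name, master, and batch interval are placeholders
val conf = new SparkConf().setAppName("kafka-avro-stream").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(10))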
Using KafkaUtils, I create a direct stream as below:
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val topics = Set("mysql-foobar")

// KafkaAvroDeserializer produces GenericRecord values, so the value type is Object
val stream = KafkaUtils.createDirectStream[String, Object](
  ssc,
  PreferConsistent,
  Subscribe[String, Object](topics, kafkaParams)
)
I am also writing the data to the console:
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  }
}
Now I want to create a DataFrame from these RDDs. Is that possible? I have reviewed and tested several Stack Overflow solutions (this and this), but I am running into issues with them. My output looks like this, and a sketch of what I have been trying follows the sample output:
{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}