0

I am working on spark streaming context which is getting data from kafka topic in avro serialization as below.

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "group.id" -> "1"
)

Using Kafka utils i am creating Direct stream as below

val topics = Set("mysql-foobar")


val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String,String](
    topics,
    kafkaParams)
)

I am also writing the data to console as

stream.foreachRDD ( rdd => {
  rdd.foreachPartition(iterator => {
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  })
})

Now i want to create data frame from these RDD's. Is it possible i had reviewed and tested many solution from stackoverflow but getting some issues with them. Stackoverflow Solution are this and this also. My output looks as below

{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}
jarry jafery
  • 1,018
  • 1
  • 14
  • 25
  • Look at the Structured Streaming documentation. Don't use Dstream. https://github.com/AbsaOSS/ABRiS – OneCricketeer Sep 17 '18 at 12:58
  • what are the sbt library dependencies to import this za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies? – jarry jafery Sep 17 '18 at 13:02
  • Should be mentioned on the github page https://github.com/AbsaOSS/ABRiS/blob/master/README.md#maven-dependency – OneCricketeer Sep 17 '18 at 13:14
  • i had tried this with structured streaming and following the suggested steps but i am getting error can you please review them that where i am doing wrong. – jarry jafery Sep 17 '18 at 13:22
  • val schemaRegistryConfs = Map( SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081", SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "mysql-foobar", SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest" ) val df = spark. readStream. format("kafka"). option("subscribe", "mysql-foobar"). option("kafka.bootstrap.servers", "localhost:9092"). fromAvro("value",schemaRegistryConfs)(RETAIN_SELECTED_COLUMN_ONLY) – jarry jafery Sep 17 '18 at 13:23
  • I've not used that library, but the schema registry url needs an HTTP prefix on it – OneCricketeer Sep 17 '18 at 13:23
  • ok now the code is running without error but getting nulls in values. – jarry jafery Sep 17 '18 at 13:26
  • 1
    I think you copied the wrong section https://github.com/AbsaOSS/ABRiS/blob/master/README.md#reading-avro-binary-records-from-confluent-platform-using-schema-registry-as-spark-structured-streams-and-performing-regular-sql-queries-on-them – OneCricketeer Sep 17 '18 at 13:27
  • Thanks I had searched for it but could not find that i think you should answer the question i will mark that as correct. – jarry jafery Sep 17 '18 at 13:31
  • but i think its much easier to implement with just 2 lines of code by including the required library. – jarry jafery Sep 17 '18 at 13:45

1 Answers1

1

Since you're using Confluent serializers, and they don't provide easy integrations with Spark at this time, you can checkout a relatively new library on Github by AbsaOSS that helps with this.

But basically, you use Spark Structured Streaming to get DataFrames, don't try to use Dstream to RDD to Dataframe...

You can find examples of what you're looking for here

Also see other examples at Integrating Spark Structured Streaming with the Kafka Schema Registry

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245