
I am trying to use Structured Streaming in Spark as it fits my use case well. However, I can't seem to find a way to map the incoming data from Kafka into a case class. This is as far as I could get based on the official documentation.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import sparkSession.implicits._

val kafkaDF: DataFrame = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers_CML)
  .option("subscribe", topics_ME)
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("cast (value as string) as json") // Kafka sends data in a fixed schema (key, value, topic, partition, offset, timestamp, etc.)

val schema_ME = StructType(Seq(
  StructField("Parm1", StringType, true),
  StructField("Parm2", StringType, true),
  StructField("Parm3", TimestampType, true)))

val mobEventDF: DataFrame = kafkaDF
  .select(from_json($"json", schema_ME).as("mobEvent")) // Using a StructType to convert to an application-specific schema. Can't seem to use a case class for the schema directly yet. Perhaps with a later API?
  .na.drop()

mobEventDF has a schema like this:

root
 |-- mobEvent: struct (nullable = true)
 |    |-- Parm1: string (nullable = true)
 |    |-- Parm2: string (nullable = true)
 |    |-- Parm3: timestamp (nullable = true)

Is there a better way to do this? How can I map this directly into a Scala case class like the one below?

case class ME(name: String,
              factory: String,
              delay: Timestamp)

1 Answer


Select and rename all the fields, then call the as method:

mobEventDF.select($"mobEvent.*").toDF("name", "factory", "delay").as[ME]
  • Thanks. This definitely works. But is there a way to specify the case class directly? In the above method, I have to specify the same schema twice, a struct and then a case class. – user1384205 Jul 02 '18 at 12:42
  • You can use this: https://stackoverflow.com/questions/36746055/generate-a-spark-structtype-schema-from-a-case-class – ferdyh Jun 12 '19 at 12:55
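The approach in the linked question can be sketched as follows: derive the StructType from the case class once, via Spark's product encoder, so the schema is not written out twice. This is a sketch, not the accepted answer's method; it assumes the JSON keys match the case class field names (name, factory, delay), and that sparkSession and kafkaDF are in scope as defined in the question. Encoders.product is part of the public Spark SQL API.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.functions.from_json

case class ME(name: String, factory: String, delay: Timestamp)

// Derive the StructType from the case class instead of hand-writing it.
val schema_ME = Encoders.product[ME].schema

import sparkSession.implicits._

// from_json parses with the derived schema; because the struct's field
// names already match ME's fields, as[ME] needs no toDF renaming step.
val mobEventDS: Dataset[ME] = kafkaDF
  .select(from_json($"json", schema_ME).as("mobEvent"))
  .select($"mobEvent.*")
  .na.drop()
  .as[ME]
```

If the JSON keys differ from the case class field names (as with Parm1/Parm2/Parm3 in the question), keep the hand-written StructType for from_json and rename with toDF before calling as[ME], as in the answer above.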