
We are struggling to convert a Spark DataFrame to a Dataset of a case class with dynamic columns. The starting point for each use case is a DataFrame like the one below:

root
|-- id: string (nullable = true)
|-- time: long (nullable = true)
|-- c: struct (nullable = true)
|    |-- d: long (nullable = true)
|    |-- e: string (nullable = true)
|    |-- f: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- g: long (nullable = true)
|-- x: string (nullable = true)
|-- y: string (nullable = true)

All these DataFrames have the id and time columns in common. The other columns (c, x, y) are configurable and can vary in count and type for each use case (some DataFrames have up to 10 columns besides id and time).

To simplify the complex processing of these events, we would like to convert each row to a Scala case class like this:

case class Event(id: String, time: Long, payload: Seq[Any])

or:

case class Event(id: String, time: Long, payload: Map[String, Any]) // payload: <column-name>/<value>

Sadly, df.as[Event] doesn't work out of the box. Any ideas on how to do this?

Writing a case class for every use case is not an option, because we just want to configure the job via a YAML file and don't want to adjust the code for every new use case. We need something more generic! (I thought about generating a case class at runtime, but this is quite complicated...)

Thanks for your help!

Update - Solution

We have now come up with the following solution:

case class Event(id: String, time: Long, payload: Seq[Array[Byte]])

and convert the DataFrame with the help of Kryo:

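import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import spark.implicits._ // assumes a SparkSession named `spark`; provides the Encoder for Event

val idIndex = df.columns.indexOf("id")
val timeIndex = df.columns.indexOf("time")
// Every column except id and time goes into the payload (note ||, not &&).
val otherColumns = df.columns.indices.filterNot(i => i == idIndex || i == timeIndex).toList

val ds = df.mapPartitions { rows =>
  // Kryo is neither serializable nor thread-safe, so create one instance per partition.
  val kryo = new Kryo()
  rows.map { row =>
    Event(
      row.getAs[String](idIndex),
      row.getAs[Long](timeIndex),
      otherColumns.map { index =>
        val output = new Output(192, 8192)
        kryo.writeObject(output, row.get(index))
        output.toBytes // only the bytes written; getBuffer returns the whole backing array
      }
    )
  }
}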

ds.printSchema()
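
Reading a payload entry back requires the reverse Kryo step. The following is only a minimal sketch of that round trip, independent of Spark. The object name `KryoRoundTrip` and its helpers are hypothetical, and it uses `writeClassAndObject`/`readClassAndObject` rather than the `writeObject` call above, on the assumption that the reader does not know each column's type in advance:

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical helper for illustration; not part of the job above.
object KryoRoundTrip {
  // Serialize a value together with its class, so it can be read back
  // without knowing the type up front.
  def encode(kryo: Kryo, value: Any): Array[Byte] = {
    val output = new Output(192, -1) // -1: no upper limit on buffer growth
    kryo.writeClassAndObject(output, value.asInstanceOf[AnyRef])
    output.toBytes // copies only the bytes written so far
  }

  // Restore a value from the bytes produced by encode.
  def decode(kryo: Kryo, bytes: Array[Byte]): Any =
    kryo.readClassAndObject(new Input(bytes))

  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    val values: Seq[Any] = Seq(42L, "some string")
    val restored = values.map(v => decode(kryo, encode(kryo, v)))
    println(restored == values)
  }
}
```

Only `Long` and `String` values are used here because Kryo registers them by default; struct and array columns arrive as Spark `Row` and collection objects and may need extra registration or a different representation.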

Thanks for your help.

Sebastian
    A `Dataset` can't be created for a structure that contains `Any`, since `Any` doesn't have an Encoder (Spark has built-in encoders for all primitive types and Product types that store them efficiently in memory - but `Any` could be anything and therefore can't be Encoded), and whatever encoder you do create for it would probably make it useless... see more here: http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset – Tzach Zohar Apr 06 '17 at 21:33
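
To illustrate the point about encoders, here is a minimal sketch that assumes a local SparkSession and uses Spark's generic `Encoders.kryo`. The object name `KryoEncoderSketch` and the sample data are hypothetical. The Dataset can be created this way, but the whole `Event` is stored as one opaque binary column, so `id` and `time` are no longer visible as columns:

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

case class Event(id: String, time: Long, payload: Map[String, Any])

// Hypothetical sketch; names and sample data are illustrative only.
object KryoEncoderSketch {
  // Build a Dataset[Event] with a Kryo-based encoder and return its column names.
  def schemaFieldNames(spark: SparkSession): Seq[String] = {
    implicit val eventEncoder: Encoder[Event] = Encoders.kryo[Event]
    val ds = spark.createDataset(Seq(Event("a", 1L, Map("x" -> "foo", "d" -> 42L))))
    ds.printSchema() // a single binary column instead of id / time / payload
    ds.schema.fieldNames.toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]") // assumption: local mode, for demonstration only
      .appName("kryo-encoder-sketch")
      .getOrCreate()
    try println(schemaFieldNames(spark))
    finally spark.stop()
  }
}
```

Because the row is a single serialized blob, column-based operations such as filtering on `time` would first require deserializing every `Event`.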
