
I've written this piece of code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}

case class RawPanda(id: Long, zip: String, pt: String, happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])

object TestSparkDataFrame extends App{

  System.setProperty("hadoop.home.dir", "E:\\Programmation\\Libraries\\hadoop")
  val conf = new SparkConf().setAppName("TestSparkDataFrame").set("spark.driver.memory","4g").setMaster("local[*]")
  val session = SparkSession.builder().config(conf).getOrCreate()

  import session.implicits._

  def createAndPrintSchemaRawPanda(session:SparkSession):DataFrame = {
    val newPanda = RawPanda(1,"M1B 5K7", "giant", true, Array(0.1, 0.1))
    val pandaPlace = PandaPlace("torronto", Array(newPanda))
    val df = session.createDataFrame(Seq(pandaPlace))
    df
  }
  val df2 = createAndPrintSchemaRawPanda(session)
  df2.show

+--------+--------------------+
|    name|              pandas|
+--------+--------------------+
|torronto|[[1,M1B 5K7,giant...|
+--------+--------------------+


  val pandaInfo2 = df2.explode(df2("pandas")) {
    case Row(pandas: Seq[Row]) =>
      pandas.map{
        case Row(
          id: Long,
          zip: String,
          pt: String,
          happy: Boolean,
          attrs: Seq[Double]) => RawPanda(id, zip, pt, happy, attrs.toArray)
      }
  }

  pandaInfo2.show

+--------+--------------------+---+-------+-----+-----+----------+
|    name|              pandas| id|    zip|   pt|happy|attributes|
+--------+--------------------+---+-------+-----+-----+----------+
|torronto|[[1,M1B 5K7,giant...|  1|M1B 5K7|giant| true|[0.1, 0.1]|
+--------+--------------------+---+-------+-----+-----+----------+

The problem is that the explode function, as I used it, is deprecated, so I would like to recalculate the pandaInfo2 DataFrame using the method advised in the warning:

use flatMap() or select() with functions.explode() instead

But then when I do :

 val pandaInfo = df2.select(functions.explode(df2("pandas")))

I obtain a result that doesn't match what I want. I don't know how to proceed with flatMap or functions.explode.

How could I use flatMap or functions.explode to obtain the result that I want (the one shown in pandaInfo2)?

I've seen this post and this other one, but neither of them helped me.

Omegaspard

1 Answer


Calling select with the explode function returns a DataFrame where the array pandas is "broken up" into individual records. Then, if you want to "flatten" the structure of the resulting single RawPanda per record, you can select the individual columns using a dot-separated "route":

val pandaInfo2 = df2.select($"name", explode($"pandas") as "pandas")
  .select($"name", $"pandas",
    $"pandas.id" as "id",
    $"pandas.zip" as "zip",
    $"pandas.pt" as "pt",
    $"pandas.happy" as "happy",
    $"pandas.attributes" as "attributes"
  )

A less verbose version of the exact same operation would be:

import org.apache.spark.sql.Encoders // going to use this to "encode" case class into schema
val pandaColumns = Encoders.product[RawPanda].schema.fields.map(_.name)

val pandaInfo3 = df2.select($"name", explode($"pandas") as "pandas")
  .select(Seq($"name", $"pandas") ++ pandaColumns.map(f => $"pandas.$f" as f): _*)
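The deprecation warning also mentions flatMap. As a rough illustration of the reshaping a flatMap has to perform, here is the same "one output row per array element, with the parent name repeated" logic on plain Scala collections (no Spark involved, so treat it as a sketch of the shape rather than a drop-in answer; attributes is left out of the tuple only to keep the comparison simple):

```scala
// The question's case classes, reproduced so this snippet stands alone.
case class RawPanda(id: Long, zip: String, pt: String, happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])

val places = Seq(
  PandaPlace("torronto", Array(RawPanda(1L, "M1B 5K7", "giant", happy = true, Array(0.1, 0.1))))
)

// One output tuple per element of `pandas`, carrying the parent `name` along:
val flattened = places.flatMap { place =>
  place.pandas.map(p => (place.name, p.id, p.zip, p.pt, p.happy))
}
// flattened == Seq(("torronto", 1L, "M1B 5K7", "giant", true))
```

The typed Dataset version would follow the same shape, along the lines of `df2.as[PandaPlace].flatMap(place => place.pandas.map(p => (place.name, p.id, p.zip, p.pt, p.happy, p.attributes))).toDF("name", "id", "zip", "pt", "happy", "attributes")` (not run here, and it requires an implicit Encoder in scope via `import spark.implicits._`).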
Tzach Zohar
  • Wow nice, what is an Encoder and how did you know you had to use it ? – Omegaspard Oct 12 '17 at 18:06
  • 1
    [Encoder](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Encoder) is Spark's way of using reflection to figure out the DataFrame _schema_ appropriate to represent a given _class_. There are other ways to infer a schema from a class, see https://stackoverflow.com/q/36746055/5344058 for more options. – Tzach Zohar Oct 12 '17 at 18:14