I'm working through a Databricks example. The schema for the dataframe looks like:

> parquetDF.printSchema
root
|-- department: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- name: string (nullable = true)
|-- employees: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- firstName: string (nullable = true)
|    |    |-- lastName: string (nullable = true)
|    |    |-- email: string (nullable = true)
|    |    |-- salary: integer (nullable = true)

In the example, they show how to explode the employees column into 4 additional columns:

val explodeDF = parquetDF.explode($"employees") { 
case Row(employee: Seq[Row]) => employee.map{ employee =>
  val firstName = employee(0).asInstanceOf[String]
  val lastName = employee(1).asInstanceOf[String]
  val email = employee(2).asInstanceOf[String]
  val salary = employee(3).asInstanceOf[Int]
  Employee(firstName, lastName, email, salary)
 }
}.cache()
display(explodeDF)

How would I do something similar with the department column (i.e. add two additional columns to the dataframe called "id" and "name")? The methods aren't exactly the same, and I can only figure out how to create a brand new data frame using:

val explodeDF = parquetDF.select("department.id","department.name")
display(explodeDF)

If I try:

val explodeDF = parquetDF.explode($"department") { 
  case Row(dept: Seq[String]) => dept.map{dept => 
  val id = dept(0) 
  val name = dept(1)
  } 
}.cache()
display(explodeDF)

I get the warning and error:

<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure
            case Row(dept: Seq[String]) => dept.map{dept => 
                           ^
<console>:37: error: inferred type arguments [Unit] do not conform to    method explode's type parameter bounds [A <: Product]
  val explodeDF = parquetDF.explode($"department") { 
                                   ^
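For what it's worth, both compiler messages can be reproduced in plain Scala without Spark. A minimal sketch (no Spark dependencies; the values are illustrative):

```scala
// 1) A block whose last statement is a `val` definition has type Unit —
//    this is why `explode` infers the type argument [Unit] and then
//    rejects it (Unit does not satisfy the bound [A <: Product]).
val unitBlock: Unit = { val id = "123456"; val name = "Engineering" }

// 2) Type erasure: the `String` in `Seq[String]` cannot be checked at
//    runtime, so this pattern matches *any* Seq, and the compiler warns
//    that the type argument is "unchecked since it is eliminated by erasure".
def looksLikeStringSeq(x: Any): Boolean = x match {
  case _: Seq[String] => true
  case _              => false
}
```

A `Seq(1, 2, 3)` will match the `Seq[String]` pattern at runtime, which is exactly what the warning is about. The deeper problem with the attempt above, though, is that `department` is a struct (a single `Row`), not a `Seq` at all, so the pattern would fail with a `MatchError` even if the types lined up.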
– Feynman27

3 Answers

In my opinion, the most elegant solution is to star-expand the struct using a select operator, as shown below:

val explodedDF2 = parquetDF.select("department.*", "*")

https://docs.databricks.com/spark/latest/spark-sql/complex-types.html

– DHARIN PAREKH

You could use something like that:

var explodeDeptDF = parquetDF.withColumn("id", parquetDF("department.id"))
explodeDeptDF = explodeDeptDF.withColumn("name", explodeDeptDF("department.name"))

which is the direction you hinted me towards.

– gsamaras
  • A stage failure: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage 41.0 (TID 1403, 10.81.214.49): scala.MatchError: [[789012,Mechanical Engineering]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) – Feynman27 Sep 01 '16 at 15:59
  • @Feynman27 does [this](http://stackoverflow.com/questions/25222989/erasure-elimination-in-scala-non-variable-type-argument-is-unchecked-since-it) help? It seems to match *your* attempt. I think the problem with my answer is that the `employees` has also an element, while `department` has not. – gsamaras Sep 01 '16 at 16:18
  • Yeah, the `employees` example creates new rows, whereas the `department` example should only create two new columns. – Feynman27 Sep 01 '16 at 16:36
  • Related question: http://stackoverflow.com/questions/30008127/how-to-read-a-nested-collection-in-spark – Tagar Dec 01 '16 at 05:45
  • Can we do this for all nested columns with renaming at once? For example, `department.id` -> `inner_id`, `department.name` -> `inner_name`, ... – Saddle Point Oct 25 '21 at 14:15
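The distinction drawn in the comments above — exploding an array adds rows, while flattening a struct only adds columns — can be sketched in plain Scala (no Spark; the case classes and sample values are illustrative stand-ins for the schema in the question):

```scala
// Illustrative stand-ins for the nested schema.
case class Dept(id: String, name: String)
case class Employee(firstName: String, lastName: String, email: String, salary: Int)
case class Record(department: Dept, employees: Seq[Employee])

val rec = Record(
  Dept("123456", "Computer Science"),
  Seq(
    Employee("michael", "armbrust", "no-reply@berkeley.edu", 100000),
    Employee("xiangrui", "meng", "no-reply@stanford.edu", 120000)
  )
)

// Exploding the array: one output row per array element (row count grows).
val exploded: Seq[(Dept, Employee)] = rec.employees.map(e => (rec.department, e))

// Flattening the struct: still one row, but with id and name pulled out
// as top-level fields (column count grows).
val flattened: (String, String, Seq[Employee]) =
  (rec.department.id, rec.department.name, rec.employees)
```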

This seems to work (though maybe not the most elegant solution).

var explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDF2 = explodeDF2.withColumn("name", explodeDF2("department.name"))
– Feynman27
  • you could chain them: `val explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id")).withColumn("name", explodeDF("department.name"))` – Davos Jul 11 '17 at 17:02