I need to apply a transformation to a selection of columns of a DataFrame that has a nested structure. The transformation relies on a function that already exists.

Suppose the data looks as follows:

case class A(A: B)
case class B(B: String, C: String, D: Seq[C])
case class C(E: String, F: String)

val df = sc.parallelize(Seq(A(B("b", "c", Seq(C("e1", "f1"), C("e2", "f2")))))).toDF
df.printSchema

root
|-- A: struct (nullable = true)
|    |-- B: string (nullable = true)
|    |-- C: string (nullable = true)
|    |-- D: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- E: string (nullable = true)
|    |    |    |-- F: string (nullable = true)

and suppose the transformation converts a string to upper case:

import org.apache.spark.sql.functions.udf

val upper: String => String = _.toUpperCase
val upperUDF = udf(upper)

Here I found an approach that partially solves my problem. Applying the code given there

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

def mutate(df: DataFrame, fn: Column => Column): DataFrame = {
  // Get a projection with fields mutated by `fn` and select it
  // out of the original frame with the schema reassigned to the original
  // frame (explained later)
  df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn): _*).rdd, df.schema)
}

def traverse(schema: StructType, fn: Column => Column, path: String = ""): Array[Column] = {
  schema.fields.map { f =>
    f.dataType match {
      // Recurse into nested structs, extending the dotted path
      case s: StructType => struct(traverse(s, fn, path + f.name + "."): _*)
      // Everything else (including arrays) is passed to `fn` as a whole column
      case _ => fn(col(path + f.name))
    }
  }
}

the following works fine for me:

val df2 = mutate(df, c => if (c.toString == "A.B" || c.toString == "A.C") upperUDF(c) else c)

However, when it comes to transforming the columns of the nested array D, it fails silently, without raising an error:

val df3 = mutate(df, c => if (c.toString == "A.D.F") upperUDF(c) else c)

What is going wrong here? How can I transform columns of a nested array as described above?
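
One likely reading of the failure: `traverse` only descends into `StructType` fields. `A.D` is an `ArrayType`, so it falls into the `case _` branch and `fn` is called once with the whole column `A.D`; it is never called with a column named `A.D.F`, so the condition in `df3` never matches. The following is a minimal sketch of a traversal that also steps into arrays of structs, assuming Spark 3.0+ (for the Scala `transform` function). The name `mapFields` and its signature are hypothetical and not part of the linked approach; `fn` receives the dotted path explicitly instead of relying on `Column.toString`.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, struct, transform}
import org.apache.spark.sql.types.{ArrayType, StructType}

// Hypothetical helper: rebuilds every field of `schema`, calling `fn(path, column)`
// on each leaf. `get` resolves a field name relative to the current parent
// (a top-level column, a struct field, or an array element).
def mapFields(
    schema: StructType,
    fn: (String, Column) => Column,
    path: String,
    get: String => Column): Array[Column] =
  schema.fields.map { f =>
    val full = if (path.isEmpty) f.name else s"$path.${f.name}"
    val c = get(f.name)
    val out = f.dataType match {
      // Nested struct: rebuild it field by field
      case s: StructType =>
        struct(mapFields(s, fn, full, n => c.getField(n)): _*)
      // Array of structs: rebuild each element with the higher-order `transform`
      case ArrayType(s: StructType, _) =>
        transform(c, (e: Column) => struct(mapFields(s, fn, full, n => e.getField(n)): _*))
      // Leaf (including arrays of primitives): hand the column to `fn`
      case _ => fn(full, c)
    }
    // Keep the original field name
    out.as(f.name)
  }
```

With the `df` and `upperUDF` defined above, usage might look like `df.select(mapFields(df.schema, (p, c) => if (p == "A.D.F") upperUDF(c) else c, "", col(_)): _*)`. Note that rebuilding with `struct` can change nullability, which is presumably why `mutate` reapplies the original schema.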

K.O.T.
  • Explode first. When you select the element of an array, which element do you mean? Did you specify one, or is it possible to select one? – Lamanus Aug 17 '20 at 22:28
  • I tried to explode, but it creates an additional column that I don't want. Unfortunately, I don't see how to get such an exploded column back into my original schema. So you mean that `df.select("A.D.F")` returns an array column, and I need to specify which elements should be transformed (in my case, all of them)? – K.O.T. Aug 18 '20 at 11:30
  • Also `df.withColumn("NewCol", transform(col("A.D.F"),(col: Column) => upperUDF(col) ))` works but gives me an additional column while `mutate(df , c => if (c.toString == "A.D.F") transform( c, (col: Column) => upperUDF(col) ) else c)` does not deliver the expected output. – K.O.T. Aug 18 '20 at 12:14
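
The `withColumn` plus `transform` combination mentioned in the last comment can be written so that it replaces `A` in place rather than adding a new column. This is only a sketch under assumptions: it requires Spark 3.1+ (for `Column.withField`), is hard-wired to the schema from the question, uses the built-in `upper` function in place of `upperUDF`, and the name `upperFInPlace` is made up for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, transform, upper}

// Assumes Spark 3.1+ and a top-level struct column A with an array-of-struct field D.
def upperFInPlace(df: DataFrame): DataFrame =
  df.withColumn(
    "A", // reusing the existing name replaces the column instead of adding one
    col("A").withField(
      "D",
      // Rewrite field F of every array element; E is left untouched
      transform(col("A.D"), (d: Column) => d.withField("F", upper(d.getField("F"))))
    )
  )
```

Because only the targeted fields are rewritten, the remaining fields and their order stay as they were.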
