
I am having problems transposing values in a DataFrame in Scala. My initial DataFrame looks like this:

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   A|   X|   6|null|
|   B|   Z|null|   5|
|   C|   Y|   4|null|
+----+----+----+----+

col1 and col2 are of type String and col3 and col4 are of type Int.

And the result should look like this:

+----+----+----+----+------+------+------+
|col1|col2|col3|col4|AXcol3|BZcol4|CYcol3|
+----+----+----+----+------+------+------+
|   A|   X|   6|null|     6|  null|  null|
|   B|   Z|null|   5|  null|     5|  null|
|   C|   Y|   4|null|  null|  null|     4|
+----+----+----+----+------+------+------+

That means that the three new columns should be named after col1, col2 and the column the value is extracted from. The extracted value comes from col3 or col4, depending on which of them is not null.

So how to achieve that? I first thought of a UDF like this:

import org.apache.spark.sql.functions.udf

// Use java.lang.Integer rather than Int so that null can actually be detected:
// Scala's Int is a primitive and can never hold null.
def myFunc(col1: String, col2: String, col3: java.lang.Integer, col4: java.lang.Integer): (String, java.lang.Integer) = {
  if (col3 == null) {
    (col1 + col2 + "col4", col4)
  } else {
    (col1 + col2 + "col3", col3)
  }
}

val udfMyFunc = udf(myFunc _) // eta-expansion turns the method into a function value

But how can I call it from the DataFrame in the right way?

Of course, all the code above is rubbish and there is probably a much better way. Since these are just my first code snippets, let me know... Comparing the Int value to null also does not work, since an Int can never be null in Scala.
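
For concreteness, this is roughly how I imagined applying it and pulling the tuple apart afterwards (just a sketch; tmp is a placeholder name, and whether this is the right approach is exactly my question):

// Sketch: apply the UDF, then unpack the returned tuple (a struct column).
val dfTmp = df.withColumn("tmp", udfMyFunc($"col1", $"col2", $"col3", $"col4"))
val dfOut = dfTmp
  .select($"*", $"tmp._1".as("newColumn"), $"tmp._2".as("rowValue"))
  .drop("tmp")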

Any help is appreciated! Thanks!

  • Possible duplicate of [Apache Spark -- Assign the result of UDF to multiple dataframe columns](http://stackoverflow.com/questions/35322764/apache-spark-assign-the-result-of-udf-to-multiple-dataframe-columns) – jwvh Mar 08 '17 at 06:06

2 Answers


There is a simpler way:

import org.apache.spark.sql.functions._

val df3 = df2.withColumn("newCol", concat($"col1", $"col2")) // Step 1
  .withColumn("value", when($"col3".isNotNull, $"col3").otherwise($"col4")) // Step 2
  .groupBy($"col1", $"col2", $"col3", $"col4", $"newCol") // Step 3
  .pivot("newCol") // Step 4
  .agg(max($"value")) // Step 5
  .orderBy($"newCol") // Step 6
  .drop($"newCol") // Step 7

df3.show()
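
If you want to reproduce this locally, here is one way the input df2 could be built from the question's sample data (a sketch; the names spark and df2 are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Option[Int] makes col3/col4 nullable Int columns, matching the question.
val df2 = Seq(
  ("A", "X", Option(6), Option.empty[Int]),
  ("B", "Z", Option.empty[Int], Option(5)),
  ("C", "Y", Option(4), Option.empty[Int])
).toDF("col1", "col2", "col3", "col4")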

The steps work as follows:

  1. Add a new column which contains the contents of col1 concatenated with col2
  2. Add a new column, "value", which contains the non-null contents of either col3 or col4 (see the coalesce sketch after this list)
  3. GroupBy the columns you want
  4. pivot on newCol, which contains the values which are now to be column headings
  5. Aggregate by the max of value; since the groupBy leaves a single row per group, this is just the value itself. If value is a string rather than a numeric type, use .agg(first($"value")) instead
  6. order by newCol so DF is in ascending order
  7. drop this column as you no longer need it, or skip this step if you want a column of values without nulls
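
As noted in step 2, the when/otherwise pair simply selects the first non-null of col3 and col4; Spark's coalesce function expresses the same thing more compactly (a sketch; withValue is a placeholder name):

import org.apache.spark.sql.functions.coalesce

// Equivalent to when($"col3".isNotNull, $"col3").otherwise($"col4")
val withValue = df2.withColumn("value", coalesce($"col3", $"col4"))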

Credit due to @user8371915 who helped me answer my own pivot question in the first place.

Result is as follows:

+----+----+----+----+----+----+----+
|col1|col2|col3|col4|  AX|  BZ|  CY|
+----+----+----+----+----+----+----+
|   A|   X|   6|null|   6|null|null|
|   B|   Z|null|   5|null|   5|null|
|   C|   Y|   4|null|null|null|   4|
+----+----+----+----+----+----+----+

You might have to play around with how the column-header strings are concatenated to get exactly the headers you want; one possibility is sketched below.
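
For instance, folding the source column's name into newCol produces headers like AXcol3 / BZcol4 / CYcol3 from the question (a sketch building on the same pipeline; df4 is a placeholder name):

import org.apache.spark.sql.functions.{coalesce, concat, lit, max, when}

// Append "col3" or "col4" to the concatenated key, depending on which is non-null.
val df4 = df2
  .withColumn("newCol", concat($"col1", $"col2",
    when($"col3".isNotNull, lit("col3")).otherwise(lit("col4"))))
  .withColumn("value", coalesce($"col3", $"col4"))
  .groupBy($"col1", $"col2", $"col3", $"col4", $"newCol")
  .pivot("newCol")
  .agg(max($"value"))
  .orderBy($"newCol")
  .drop($"newCol")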


Okay, I have a workaround to achieve what I want. I do the following:

(1) I generate a new column containing a tuple (newColumnName, rowValue), following this advice: Derive multiple columns from a single column in a Spark DataFrame

case class ToTuple(newColumnName: String, rowValue: String)

def createTuple(input1: String, input2: String): ToTuple = {
  // do something fancy here
  val column: String = input1 + input2
  val value: String = input1
  ToTuple(column, value)
}

val udfCreateTuple = udf(createTuple _)

(2) Apply the function to the DataFrame

val dfNew = df.select($"*", udfCreateTuple($"col1", $"col2").alias("tmpCol"))
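
Because the UDF returns a case class, tmpCol comes back as a struct column whose fields can be read with dot syntax, e.g. (a small sketch):

// Peek at the struct's fields
dfNew.select($"tmpCol.newColumnName", $"tmpCol.rowValue").show()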

(3) Build a DataFrame with the distinct values of newColumnName

val dfDistinct = dfNew.select($"tmpCol.newColumnName").distinct

(4) Collect those distinct values into an array

val arrDistinct = dfDistinct
  .select($"newColumnName")
  .rdd.map(r => r.getString(0))
  .collect()
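
Steps (3) and (4) could also be combined into a single pass with the Dataset API, avoiding the RDD detour (a sketch, assuming spark.implicits._ is in scope; arrDistinct2 is a placeholder name):

// distinct, then decode each single-column row straight to String
val arrDistinct2: Array[String] = dfNew
  .select($"tmpCol.newColumnName")
  .distinct()
  .as[String]
  .collect()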

(5) Create a key-value mapping

val seqMapping: Seq[(String, String)] = arrDistinct.toSeq.map(i => (i, i))

(6) Apply the mapping to the original DataFrame, cf. Mapping a value into a specific column based on another column

val exprsDistinct = seqMapping.map { case (key, target) => 
  when($"tmpCol.newColumnName" === key, $"tmpCol.rowValue").alias(target) }

val dfFinal = dfNew.select($"*" +: exprsDistinct: _*)
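
One small follow-up: dfFinal still carries the intermediate tmpCol struct. If you no longer need it, it can be dropped (dfClean is a placeholder name):

// The intermediate struct column has served its purpose at this point.
val dfClean = dfFinal.drop($"tmpCol")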

Well, it is a bit cumbersome, but this way I can derive a set of new columns without knowing in advance how many there are, and at the same time transfer each value into its new column.
