
I have a set of columns in my input data on which I am pivoting based on multiple columns.

I am facing issues with the column headers after the pivoting is done.

Input data

[screenshot: sample input data]

Output generated by my approach:

[screenshot: output generated by the approach below]

Expected Output Headers:

I need the headers of the output to look like -

[screenshot: expected output headers]

Steps taken so far to produce the output I am getting:

// *Load the data*

scala> val input_data =spark.read.option("header","true").option("inferschema","true").option("delimiter","\t").csv("s3://mybucket/data.tsv")

// *Filter the data where residentFlag column = T*

scala> val filtered_data = input_data.select("numericID","age","salary","gender","residentFlag").filter($"residentFlag".contains("T"))

// *Now pivot the filtered data by each column, starting with "age"*

scala> val pivotByAge = filtered_data.groupBy("age","numericID").pivot("age").agg(expr("coalesce(first(numericID),'-')")).drop("age")

// *Pivot the data by the second column named "salary"*

scala> val pivotBySalary = filtered_data.groupBy("salary","numericID").pivot("salary").agg(expr("coalesce(first(numericID),'-')")).drop("salary")

// *Join the above two dataframes based on the numericID*

scala> val intermediateDf = pivotByAge.join(pivotBySalary,"numericID")

// *Now pivot the filtered data from Step 2 on the third column, named "gender"*

scala> val pivotByGender = filtered_data.groupBy("gender","numericID").pivot("gender").agg(expr("coalesce(first(numericID),'-')")).drop("gender")

// *Join the above dataframe with the intermediateDf*

scala> val outputDF = pivotByGender.join(intermediateDf, "numericID")

How to rename the columns generated after pivoting?

Is there a different approach I can take for Pivoting the data set based on multiple columns (nearly 300)?

Any optimizations/suggestions for improving performance?

  • Is there a reason this is labeled with pyspark when you're using scala? – bendl Mar 14 '18 at 17:51
  • It's because someone might come across a similar problem while using PySpark. It is a Spark problem, not a language-specific issue. Also, there is a second part to the question regarding optimisation, so it is generic across all Spark execution environments. – CodeReaper Mar 14 '18 at 18:06
  • Okay, have you tried `df.withColumnRenamed`? – bendl Mar 14 '18 at 18:11
  • As you can see, the final output currently has about 10 columns, so withColumnRenamed would work here. However, it will not work for two reasons: 1. I don't want an additional step of renaming the columns manually by looking at the generated headers. 2. The actual input file will have 300 columns to pivot on, so it will not be feasible to use withColumnRenamed as I won't know the headers in advance. I'm looking for a way to take the name of the input column being pivoted and somehow append it to the headers generated from that column. – CodeReaper Mar 14 '18 at 18:13
  • [Related](https://stackoverflow.com/questions/33015635/spark-dataframe-and-renaming-multiple-columns-java) – bendl Mar 14 '18 at 18:30

2 Answers


You can consider using foldLeft to traverse the list of columns to pivot, successively creating a pivot dataframe for each, renaming the generated pivot columns, and cumulatively joining the results:

val data = Seq(
  (1, 30, 50000, "M"),
  (1, 25, 70000, "F"),
  (1, 40, 70000, "M"),
  (1, 30, 80000, "M"),
  (2, 30, 80000, "M"),
  (2, 40, 50000, "F"),
  (2, 25, 70000, "F")
).toDF("numericID", "age", "salary", "gender")

// Create list pivotCols containing the columns to pivot
val id = data.columns.head
val pivotCols = data.columns.filter(_ != "numericID")

// Create the first pivot dataframe from the first column in list pivotCols and
// rename each of the generated pivot columns
val c1 = pivotCols.head
val df1 = data.groupBy(c1, id).pivot(c1).agg(expr(s"coalesce(first($id),'-')")).drop(c1)
val df1Renamed = df1.columns.tail.foldLeft( df1 )( (acc, x) =>
      acc.withColumnRenamed(x, c1 + "_" + x)
    )

// Using the first pivot dataframe as the initial dataframe, process each of the
// remaining columns in list pivotCols similar to how the first column is processed,
// and cumulatively join each of them with the previously joined dataframe
pivotCols.tail.foldLeft( df1Renamed )(
  (accDF, c) => {
    val df = data.groupBy(c, id).pivot(c).agg(expr(s"coalesce(first($id),'-')")).drop(c)
    val dfRenamed = df.columns.tail.foldLeft( df )( (acc, x) =>
      acc.withColumnRenamed(x, c + "_" + x)
    )
    dfRenamed.join(accDF, Seq(id))
  }
)

// +---------+--------+--------+------------+------------+------------+------+------+------+
// |numericID|gender_F|gender_M|salary_50000|salary_70000|salary_80000|age_25|age_30|age_40|
// +---------+--------+--------+------------+------------+------------+------+------+------+
// |2        |2       |-       |2           |-           |-           |-     |2     |-     |
// |2        |2       |-       |2           |-           |-           |2     |-     |-     |
// |2        |2       |-       |2           |-           |-           |-     |-     |2     |
// |2        |2       |-       |-           |2           |-           |-     |2     |-     |
// |2        |2       |-       |-           |2           |-           |2     |-     |-     |
// |2        |2       |-       |-           |2           |-           |-     |-     |2     |
// |2        |2       |-       |-           |-           |2           |-     |2     |-     |
// |2        |2       |-       |-           |-           |2           |2     |-     |-     |
// |2        |2       |-       |-           |-           |2           |-     |-     |2     |
// |2        |-       |2       |2           |-           |-           |-     |2     |-     |
// |2        |-       |2       |2           |-           |-           |2     |-     |-     |
// |2        |-       |2       |2           |-           |-           |-     |-     |2     |
// |2        |-       |2       |-           |2           |-           |-     |2     |-     |
// |2        |-       |2       |-           |2           |-           |2     |-     |-     |
// |2        |-       |2       |-           |2           |-           |-     |-     |2     |
// |2        |-       |2       |-           |-           |2           |-     |2     |-     |
// |2        |-       |2       |-           |-           |2           |2     |-     |-     |
// |2        |-       |2       |-           |-           |2           |-     |-     |2     |
// |1        |-       |1       |-           |1           |-           |1     |-     |-     |
// |1        |-       |1       |-           |1           |-           |-     |-     |1     |
// ...
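The per-column renaming inside the fold is just string manipulation on the column list. As a plain-Scala sketch of what the inner foldLeft computes (the helper name `prefixPivotCols` is illustrative, not part of Spark):

```scala
// Prefix every generated pivot column (everything after the id column)
// with the name of the column that was pivoted, e.g. "25" -> "age_25".
def prefixPivotCols(cols: Seq[String], pivotCol: String): Seq[String] =
  cols.head +: cols.tail.map(c => s"${pivotCol}_$c")

val renamed = prefixPivotCols(Seq("numericID", "25", "30", "40"), "age")
// renamed == Seq("numericID", "age_25", "age_30", "age_40")
```

Given such a list, a single `df.toDF(renamed: _*)` would replace the repeated `withColumnRenamed` calls.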
  • Can you help in understanding what exactly are you doing here - pivotCols.tail.foldLeft( df1Renamed )( (accDF, c) => { val df = data.groupBy(c, id).pivot(c).agg(expr(s"coalesce(first($id),'-')")).drop(c) val dfRenamed = df.columns.tail.foldLeft( df )( (acc, x) => acc.withColumnRenamed(x, c + "_" + x) ) dfRenamed.join(accDF, Seq(id)) } ) Will this work for any number of columns? ( In the actual scenario I have about 300 columns) What happens if in this scenario itself we had 2 more columns ( for instance, Country and City)? – CodeReaper Mar 15 '18 at 09:24
  • Please see comments in the updated answer. The same code will handle any number of columns assembled in list `pivotCols` as long as the groupBy/pivot/agg structure remains the same. Keep in mind though cumulative join of pivot dataframes would grow exponentially in size. – Leo C Mar 15 '18 at 16:23

You could do something like this, using regex matching to simplify the renaming:

var outputDF = pivotByGender.join(intermediateDf, "numericID")

val cols: Array[String] = outputDF.columns

cols.foreach { cl =>
  cl match {
    case "F" | "M" =>
      outputDF = outputDF.withColumnRenamed(cl, s"gender_${cl}")
    case c if c.matches("""\d{2}""") =>
      outputDF = outputDF.withColumnRenamed(cl, s"age_${cl}")
    case c if c.matches("""\d{5}""") =>
      outputDF = outputDF.withColumnRenamed(cl, s"salary_${cl}")
    case _ => // leave other columns (e.g. numericID) unchanged
  }
}
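To avoid enumerating a case for every possible value, the matching could instead be driven by a pattern-to-prefix table (a sketch; the patterns below are assumptions about what the pivoted values look like in this data set):

```scala
// Map regex patterns (assumed shapes of the pivoted values) to prefixes.
val prefixFor: Seq[(String, String)] = Seq(
  """^[MF]$"""  -> "gender",
  """^\d{2}$""" -> "age",
  """^\d{5}$""" -> "salary"
)

// Return the renamed column, or the original name if no pattern matches
// (e.g. the numericID key column).
def renameCol(cl: String): String =
  prefixFor.collectFirst { case (p, pre) if cl.matches(p) => s"${pre}_$cl" }
    .getOrElse(cl)
```

With this, `outputDF.toDF(outputDF.columns.map(renameCol): _*)` renames every column in one pass.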
  • In this approach, I would need to write all the cases for all the possible outcomes from each pivoted column. – CodeReaper Mar 15 '18 at 09:29