
I have a problem statement at hand wherein I want to unpivot a table in Spark SQL / PySpark. I have gone through the documentation and could see there is support only for pivot, but no support for unpivot so far. Is there a way I can achieve this?

Let my initial table look like this:

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  G|  X|  4|
|  G|  Y|  2|
|  H|  Y|  4|
|  H|  Z|  5|
+---+---+---+

When I pivot this in PySpark:

df.groupBy("A").pivot("B").sum("C")

I get this as the output:

+---+----+---+----+
|  A|   X|  Y|   Z|
+---+----+---+----+
|  G|   4|  2|null|
|  H|null|  4|   5|
+---+----+---+----+

Now I want to unpivot the pivoted table. In general, this operation may or may not yield the original table, depending on how the original table was pivoted.
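
For instance (with made-up numbers), if the original table had two rows with the same A and B, the pivot's sum() collapses them into a single value, so no unpivot can recover the two original rows:

df2 = spark.createDataFrame([("G", "X", 1), ("G", "X", 3)], ["A", "B", "C"])
df2.groupBy("A").pivot("B").sum("C").show()
# +---+---+
# |  A|  X|
# +---+---+
# |  G|  4|
# +---+---+
# 1 and 3 were summed, so the best any unpivot can return is (G, X, 4)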

Spark SQL as of now doesn't provide out-of-the-box support for unpivot. Is there a way I can achieve this?

Manish Mehra
    Why not just group the original table by `A,B`? Do you always want rows for X,Y and Z even if there is no value for them (e.g. there is no Z for G)? – David דודו Markovitz Feb 26 '17 at 07:38
  • See also [Transpose column to row with Spark](https://stackoverflow.com/q/37864222/9613318) and [How to melt Spark DataFrame?](https://stackoverflow.com/q/41670103/9613318) – Alper t. Turker May 11 '18 at 18:58

2 Answers


You can use the built-in `stack` function, for example in Scala:

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = Seq(("G",Some(4),2,None),("H",None,4,Some(5))).toDF("A","X","Y", "Z")
df: org.apache.spark.sql.DataFrame = [A: string, X: int ... 2 more fields]

scala> df.show
+---+----+---+----+
|  A|   X|  Y|   Z|
+---+----+---+----+
|  G|   4|  2|null|
|  H|null|  4|   5|
+---+----+---+----+


scala> df.select($"A", expr("stack(3, 'X', X, 'Y', Y, 'Z', Z) as (B, C)")).where("C is not null").show
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  G|  X|  4|
|  G|  Y|  2|
|  H|  Y|  4|
|  H|  Z|  5|
+---+---+---+

Or in PySpark:

In [1]: df = spark.createDataFrame([("G",4,2,None),("H",None,4,5)],list("AXYZ"))

In [2]: df.show()
+---+----+---+----+
|  A|   X|  Y|   Z|
+---+----+---+----+
|  G|   4|  2|null|
|  H|null|  4|   5|
+---+----+---+----+

In [3]: df.selectExpr("A", "stack(3, 'X', X, 'Y', Y, 'Z', Z) as (B, C)").where("C is not null").show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  G|  X|  4|
|  G|  Y|  2|
|  H|  Y|  4|
|  H|  Z|  5|
+---+---+---+
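
For reference, the first argument to stack is the number of rows each input row is split into, and the remaining arguments are spread across those rows (here, in (label, value) pairs). The same unpivot can also be written in plain Spark SQL after registering a temp view (the view name wide below is just for illustration):

df.createOrReplaceTempView("wide")
spark.sql("""
    SELECT A, B, C
    FROM (SELECT A, stack(3, 'X', X, 'Y', Y, 'Z', Z) AS (B, C) FROM wide) t
    WHERE C IS NOT NULL
""").show()
# same four (A, B, C) rows as above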
Andrew Ray
  • I tried using the pyspark code given here, but its performance seems to be bad. Using a union all query to achieve pivot down gave me a better performance compared to this code. Are there any tweaks that we can do here to improve the performance? – Afaq May 24 '17 at 10:58
  • awesome. could you please send a link to stack() documentation? can't find it. – Tagar Jul 06 '17 at 22:09
  • It's the same usage as in Hive: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-stack(values) – Andrew Ray Aug 04 '17 at 19:54
  • Are you sure this is the result? Writing 'as (B,C)' does not work since it expects one column as the output. My result is a single column with all the values, i.e. the C column; the B column is missing. What am I doing wrong? – David H Jan 01 '18 at 19:32
  • This is interesting - I have been wondering about a function to unpivot dataframes and it's the first time I see something clean. For the people interested in benchmarking this, don't forget the solution consisting in `explode(array({struct(,)}*))`, sketched after these comments. – Wilmerton May 16 '18 at 12:38
  • @AndrewRay: I am getting a type mismatch: **cannot resolve.. due to data type mismatch: Argument 2 (DoubleType) != Argument 6 (LongType); line 1 pos 0;**. Testing shows that it seems _stack_ infers the type of your C column based on the first few elements of X and Y. When Z comes in, there is a type mismatch. How would you solve that? I am trying _cast({X} as bigint)_, which works, but I am not sure if it's the correct way. – Kenny Apr 23 '19 at 19:43
  • Is there a simpler way to do this? What if your df was considerably larger so delineating values was excessively tedious and error prone? –  Oct 25 '19 at 16:42
  • I am a bit of a newbie with spark in general and had a table with multiple columns representing counts for different path types. Example: |publicationid|smtp_count|email_count|sms_count| I wanted to decompose this into multiple rows and only if the count was greater than 1 and using your example came up with: df.selectExpr("publicationid", "stack(3, 'email', email_count, 'smtp', smtp_count, 'sms', sms_count) as (type, count)").where("count > 0") WORKED PERFECTLY! Thank you / thank you! – Starlton Aug 20 '20 at 09:24
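
A rough sketch of the explode(array(struct(...))) alternative mentioned in the comments, assuming the same A/X/Y/Z DataFrame (the column list and alias names here are illustrative):

from pyspark.sql import functions as F

value_cols = ["X", "Y", "Z"]
unpivoted = df.select(
    "A",
    F.explode(F.array(
        *[F.struct(F.lit(c).alias("B"), F.col(c).alias("C")) for c in value_cols]
    )).alias("BC")
).select("A", "BC.B", "BC.C").where("C is not null")
unpivoted.show()
# same four rows as the stack version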

Spark 3.4+

df = df.melt(['A'], ['X', 'Y', 'Z'], 'B', 'C')
#  OR
df = df.unpivot(['A'], ['X', 'Y', 'Z'], 'B', 'C')
+---+---+----+
|  A|  B|   C|
+---+---+----+
|  G|  Y|   2|
|  G|  Z|null|
|  G|  X|   4|
|  H|  Y|   4|
|  H|  Z|   5|
|  H|  X|null|
+---+---+----+

To filter out nulls: `df = df.filter("C is not null")`
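
If every non-id column should be unpivoted, the values argument can also be left as None (assuming all value columns share a compatible type):

df = df.unpivot(['A'], None, 'B', 'C')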


Spark 3.3 and below

to_melt = {'X', 'Y', 'Z'}
new_names = ['B', 'C']

melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
).filter(f"!{new_names[1]} is null")

Full test:

from pyspark.sql import functions as F
df = spark.createDataFrame([("G", 4, 2, None), ("H", None, 4, 5)], list("AXYZ"))

to_melt = {'X', 'Y', 'Z'}
new_names = ['B', 'C']

melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
).filter(f"!{new_names[1]} is null")

df.show()
# +---+---+---+
# |  A|  B|  C|
# +---+---+---+
# |  G|  Y|  2|
# |  G|  X|  4|
# |  H|  Y|  4|
# |  H|  Z|  5|
# +---+---+---+
ZygD