65

Is there an equivalent of the Pandas melt function in Apache Spark, in PySpark or at least in Scala?

Until now I have been running a sample dataset in Python, and now I want to use Spark for the entire dataset.

ZygD
  • 2
    See also [unpivot in spark-sql/pyspark](https://stackoverflow.com/q/42465568/9613318) and [Transpose column to row with Spark](https://stackoverflow.com/q/37864222/9613318) – Alper t. Turker May 11 '18 at 18:56

6 Answers

116

Spark >= 3.4

In Spark 3.4 or later you can use the built-in `melt` method:

(sdf
    .melt(
        ids=['A'], values=['B', 'C'], 
        variableColumnName="variable", 
        valueColumnName="value")
    .show())
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

This method is available across all APIs, so it can also be used in Scala

sdf.melt(Array($"A"), Array($"B", $"C"), "variable", "value")

or SQL

SELECT * FROM sdf UNPIVOT (val FOR col in (col_1, col_2))
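
For instance, against the sample frame above, a sketch that assumes `sdf` has been registered as a temporary view (the view name here is mine):

# UNPIVOT needs the DataFrame exposed as a SQL relation;
# val / col are just the chosen output column names.
sdf.createOrReplaceTempView("sdf")
spark.sql("SELECT * FROM sdf UNPIVOT (val FOR col IN (B, C))").show()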

Spark 3.2 (Python only, requires Pandas and pyarrow)

(sdf
    .to_koalas()
    .melt(id_vars=['A'], value_vars=['B', 'C'])
    .to_spark()
    .show())
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+
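
If you prefer to avoid the Koalas entry point, the same thing can be written through the pandas API on Spark merged into PySpark 3.2 (a sketch, assuming the same `sdf`):

# Equivalent sketch via the pandas API on Spark;
# to_pandas_on_spark() is another way into the same pandas-style melt.
(sdf
    .to_pandas_on_spark()
    .melt(id_vars=['A'], value_vars=['B', 'C'])
    .to_spark()
    .show())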

Spark < 3.2

There is no built-in function (if you work with SQL and Hive support enabled you can use the `stack` function, but it is not exposed in the DataFrame API and has no native implementation; a minimal `selectExpr` sketch is included at the end of this answer), but it is trivial to roll your own. Required imports:

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

Example implementation:

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

And some tests (based on Pandas doctests):

import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

Note: For use with legacy Python versions remove type annotations.
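
For completeness, the `stack` route mentioned at the top of this section can look like this, a minimal sketch against the same `sdf` (my illustration, separate from the `melt` helper above):

# stack(n, 'name1', col1, 'name2', col2, ...) emits one row per (name, value) pair;
# in these versions it is reachable only through SQL / selectExpr / expr.
(sdf
    .selectExpr("A", "stack(2, 'B', B, 'C', C) as (variable, value)")
    .show())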


10465355
zero323
  • 1
    Your code adds back ticks to the column names and then it fails on `withColumn` call. More ref available here(https://stackoverflow.com/questions/55781796/how-to-remove-automatically-added-back-ticks-while-using-explode-in-pyspark) – Aviral Srivastava Apr 21 '19 at 11:49
  • 1
    How is this benchmarked in comparison to the `stack` option? as in: `df.selectExpr('col1', 'stack(2, "col2", col2, "col3", col3) as (cols, values)')` – Ran Feldesh Jul 24 '19 at 17:00
  • 7
    This isn't a trivial answer. This is a genius one! – BICube Feb 06 '20 at 18:39
  • Amazing answer. I've used this function many times without any problem. – kindofhungry Dec 02 '21 at 19:51
  • This is truly amazing. `explode` works with columns containing lists but creating the array *"_vars_and_vals"* as a key-value pair array of structure and then using it in a withColumn statement within explode is a very interesting behavior. What @BICube said! – hussam Jan 27 '22 at 21:26
  • Spark 3.4 isn't release yet (https://is.spark.released.info/) as of writing this comment. Is there any official announcement that you can point me to? – Adarsh Trivedi Feb 16 '23 at 22:16
35

I came across this question in my search for an implementation of `melt` in Spark for Scala.

I'm posting my Scala port in case someone else stumbles upon this.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     * 
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in spark
     *
 *  @see reshape package in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
     *  
     *  @todo method overloading for simple calling
     *
     *  @param id_vars the columns to preserve
     *  @param value_vars the columns to melt
     *  @param var_name the name for the column holding the melted columns names
     *  @param value_name the name for the column holding the values of the melted columns
     *
     */

    def melt(
            id_vars: Seq[String], value_vars: Seq[String], 
            var_name: String = "variable", value_name: String = "value") : DataFrame = {

        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)

        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

        val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}

        return _tmp.select(cols: _*)

    }
}

Since I'm not that advanced in Scala, I'm sure there is room for improvement.

Any comments are welcome.

0m3r
Ahue
  • 1
    Your code is okay but I would advice replace `for-yield` constructions just to `map` functions, for-example: `{ for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}` is the same as `List(var_name, value_name).map(x => col("_vars_and_vals")(x).alias(x))` and `for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }` can be written so: `value_vars.map(c => struct(lit(c).alias(var_name), col(c).alias(value_name)))`. for-yield is more general thing in **scala** than for-comprehension in **python**. – Boris Azanov Nov 29 '20 at 10:55
8

I voted for user6910411's answer. It works as expected; however, it cannot handle None values well, so I refactored his melt function to the following:

from pyspark.sql.functions import col, create_map, explode, lit
from pyspark.sql import DataFrame
from typing import Iterable 
from itertools import chain

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create map<key: value>
    _vars_and_vals = create_map(
        list(chain.from_iterable([
            [lit(c), col(c)] for c in value_vars]
        ))
    )

    _tmp = df.select(*id_vars, explode(_vars_and_vals)) \
        .withColumnRenamed('key', var_name) \
        .withColumnRenamed('value', value_name)

    return _tmp

Testing with the following dataframe:

import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6},
                   'D': {1: 7, 2: 9}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C', 'D'])

   A variable  value
0  a        B    1.0
1  b        B    3.0
2  c        B    5.0
3  a        C    2.0
4  b        C    4.0
5  c        C    6.0
6  a        D    NaN
7  b        D    7.0
8  c        D    9.0

sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C', 'D']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|  1.0|
|  a|       C|  2.0|
|  a|       D|  NaN|
|  b|       B|  3.0|
|  b|       C|  4.0|
|  b|       D|  7.0|
|  c|       B|  5.0|
|  c|       C|  6.0|
|  c|       D|  9.0|
+---+--------+-----+
Wei Li
  • How would this work if I am trying to pass in a list i.e 'someColumns' for the value_vars? I'm getting an 'Unsupported literal type class' error. – Budyn Aug 09 '19 at 13:51
  • It works for me perfectly, nice job! @Budyn: what exactly are you passing into the value_vars parameter? I pass a list of strings (of column names), like this: `df_long = melt(df_wide, id_vars=['id', 'date'], value_vars=['t1', 't2', 't3', 't4'])` – Melkor.cz Aug 22 '19 at 11:20
4

Update

Finally, I've found the most effective implementation for me. It uses all the resources of the cluster in my YARN configuration.

from pyspark.sql.functions import explode

def melt(df):
    # Melt every column after the first; the first column is kept as the id
    sp = df.columns[1:]
    return (df
            .rdd
            # Build rows of [id, [(column_name, value), ...]], casting values to float
            .map(lambda x: [str(x[0]), [(str(i[0]),
                                         float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]],
                 preservesPartitioning = True)
            .toDF()
            # One output row per (column_name, value) pair
            .withColumn('_2', explode('_2'))
            # Flatten the struct into plain columns (_1, _2, _3)
            .rdd.map(lambda x: [str(x[0]),
                                str(x[1][0]),
                                float(x[1][1] if x[1][1] else 0)],
                     preservesPartitioning = True)
            .toDF()
            )
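
A minimal usage sketch (my illustration, with made-up data): the function treats the first column as the identifier, expects the remaining columns to be numeric, and returns the default column names `_1`, `_2`, `_3`:

wide = spark.createDataFrame([("a", 1, 2), ("b", 3, 4)], ["A", "B", "C"])

# Columns come back as _1 (id), _2 (former column name), _3 (value);
# row order may vary.
melt(wide).show()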

For a very wide dataframe I saw performance degrade during the `_vars_and_vals` generation in user6910411's answer.

It was useful to implement melting via `selectExpr`:

import pandas as pd

columns = ['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
df.show()
+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  4|  5|  6|  7|  9|  8|
|  7|  8|  9|  1|  2|  4|
|  8|  3|  9|  8|  7|  4|
+---+---+---+---+---+---+

cols = df.columns[1:]
df.selectExpr('a', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols)))).show()
+---+----+----+
|  a|col0|col1|
+---+----+----+
|  1|   b|   2|
|  1|   c|   3|
|  1|   d|   4|
|  1|   e|   5|
|  1|   f|   6|
|  4|   b|   5|
|  4|   c|   6|
|  4|   d|   7|
|  4|   e|   9|
|  4|   f|   8|
|  7|   b|   8|
|  7|   c|   9|
...
Anton Alekseev
  • I am having some type mismatch **cannot resolve.. due to data type mismatch: Argument 2 (DoubleType) != Argument 6 (LongType); line 1 pos 0;** . Testing shows that it seem stack imply the type of your col1 based on the first few elements of col0 . When let's say values for d or f of col0 come in, type mismatch. How would you solve that ? I am trying _stack({}, {})".format(len(cols), ', '.join(("'{}', cast({} as bigint)"..._ which seems to work, but not sure if it's the correct and efficient way. I am having performance issue when stacking hundreds of columns so efficiency is important. – Kenny Apr 23 '19 at 19:44
  • @Kenny I've never met such problem in this case. But you solution sounds logical. Also you can try my solution from update. – Anton Alekseev Apr 24 '19 at 08:37
-1

Use a list comprehension to create an array of structs of column names and column values, then expand the new column into rows with `inline`. Code below:

from pyspark.sql import functions as F

melted_df = (df
             # Create an array of structs of column names and corresponding values
             .withColumn('tab', F.array(*[F.struct(F.lit(x).alias('var'), F.col(x).alias('val'))
                                          for x in df.columns if x != 'A']))
             # Expand the array of structs into rows
             .selectExpr('A', "inline(tab)"))

melted_df.show()

+---+---+---+
|  A|var|val|
+---+---+---+
|  a|  B|  1|
|  a|  C|  2|
|  b|  B|  3|
|  b|  C|  4|
|  c|  B|  5|
|  c|  C|  6|
+---+---+---+
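
For reference, `inline(tab)` does the same job as exploding the array of structs and selecting its fields, a sketch under the same assumptions (`df` with id column `A`, `F` imported as above):

# Equivalent without inline(): explode the array of structs, then pull the fields out.
(df
    .withColumn('tab', F.explode(F.array(*[F.struct(F.lit(x).alias('var'), F.col(x).alias('val'))
                                           for x in df.columns if x != 'A'])))
    .select('A', 'tab.var', 'tab.val')
    .show())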
wwnde
-1

1) Copy & paste
2) Change the first 2 variables

to_melt = {'latin', 'greek', 'chinese'}
new_names = ['lang', 'letter']

melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
)

Rows with null are created if some of the stacked values are null. To remove them, add this:

.filter(f"!{new_names[1]} is null")

Full test:

from pyspark.sql import functions as F
df = spark.createDataFrame([(101, "A", "Σ", "西"), (102, "B", "Ω", "诶")], ['ID', 'latin', 'greek', 'chinese'])
df.show()
# +---+-----+-----+-------+
# | ID|latin|greek|chinese|
# +---+-----+-----+-------+
# |101|    A|    Σ|     西|
# |102|    B|    Ω|     诶|
# +---+-----+-----+-------+

to_melt = {'latin', 'greek', 'chinese'}
new_names = ['lang', 'letter']

melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
)

df.show()
# +---+-------+------+
# | ID|   lang|letter|
# +---+-------+------+
# |101|  latin|     A|
# |101|  greek|     Σ|
# |101|chinese|    西|
# |102|  latin|     B|
# |102|  greek|     Ω|
# |102|chinese|    诶|
# +---+-------+------+
ZygD