48

I'm trying to transpose some columns of my table to rows. I'm using Python and Spark 1.5.0. Here is my initial table:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-----+-----+-------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

I would like to have something like this:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

Does someone know how I can do it? Thank you for your help.

zero323
Raouf
  • See also [unpivot in spark-sql/pyspark](https://stackoverflow.com/q/42465568/9613318) and [How to melt Spark DataFrame?](https://stackoverflow.com/q/41670103/9613318) – Alper t. Turker May 11 '18 at 18:58

9 Answers

74

Spark >= 3.4

You can use the built-in melt method. With Python:

df.melt(
    ids=["A"], values=["col_1", "col_2"],
    variableColumnName="key", valueColumnName="val"
)

With Scala:

df.melt(Array($"A"), Array($"col_1", $"col_2"), "key", "val")

Spark < 3.4

It is relatively simple to do with basic Spark SQL functions.

Python:

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
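
For the two-row example df defined above, the result should look something like:

to_long(df, ["A"]).show()

+---+-----+---+
|  A|  key|val|
+---+-----+---+
|  1|col_1|0.0|
|  1|col_2|0.6|
|  1|col_1|0.6|
|  1|col_2|0.7|
+---+-----+---+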
   

Scala:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")      

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))
  
  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))
10465355
zero323
13

One way to solve this with PySpark SQL is to use the functions create_map and explode.

from pyspark.sql import functions as func
#Use `create_map` to create a map from literal column names to column values
df = df.withColumn('mapCol', \
                    func.create_map(func.lit('col_1'),df.col_1,
                                    func.lit('col_2'),df.col_2,
                                    func.lit('col_3'),df.col_3
                                   ) 
                  )
#Use explode function to explode the map 
res = df.select('*',func.explode(df.mapCol).alias('col_id','col_value'))
res.show()
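
A comment below asks how to do this for all columns without hardcoding the names. Here is a hedged sketch, assuming `A` (from the question) is the only id column; note that `create_map` wants a flat, alternating list of key and value expressions, which is why wrapping each pair in a struct fails:

from itertools import chain
from pyspark.sql import functions as func

# build [lit(name1), col(name1), lit(name2), col(name2), ...] for every column except the id column 'A'
kv_pairs = list(chain.from_iterable(
    (func.lit(c), func.col(c)) for c in df.columns if c != 'A'
))
res = df.withColumn('mapCol', func.create_map(*kv_pairs))
res = res.select('A', func.explode('mapCol').alias('col_id', 'col_value'))
res.show()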
Vamsi Prabhala
  • Nice answer, Could you please explain a bit? – Code run Mar 19 '21 at 08:26
  • Can you let me know how to convert this to all the columns without hardcoding column names .. I tried this but getting error ``` df = df.withColumn( 'mapCol', F.create_map( *[F.struct(F.lit(x), F.col(x)) for x in df.columns] ) ) Error: pyspark.sql.utils.AnalysisException: cannot resolve 'map(struct('col_name', 'col_name'))' due to data type mismatch: map expects a positive even number of arguments.; ``` – Sunil Mar 11 '22 at 13:51
7

The Spark local linear algebra libraries are presently very weak, and they do not include basic operations such as the above.

There is a JIRA for fixing this for Spark 2.1 - but that will not help you today.

Something to consider: performing a transpose will likely require completely shuffling the data.

For now you will need to write RDD code directly. I have written transpose in Scala - but not in Python. Here is the Scala version:

def transpose(mat: DMatrix) = {
  val nCols = mat(0).length
  val matT = mat
    .flatten
    .zipWithIndex
    .groupBy { _._2 % nCols }
    .toSeq
    .sortBy { _._1 }
    .map(_._2)
    .map(_.map(_._1))
    .toArray
  matT
}

So you can convert that to Python for your use. I do not have the bandwidth to write/test that at this particular moment: let me know if you were unable to do that conversion.

At the least - the following are readily converted to Python.

  • zipWithIndex --> enumerate() (Python equivalent - credit to @zero323)
  • map --> [someOperation(x) for x in ..]
  • groupBy --> itertools.groupby()

Here is an implementation for flatten, which does not have a direct Python equivalent:

def flatten(L):
    for item in L:
        try:
            for i in flatten(item):
                yield i
        except TypeError:
            yield item

So you should be able to put those together for a solution.
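
Putting those pieces together, here is a hedged sketch of that Python conversion for a plain list-of-rows matrix (the local-matrix case, not an RDD). It uses a comprehension instead of the recursive flatten above, since the input is assumed to be rectangular and numeric:

from itertools import groupby

def transpose(mat):
    n_cols = len(mat[0])
    # flatten + zipWithIndex: (index, value) pairs over the row-major data
    indexed = list(enumerate(v for row in mat for v in row))
    # groupBy index % n_cols; itertools.groupby needs its input sorted by that key
    indexed.sort(key=lambda p: p[0] % n_cols)
    groups = groupby(indexed, key=lambda p: p[0] % n_cols)
    # keep only the values, one list per original column
    return [[v for _, v in grp] for _, grp in groups]

# transpose([[1, 2, 3], [4, 5, 6]]) -> [[1, 4], [2, 5], [3, 6]]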

WestCoastProjects
  • Thank you for your answer. I don't know scala but I will try to understand your code. I will keep you informed. – Raouf Jun 16 '16 at 17:23
  • @Raouf The code above all has equivalents in python. If you know python well there should not be an issue. I showed the `flatten` which is the only one missing from python. Let me know ;) – WestCoastProjects Jun 16 '16 at 17:30
  • 1
    `zipWithIndex` --> `enumerate()` (Python equivalent)? – zero323 Jun 16 '16 at 18:44
  • @zero323 Good eyes! I am going to upvote your v nice answer btw. – WestCoastProjects Jun 16 '16 at 18:51
  • Thanks. It is slightly more verbose but doesn't move data so much. – zero323 Jun 16 '16 at 18:56
  • "does not move data as much". V likely true. My code is a series of shuffle-inducing transformations. Will look more closely at your approach from the "retain the present partitioning scheme" perspective. Update: yes you have fewer xform steps - so faster as well as likely easier to understand. – WestCoastProjects Jun 16 '16 at 19:57
7

You could use the stack function:

For example:

df.selectExpr("stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)")

where:

  • 2 is the number of columns to stack (col_1 and col_2)
  • 'col_1' is a string for the key
  • col_1 is the column from which to take the values

If you have several columns, you could build the whole stack string by iterating over the column names and pass that to selectExpr, as sketched below.
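
A hedged sketch of that, assuming `df` is the question's table and `A` is the column to keep:

value_cols = [c for c in df.columns if c != "A"]   # "A" is the id column from the question
stack_expr = "stack({}, {}) as (key, value)".format(
    len(value_cols),
    ", ".join("'{0}', {0}".format(c) for c in value_cols)
)
long_df = df.selectExpr("A", stack_expr)

If the value columns mix types (as in the comment below about decimals and strings), replacing the bare column reference with cast({0} as string) in the format string keeps the stacked values homogeneous.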

Gonza Piotti
  • 2
    df.selectExpr('column_names_to_keep', 'column_names_to_keep', "stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)") – Decula Jan 11 '21 at 01:51
  • 1
    I am using this function, but running into columns with different data types. I.e. some are string, and some decimal. How do I convert decimal type to string using stack? – Whitey Winn Aug 21 '21 at 17:00
2

Use flatMap. Something like the below should work:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
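
On older Spark versions, indexing a Row by field name can raise the TypeError reported in the comment below; a hedged variant that takes the value from the already-built dict instead:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        # read the value from the dict rather than indexing the Row by name
        yield Row(**{'A': valA, 'colID': k, 'colValue': rowDict[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))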
David
  • Thank you for your answer. But it does not work. Here is the error message I get: **TypeError: tuple indices must be integers, not str** – Raouf Jun 16 '16 at 17:19
1

I took the Scala answer that @javadba wrote and created a Python version for transposing all columns in a DataFrame. This might be a bit different from what OP was asking...

from itertools import chain
from pyspark.sql import DataFrame


def _sort_transpose_tuple(tup):
    x, y = tup
    return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0]


def transpose(X):
    """Transpose a PySpark DataFrame.

    Parameters
    ----------
    X : PySpark ``DataFrame``
        The ``DataFrame`` that should be transposed.
    """
    # validate
    if not isinstance(X, DataFrame):
        raise TypeError('X should be a DataFrame, not a %s' 
                        % type(X))

    cols = X.columns
    n_features = len(cols)

    # Sorry for this unreadability...
    return X.rdd.flatMap( # make into an RDD
        lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index
        lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key
        lambda grp_res: grp_res[0]).map( # sort by index % n_features key
        lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order
        lambda key_col: key_col[1]).toDF() # return to DF

For example:

>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF()
>>> X.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+

>>> transpose(X).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  4|  7|
|  2|  5|  8|
|  3|  6|  9|
+---+---+---+
TayTay
1

A very handy way to implement it:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID' : k, 'colValue' : row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
Martin Evans
0

To transpose a DataFrame in PySpark, I use pivot over a temporarily created column, which I drop at the end of the operation.

Say we have a table like this. What we want to do is to sum users_count over each listed_days_bin value.

+------------------+-------------+
|  listed_days_bin | users_count | 
+------------------+-------------+
|1                 |            5| 
|0                 |            2|
|0                 |            1| 
|1                 |            3|  
|1                 |            4| 
|2                 |            5| 
|2                 |            7|  
|2                 |            2|  
|1                 |            1|
+------------------+-------------+

Create a new temp column 'pvt_value', aggregate over it, and pivot the results:

import pyspark.sql.functions as F

agg_df = df.withColumn('pvt_value', F.lit(1))\
    .groupby('pvt_value')\
    .pivot('listed_days_bin')\
    .agg(F.sum('users_count')).drop('pvt_value')

The new DataFrame should look like:

+----+---+---+
|  0 | 1 | 2 | # Columns 
+----+---+---+
|   3| 13| 14| # Users over the bin
+----+---+---+
Artem Zaika
0

I found transposing in PySpark to be too complicated, so I just convert my DataFrame to pandas, use the transpose() method, and convert the DataFrame back to PySpark if required.

dfOutput = spark.createDataFrame(dfPySpark.toPandas().transpose())
dfOutput.display()
Syed