
I have a dataframe that looks like:

group, rate
A,0.1
A,0.2
B,0.3
B,0.1
C,0.1
C,0.2

How can I transpose this into a wide data frame? This is what I expect to get:

group, rate_1, rate_2
A,0.1,0.2
B,0.3,0.1
C,0.1,0.2

The number of records in each group is the same. Also, how can I create consistent column names with a prefix or suffix while transposing?

Do you know which function I can use?

Thanks,

Yi Du

2 Answers


Try groupBy with collect_list, then dynamically split the resulting array column into new columns.

Example:

df.show()
#+-----+----+
#|group|rate|
#+-----+----+
#|    A| 0.1|
#|    A| 0.2|
#|    B| 0.3|
#|    B| 0.1|
#+-----+----+

from pyspark.sql.functions import expr, collect_list, col

arr_size = 2
exprs = ['group'] + [expr('lst[' + str(x) + ']').alias('rate_' + str(x + 1)) for x in range(arr_size)]

df1 = df.groupBy("group").agg(collect_list(col("rate")).alias("lst"))
df1.select(*exprs).show()
#+-----+------+------+
#|group|rate_1|rate_2|
#+-----+------+------+
#|    B|   0.3|   0.1|
#|    A|   0.1|   0.2|
#+-----+------+------+
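
The snippet above hard-codes arr_size = 2. If the group size isn't known up front, one way to derive it from the data first (a sketch, not part of the original answer) would be:

# Sketch: derive arr_size from the data instead of hard-coding it
# (assumes, as stated in the question, every group has the same number of rows).
from pyspark.sql.functions import count, max as max_

arr_size = df.groupBy("group") \
    .agg(count("rate").alias("n")) \
    .agg(max_("n")) \
    .collect()[0][0]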

To preserve order in collect_list():

from pyspark.sql.functions import *
from pyspark.sql import *

df = spark.createDataFrame([('A', 0.1), ('A', 0.2), ('B', 0.3), ('B', 0.1)], ['group', 'rate']) \
    .withColumn("mid", monotonically_increasing_id()) \
    .repartition(100)

w = Window.partitionBy("group").orderBy("mid")
w1 = Window.partitionBy("group").orderBy(desc("mid"))

df1 = df.withColumn("lst", collect_list(col("rate")).over(w)) \
    .withColumn("snr", row_number().over(w1)) \
    .filter(col("snr") == 1) \
    .drop('mid', 'snr', 'rate')

df1.show()
#+-----+----------+
#|group|       lst|
#+-----+----------+
#|    B|[0.3, 0.1]|
#|    A|[0.1, 0.2]|
#+-----+----------+
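
As an alternative to the window trick, on Spark 2.4 or later the same ordered lst column could be built by collecting (mid, rate) structs and sorting them. This is only a sketch under that version assumption, not part of the original answer:

# Sketch (Spark 2.4+): collect structs keyed by mid, sort them, then keep only rate.
from pyspark.sql.functions import collect_list, struct, sort_array, expr

df1 = (df.groupBy("group")
         .agg(sort_array(collect_list(struct("mid", "rate"))).alias("s"))
         .withColumn("lst", expr("transform(s, x -> x.rate)"))
         .drop("s"))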

arr_size = 2
exprs = ['group'] + [expr('lst[' + str(x) + ']').alias('rate_' + str(x + 1)) for x in range(arr_size)]

df1.select(*exprs).show()
#+-----+------+------+
#|group|rate_1|rate_2|
#+-----+------+------+
#|    B|   0.3|   0.1|
#|    A|   0.1|   0.2|
#+-----+------+------+
– notNull

I would create a column that ranks your "rate" values and then pivot.

First, create a "rank" column by concatenating the string "rate_" with the row_number:

from pyspark.sql.functions import concat, first, lit, row_number
from pyspark.sql import Window

df = df.withColumn(
    "rank", 
    concat(
        lit("rate_"),
        row_number().over(Window.partitionBy("group")\
            .orderBy("rate")).cast("string")
    )
)
df.show()
#+-----+----+------+
#|group|rate|  rank|
#+-----+----+------+
#|    B| 0.1|rate_1|
#|    B| 0.3|rate_2|
#|    C| 0.1|rate_1|
#|    C| 0.2|rate_2|
#|    A| 0.1|rate_1|
#|    A| 0.2|rate_2|
#+-----+----+------+

Now group by the "group" column and pivot on the "rank" column. Since you need an aggregation, use first.

df.groupBy("group").pivot("rank").agg(first("rate")).show()
#+-----+------+------+
#|group|rate_1|rate_2|
#+-----+------+------+
#|    B|   0.1|   0.3|
#|    C|   0.1|   0.2|
#|    A|   0.1|   0.2|
#+-----+------+------+

The above does not depend on knowing the number of records in each group ahead of time.

However, if (as you said) you know the number of records in each group, you can make the pivot more efficient by passing in the values:

num_records = 2
values = ["rate_" + str(i+1) for i in range(num_records)]
df.groupBy("group").pivot("rank", values=values).agg(first("rate")).show()
#+-----+------+------+
#|group|rate_1|rate_2|
#+-----+------+------+
#|    B|   0.1|   0.3|
#|    C|   0.1|   0.2|
#|    A|   0.1|   0.2|
#+-----+------+------+
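
If the number of records per group isn't known ahead of time but you still want to pass explicit values, one option (again a sketch, not from the original answer) is to compute them from the "rank" column at the cost of an extra job:

# Sketch: compute the pivot values from the data instead of hard-coding num_records.
from pyspark.sql.functions import countDistinct

num_records = df.select(countDistinct("rank")).collect()[0][0]
values = ["rate_" + str(i + 1) for i in range(num_records)]
df.groupBy("group").pivot("rank", values=values).agg(first("rate")).show()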
– pault
  • I will add that this method will be deterministic in that the rate columns will be ordered, whereas using `collect_list` does not guarantee the same order every time. – pault Aug 19 '20 at 16:52