I am new to Spark and want to pivot a PySpark DataFrame on multiple columns. There is exactly one row for each distinct (date, rank) combination, and I want to flatten the rows so that there is one row per unique date.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

data = [
    (datetime(2021, 8, 4, 13, 0), 1, 22, "a"), (datetime(2021, 8, 4, 13, 0), 2, 14, "a"),
    (datetime(2021, 8, 4, 13, 0), 3, 9, "a"), (datetime(2021, 8, 4, 13, 0), 4, 7, "a"),
    (datetime(2021, 8, 4, 14, 0), 1, 16, "b"), (datetime(2021, 8, 4, 14, 0), 2, 21, "b"),
    (datetime(2021, 8, 4, 14, 0), 3, 17, "b"), (datetime(2021, 8, 4, 14, 0), 4, 18, "b"),
    (datetime(2021, 8, 4, 15, 0), 1, 19, "a"), (datetime(2021, 8, 4, 15, 0), 2, 9, "b"),
    (datetime(2021, 8, 4, 15, 0), 3, 10, "c"), (datetime(2021, 8, 4, 15, 0), 4, 13, "d"),
]
columns = ["date", "rank", "feat1", "feat2"]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)
+-------------------+----+-----+-----+
|date |rank|feat1|feat2|
+-------------------+----+-----+-----+
|2021-08-04 13:00:00|1 |22 |a |
|2021-08-04 13:00:00|2 |14 |a |
|2021-08-04 13:00:00|3 |9 |a |
|2021-08-04 13:00:00|4 |7 |a |
|2021-08-04 14:00:00|1 |16 |b |
|2021-08-04 14:00:00|2 |21 |b |
|2021-08-04 14:00:00|3 |17 |b |
|2021-08-04 14:00:00|4 |18 |b |
|2021-08-04 15:00:00|1 |19 |a |
|2021-08-04 15:00:00|2 |9 |b |
|2021-08-04 15:00:00|3 |10 |c |
|2021-08-04 15:00:00|4 |13 |d |
+-------------------+----+-----+-----+
The real data has 30+ feature columns, and the ranks run from 1 to 100 for each date, so the flattened output will have roughly 3,000 columns. The desired output:
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
| date|rank1_feat1|rank2_feat1|rank3_feat1|rank4_feat1|rank1_feat2|rank2_feat2|rank3_feat2|rank4_feat2|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|2021-08-04 15:00:00| 19| 9| 10| 13| a| b| c| d|
|2021-08-04 13:00:00| 22| 14| 9| 7| a| a| a| a|
|2021-08-04 14:00:00| 16| 21| 17| 18| b| b| b| b|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
I have a solution that appears to work for this trivial example, but its memory usage is so extreme that I can't run it on even 1/500th of my data without hitting memory errors.
dfspine = df.select("date").distinct()

for col in df.columns:
    if col not in ["date", "rank"]:
        # Pivot this one feature column: each rank value becomes a column "1".."4"
        piv = df.groupby("date").pivot("rank").agg(F.first(col))
        # Rename the pivoted columns from "<rank>" to "rank<rank>_<feature>"
        mapping = {pivcol: f"rank{pivcol}_{col}" for pivcol in piv.columns if pivcol != "date"}
        piv = piv.select([F.col(c).alias(mapping.get(c, c)) for c in piv.columns])
        # Join this feature's pivoted columns back onto the spine of distinct dates
        dfspine = dfspine.join(piv, how="left", on="date")
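I assume the problem is that each pass through the loop triggers its own pivot (a full shuffle) and then another join, so the plan keeps growing with every feature column. Would a single pivot with one aggregation per feature column do the same work in one pass? A sketch of what I have in mind (passing the rank values to pivot explicitly is my assumption, to spare Spark the extra scan it otherwise does to collect the distinct pivot values):

ranks = list(range(1, 5))  # 1..100 on the real data
feats = [c for c in df.columns if c not in ("date", "rank")]

# One pivot, one shuffle; Spark names the output columns "<rank>_<feat>"
piv = df.groupby("date").pivot("rank", ranks).agg(
    *[F.first(c).alias(c) for c in feats]
)

# Rename "<rank>_<feat>" to "rank<rank>_<feat>" to match the desired output
result = piv.select(
    "date",
    *[F.col(f"{r}_{c}").alias(f"rank{r}_{c}") for c in feats for r in ranks],
)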
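Alternatively, since the rank values are known up front, maybe the pivot can be skipped entirely in favour of one conditional aggregation per (rank, feature) pair; another sketch, with ranks and feats as above:

# Single groupBy: each output column takes the feature value of exactly one rank
result = df.groupby("date").agg(
    *[F.first(F.when(F.col("rank") == r, F.col(c)), ignorenulls=True).alias(f"rank{r}_{c}")
      for c in feats for r in ranks]
)

Is either of these the right way to avoid the per-column join loop, or is there a better approach?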