I have a DataFrame A like this:
+----+------+------+------+
| key| val1 | val2 | date |
+----+------+------+------+
|k1 |v4 |v7 |d1 |
|k1 |v5 |v8 |d2 |
|k1 |v6 |v9 |d3 |
|k2 |v12 |v22 |d1 |
|k2 |v32 |v42 |d2 |
|k2 |v11 |v21 |d3 |
+----+------+------+------+
I want to create a new DataFrame B like this:
+----+------+------+------+------+------+------+
| key| d1v1 |d1v2 |d2v1 | d2v2 |d3v1 |d3v2 |
+----+------+------+------+------+------+------+
|k1 |v4 |v7 |v5 |v8 |v6 |v9 |
|k2 |v12 |v22 |v32 |v42 |v11 |v21 |
+----+------+------+------+------+------+------+
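Conceptually, B is A pivoted on `date`: for each key, the column `<date>v<n>` holds `val<n>` from the row with that date. Below is a plain-Python illustration of that mapping (no Spark, for illustration only; `to_wide` is a hypothetical helper), assuming exactly one row per (key, date):

```python
from collections import defaultdict


def to_wide(rows):
    """Reshape long rows (key, val1, val2, date) into one dict per key.

    Each (date, val_n) pair becomes a column named f"{date}v{n}", matching
    the d1v1, d1v2, ... headers of B. Assumes one row per (key, date).
    """
    wide = defaultdict(dict)
    for key, val1, val2, date in rows:
        wide[key][f"{date}v1"] = val1
        wide[key][f"{date}v2"] = val2
    return dict(wide)


A_rows = [
    ("k1", "v4", "v7", "d1"),
    ("k1", "v5", "v8", "d2"),
    ("k1", "v6", "v9", "d3"),
    ("k2", "v12", "v22", "d1"),
    ("k2", "v32", "v42", "d2"),
    ("k2", "v11", "v21", "d3"),
]
B = to_wide(A_rows)
print(B["k1"])
# {'d1v1': 'v4', 'd1v2': 'v7', 'd2v1': 'v5', 'd2v2': 'v8', 'd3v1': 'v6', 'd3v2': 'v9'}
```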
My solution is:
from pyspark.sql import functions as F

# Start from the distinct keys, then left-join one renamed column per (date, value) pair.
B = A.select('key').distinct()
tmp_d1v1 = A.select('key', 'val1').where(F.col('date') == 'd1')
tmp_d1v1 = tmp_d1v1.withColumnRenamed('val1', 'd1v1')
B = B.join(tmp_d1v1, 'key', 'left_outer')
tmp_d1v2 = A.select('key', 'val2').where(F.col('date') == 'd1')
tmp_d1v2 = tmp_d1v2.withColumnRenamed('val2', 'd1v2')
B = B.join(tmp_d1v2, 'key', 'left_outer')
tmp_d2v1 = A.select('key', 'val1').where(F.col('date') == 'd2')
tmp_d2v1 = tmp_d2v1.withColumnRenamed('val1', 'd2v1')
B = B.join(tmp_d2v1, 'key', 'left_outer')
tmp_d2v2 = A.select('key', 'val2').where(F.col('date') == 'd2')
tmp_d2v2 = tmp_d2v2.withColumnRenamed('val2', 'd2v2')
B = B.join(tmp_d2v2, 'key', 'left_outer')
tmp_d3v1 = A.select('key', 'val1').where(F.col('date') == 'd3')
tmp_d3v1 = tmp_d3v1.withColumnRenamed('val1', 'd3v1')
B = B.join(tmp_d3v1, 'key', 'left_outer')
tmp_d3v2 = A.select('key', 'val2').where(F.col('date') == 'd3')
tmp_d3v2 = tmp_d3v2.withColumnRenamed('val2', 'd3v2')
B = B.join(tmp_d3v2, 'key', 'left_outer')
This runs out of memory (OOM). Is there an alternative that avoids the joins entirely, or at least reduces how many are needed?