
I have one DataFrame, A, like:

    +----+------+------+------+
    | key| val1 | val2 | date |
    +----+------+------+------+
    |k1  |v4    |v7    |d1    |
    |k1  |v5    |v8    |d2    |
    |k1  |v6    |v9    |d3    |
    |k2  |v12   |v22   |d1    |
    |k2  |v32   |v42   |d2    |
    |k2  |v11   |v21   |d3    |
    +----+------+------+------+

I want to create a new DataFrame, B, like:

    +----+------+------+------+------+------+------+
    | key| d1v1 |d1v2  |d2v1  | d2v2 |d3v1  |d3v2  |
    +----+------+------+------+------+------+------+
    |k1  |v4    |v7    |v5    |v8    |v6    |v9    |
    |k2  |v12   |v22   |v32   |v42   |v11   |v21   |
    +----+------+------+------+------+------+------+

My solution is:

    import pyspark.sql.functions as F

    B = A.select('key').distinct()

    tmp_d1v1 = A.select('key', 'val1').where(F.col('date') == 'd1')
    tmp_d1v1 = tmp_d1v1.withColumnRenamed('val1', 'd1v1')
    B = B.join(tmp_d1v1, 'key', 'left_outer')
    tmp_d1v2 = A.select('key', 'val2').where(F.col('date') == 'd1')
    tmp_d1v2 = tmp_d1v2.withColumnRenamed('val2', 'd1v2')
    B = B.join(tmp_d1v2, 'key', 'left_outer')

    tmp_d2v1 = A.select('key', 'val1').where(F.col('date') == 'd2')
    tmp_d2v1 = tmp_d2v1.withColumnRenamed('val1', 'd2v1')
    B = B.join(tmp_d2v1, 'key', 'left_outer')
    tmp_d2v2 = A.select('key', 'val2').where(F.col('date') == 'd2')
    tmp_d2v2 = tmp_d2v2.withColumnRenamed('val2', 'd2v2')
    B = B.join(tmp_d2v2, 'key', 'left_outer')

    tmp_d3v1 = A.select('key', 'val1').where(F.col('date') == 'd3')
    tmp_d3v1 = tmp_d3v1.withColumnRenamed('val1', 'd3v1')
    B = B.join(tmp_d3v1, 'key', 'left_outer')
    tmp_d3v2 = A.select('key', 'val2').where(F.col('date') == 'd3')
    tmp_d3v2 = tmp_d3v2.withColumnRenamed('val2', 'd3v2')
    B = B.join(tmp_d3v2, 'key', 'left_outer')

This runs out of memory (OOM). I am looking for an alternative solution that avoids the join operations, or at least reduces their number.

Jacek Laskowski
Ivan Lee

1 Answer


Thanks everyone for the hints; I also referred to the other answer. I am posting my own answer and hope you find it useful.

    # first, define the DataFrame A:
    data = [('k1', 'v4', 'v7', 'd1'),
            ('k1', 'v5', 'v8', 'd2'),
            ('k1', 'v6', 'v9', 'd3'),
            ('k2', 'v12', 'v22', 'd1'),
            ('k2', 'v32', 'v42', 'd2'),
            ('k2', 'v11', 'v21', 'd3')]
    A = spark.createDataFrame(data, ['key', 'val1', 'val2', 'date'])

    # second, use pivot to build DataFrame B:
    from pyspark.sql.functions import first
    B = A.groupBy('key').pivot('date') \
         .agg(first('val1').alias('v1'), first('val2').alias('v2'))
The result is:

    A.show()
    +---+----+----+----+
    |key|val1|val2|date|
    +---+----+----+----+
    | k1|  v4|  v7|  d1|
    | k1|  v5|  v8|  d2|
    | k1|  v6|  v9|  d3|
    | k2| v12| v22|  d1|
    | k2| v32| v42|  d2|
    | k2| v11| v21|  d3|
    +---+----+----+----+

    B.show()
    +---+-----+-----+-----+-----+-----+-----+
    |key|d1_v1|d1_v2|d2_v1|d2_v2|d3_v1|d3_v2|
    +---+-----+-----+-----+-----+-----+-----+
    | k2|  v12|  v22|  v32|  v42|  v11|  v21|
    | k1|   v4|   v7|   v5|   v8|   v6|   v9|
    +---+-----+-----+-----+-----+-----+-----+