
I'm attempting to do something very similar to this post, but I need to use PySpark dataframes, and I'm looking to create two columns based on different IDs.

Essentially, I am attempting to append two new columns to my original PySpark dataframe, each containing the mean value for its paired ID.

An example initial df and the output df can be found below:

[Image: example input and output]

  • Possible duplicate https://stackoverflow.com/questions/32670958/spark-dataframe-computing-row-wise-mean-or-any-aggregate-operation – Dipanjan Mallick Mar 19 '22 at 01:33
  • Question needs some code: Please provide enough code so others can better understand or reproduce the problem: https://stackoverflow.com/help/minimal-reproducible-example – D.L Mar 19 '22 at 02:05

1 Answer


To achieve this, you need to create two individual dataframes containing the aggregations and join them back to the original dataframe.

Essentially, I am attempting to append two new columns to my original PySpark dataframe, each containing the mean value for its paired ID.

Data Preparation


import pandas as pd
from pyspark.sql import SparkSession, functions as F

# 'sql' is assumed to be an active SparkSession
sql = SparkSession.builder.getOrCreate()

d = {
    'id1': [1] * 2 + [2] * 3,
    'id2': [2] * 2 + [1] * 3,
    'value': [i for i in range(1, 100, 20)]
}

df = pd.DataFrame(d)

sparkDF = sql.createDataFrame(df)

sparkDF.show()

+---+---+-----+
|id1|id2|value|
+---+---+-----+
|  1|  2|    1|
|  1|  2|   21|
|  2|  1|   41|
|  2|  1|   61|
|  2|  1|   81|
+---+---+-----+

Aggregation & Join


# Mean of 'value' for each id1 and each id2
sparkDF_agg_id1 = sparkDF.groupBy('id1').agg(F.mean(F.col('value')).alias('value_mean_id1'))
sparkDF_agg_id2 = sparkDF.groupBy('id2').agg(F.mean(F.col('value')).alias('value_mean_id2'))

# Join the per-id1 means back onto the original rows
finalDF = sparkDF.join(sparkDF_agg_id1,
                       sparkDF['id1'] == sparkDF_agg_id1['id1'],
                       'inner'
          ).select(sparkDF['*'], sparkDF_agg_id1['value_mean_id1'])

# Join the per-id2 means onto the result
finalDF = finalDF.join(sparkDF_agg_id2,
                       finalDF['id2'] == sparkDF_agg_id2['id2'],
                       'inner'
          ).select(finalDF['*'], sparkDF_agg_id2['value_mean_id2'])

finalDF.show()

+---+---+-----+--------------+--------------+
|id1|id2|value|value_mean_id1|value_mean_id2|
+---+---+-----+--------------+--------------+
|  2|  1|   41|          61.0|          61.0|
|  2|  1|   61|          61.0|          61.0|
|  2|  1|   81|          61.0|          61.0|
|  1|  2|    1|          11.0|          11.0|
|  1|  2|   21|          11.0|          11.0|
+---+---+-----+--------------+--------------+
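As an aside, the explicit joins can also be avoided with window functions. A minimal sketch, assuming the same sparkDF and the F import from above (finalDF_w is just an illustrative name):

from pyspark.sql import Window

# Partition by each id column so the mean is computed per group
w1 = Window.partitionBy('id1')
w2 = Window.partitionBy('id2')

finalDF_w = (sparkDF
             .withColumn('value_mean_id1', F.mean('value').over(w1))
             .withColumn('value_mean_id2', F.mean('value').over(w2)))

finalDF_w.show()

This keeps the original row count by construction, whereas the join approach relies on the IDs being present in both sides.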
– Vaebhav