To achieve this, you need to create two aggregated dataframes, one per ID column, each containing the group mean, and join them back to the original dataframe.
Essentially I am attempting to append my original PySpark dataframe
with two new columns, each containing the mean value for their paired
IDs.
Data Preparation
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

sql = SparkSession.builder.getOrCreate()

d = {
    'id1': [1] * 2 + [2] * 3,
    'id2': [2] * 2 + [1] * 3,
    'value': list(range(1, 100, 20)),
}
df = pd.DataFrame(d)

sparkDF = sql.createDataFrame(df)
sparkDF.show()
+---+---+-----+
|id1|id2|value|
+---+---+-----+
| 1| 2| 1|
| 1| 2| 21|
| 2| 1| 41|
| 2| 1| 61|
| 2| 1| 81|
+---+---+-----+
Aggregation & Join
sparkDF_agg_id1 = sparkDF.groupBy('id1').agg(F.mean(F.col('value')).alias('value_mean_id1'))
sparkDF_agg_id2 = sparkDF.groupBy('id2').agg(F.mean(F.col('value')).alias('value_mean_id2'))
finalDF = sparkDF.join(
    sparkDF_agg_id1,
    sparkDF['id1'] == sparkDF_agg_id1['id1'],
    'inner'
).select(sparkDF['*'], sparkDF_agg_id1['value_mean_id1'])

finalDF = finalDF.join(
    sparkDF_agg_id2,
    finalDF['id2'] == sparkDF_agg_id2['id2'],
    'inner'
).select(finalDF['*'], sparkDF_agg_id2['value_mean_id2'])
finalDF.show()
+---+---+-----+--------------+--------------+
|id1|id2|value|value_mean_id1|value_mean_id2|
+---+---+-----+--------------+--------------+
| 2| 1| 41| 61.0| 61.0|
| 2| 1| 61| 61.0| 61.0|
| 2| 1| 81| 61.0| 61.0|
| 1| 2| 1| 11.0| 11.0|
| 1| 2| 21| 11.0| 11.0|
+---+---+-----+--------------+--------------+