
I have the following sample PySpark dataframe, and after a groupby I want to calculate the mean of some columns and the first value of others. In the real case I have hundreds of columns, so I can't do it individually.

sp = spark.createDataFrame([['a',2,4,'cc','anc'], ['a',4,7,'cd','abc'], ['b',6,0,'as','asd'], ['b', 2, 4, 'ad','acb'],
                        ['c', 4, 4, 'sd','acc']], ['id', 'col1', 'col2','col3', 'col4'])

+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
|  a|   2|   4|  cc| anc|
|  a|   4|   7|  cd| abc|
|  b|   6|   0|  as| asd|
|  b|   2|   4|  ad| acb|
|  c|   4|   4|  sd| acc|
+---+----+----+----+----+

This is what I am trying:

mean_cols = ['col1', 'col2']
first_cols = ['col3', 'col4']
sc.groupby('id').agg(*[ f.mean for col in mean_cols], *[f.first for col in first_cols])

but it's not working. How can I do it like this in PySpark?

  • You are not calling the column name inside the aggregation functions; you can try `f.mean(col)` instead of `f.mean`. It works: `sp.groupBy('id').agg(*[f.mean(col) for col in mean_cols], *[f.first(col) for col in first_cols]).show()` – anky Jun 28 '20 at 09:39

2 Answers


The best way to apply multiple functions to multiple columns is to use the .agg(*expr) format.

import pyspark.sql.functions as F

# Test data
tst = spark.createDataFrame([(1,2,3,4),(3,4,5,1),(5,6,7,8),(7,8,9,2)], schema=['col1','col2','col3','col4'])
# Build one aliased expression per (function, column) pair, named "<function>_<column>"
fn_l = [F.min, F.max, F.mean, F.first]
col_l = ['col1', 'col2', 'col3']
expr = [fn(coln).alias(fn.__name__ + '_' + coln) for fn in fn_l for coln in col_l]
tst_r = tst.groupby('col4').agg(*expr)

The result will be

tst_r.show()
+----+--------+--------+--------+--------+--------+--------+---------+---------+---------+----------+----------+----------+
|col4|min_col1|min_col2|min_col3|max_col1|max_col2|max_col3|mean_col1|mean_col2|mean_col3|first_col1|first_col2|first_col3|
+----+--------+--------+--------+--------+--------+--------+---------+---------+---------+----------+----------+----------+
|   5|       5|       6|       7|       7|       8|       9|      6.0|      7.0|      8.0|         5|         6|         7|
|   4|       1|       2|       3|       3|       4|       5|      2.0|      3.0|      4.0|         1|         2|         3|
+----+--------+--------+--------+--------+--------+--------+---------+---------+---------+----------+----------+----------+

To apply functions selectively to specific columns, you can build multiple expression lists and concatenate them in the aggregation.

# min/max on col1 and col2
fn_l = [F.min, F.max]
col_l = ['col1', 'col2']
# mean/first on col1, col3 and col4
fn_2 = [F.mean, F.first]
col_2 = ['col1', 'col3', 'col4']
expr1 = [fn(coln).alias(fn.__name__ + '_' + coln) for fn in fn_l for coln in col_l]
expr2 = [fn(coln).alias(fn.__name__ + '_' + coln) for fn in fn_2 for coln in col_2]
tst_r = tst.groupby('col4').agg(*(expr1 + expr2))
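
If there are many such function-to-column groupings, one way to keep this manageable (a sketch, not part of the original answer; the `agg_map` name is just illustrative) is to drive the expression list from a mapping:

import pyspark.sql.functions as F

# Hypothetical mapping from aggregation function to the columns it should be applied to
agg_map = {
    F.min:   ['col1', 'col2'],
    F.max:   ['col1', 'col2'],
    F.mean:  ['col1', 'col3', 'col4'],
    F.first: ['col1', 'col3', 'col4'],
}

# Flatten the mapping into a single list of aliased expressions and aggregate once
expr = [fn(c).alias(fn.__name__ + '_' + c) for fn, cols in agg_map.items() for c in cols]
tst_r = tst.groupby('col4').agg(*expr)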
  • In your solution you are applying all functions to all the columns, but I want a few functions on a few columns specifically. How can I do that with expr? – Manu Sharma Jun 29 '20 at 06:11
  • @ManuSharma - I have updated the answer, check if it works. You can define multiple arrays and concatenate them in the agg expression. – Raghu Jun 29 '20 at 06:20

A simpler way to do it:

import pyspark.sql.functions as F

mean_cols = ['col1', 'col2']
first_cols = ['col3', 'col4']

result = (sp.groupby('id')
            .agg(*[F.mean(col).alias(f"{col}_mean") for col in mean_cols],
                 *[F.first(col).alias(f"{col}_first") for col in first_cols]))
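
As a further sketch (not from the original answer, and assuming `id` is the group key and the dtype rule below fits your schema): with hundreds of columns you can derive the two lists from the dataframe's dtypes instead of typing them out, e.g. mean for numeric columns and first for everything else. Also note that first() is not deterministic unless the data has an explicit ordering within each group.

import pyspark.sql.functions as F

# Sketch: split columns by dtype so numeric columns get mean() and the rest get first()
numeric_types = ('int', 'bigint', 'double', 'float', 'smallint', 'tinyint', 'decimal')
mean_cols  = [c for c, t in sp.dtypes if c != 'id' and t.startswith(numeric_types)]
first_cols = [c for c, t in sp.dtypes if c != 'id' and not t.startswith(numeric_types)]

result = (sp.groupby('id')
            .agg(*[F.mean(c).alias(f"{c}_mean") for c in mean_cols],
                 *[F.first(c).alias(f"{c}_first") for c in first_cols]))
result.show()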