I am trying to calculate a weighted mean in PySpark but am not making much progress.

# Example data
df = sc.parallelize([
    ("a", 7, 1), ("a", 5, 2), ("a", 4, 3),
    ("b", 2, 2), ("b", 5, 4), ("c", 1, -1)
]).toDF(["k", "v1", "v2"])
df.show()

import numpy as np
def weighted_mean(workclass, final_weight):
    return np.average(workclass, weights=final_weight)

weighted_mean_udaf = pyspark.sql.functions.udf(weighted_mean,
    pyspark.sql.types.IntegerType())

but when I try to execute this code

df.groupby('k').agg(weighted_mean_udaf(df.v1,df.v2)).show()

I am getting the error

u"expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get

My question is: can I pass a custom function (taking multiple arguments) as an argument to `agg`? If not, is there an alternative way to perform operations like a weighted mean after grouping by a key?

MARK
  • Did you mean to override the `weighted_mean` function? – OneCricketeer Aug 08 '16 at 18:04
  • What I want to do is a) groupby b) perform an operation depending on multiple columns of the dataframe . Weighted mean is just an example. – MARK Aug 08 '16 at 18:08
  • I think what @cricket_007 meant is do you intentionally override `weighted_mean` by this line `weighted_mean = pyspark.sql.functions.udf(weighted_mean,` or it is a typo? – akarilimano Aug 08 '16 at 18:29
  • I don't think that the [`agg`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg) function takes the parameter type that you are giving it. – OneCricketeer Aug 08 '16 at 18:35
  • @cricket_007 is right here. `agg` accepts only proper UDAFs. Take a look, for example, at http://stackoverflow.com/a/32101530/1560062. There is no Python API though. For trivial cases like this one, all you need is a simple formula, so it looks like a rather artificial problem. – zero323 Aug 08 '16 at 19:10

1 Answer

A User Defined Aggregation Function (UDAF), which works on pyspark.sql.GroupedData but is not supported in pyspark, is not the same as a User Defined Function (UDF), which works on pyspark.sql.DataFrame.

Because you cannot create your own UDAF in pyspark, and the supplied UDAFs cannot resolve your issue, you may need to go back to the RDD world:

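# The built-in sum is used here; passing a generator to numpy.sum is deprecated.
def weighted_mean(vals):
    vals = list(vals)  # materialize the iterator so it can be traversed twice
    sum_of_weights = sum(weight for _, weight in vals)
    # each element of vals is a (value, weight) tuple
    return sum(1. * value * weight for value, weight in vals) / sum_of_weights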

df.rdd.map(
    lambda x: (x[0], tuple(x[1:]))  # reshape to (key, val) so grouping could work
).groupByKey().mapValues(
    weighted_mean
).collect()
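
To make the arithmetic concrete, here is a minimal pure-Python sketch (no Spark involved) of what the pipeline above computes for the example data in the question. The helper name `weighted_means_by_key` is hypothetical; it keeps two running sums per key, which is an assumption about how one might structure the computation, not something taken from the Spark API.

```python
from collections import defaultdict

# Rows copied from the question's example DataFrame: (k, v1, v2),
# where v1 is treated as the value and v2 as its weight.
rows = [
    ("a", 7, 1), ("a", 5, 2), ("a", 4, 3),
    ("b", 2, 2), ("b", 5, 4), ("c", 1, -1),
]

def weighted_means_by_key(rows):
    """Return {key: sum(value * weight) / sum(weight)} (hypothetical helper)."""
    # totals[key] = [running sum of value * weight, running sum of weight]
    totals = defaultdict(lambda: [0.0, 0.0])
    for key, value, weight in rows:
        totals[key][0] += value * weight
        totals[key][1] += weight
    # Divide once per key, after all rows have been accumulated.
    return {key: num / den for key, (num, den) in totals.items()}

result = weighted_means_by_key(rows)
print(result)
```

Since the weighted mean reduces to a ratio of two sums, this is probably the "simple formula" zero323's comment refers to. In the DataFrame API, something like `(F.sum(df.v1 * df.v2) / F.sum(df.v2))` inside `agg`, with `F` standing for `pyspark.sql.functions`, should express the same thing without leaving the DataFrame world.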
ijoseph
shuaiyuancn
  • Thanks @ijoseph for pointing out that `map` works on `df.rdd`. At the time of writing I was used to just calling `df.map` as a shorthand. Not sure if it still works, but it is good to be explicit. – shuaiyuancn Jan 27 '20 at 10:46
  • It threw an error for me (in Spark 2.4.3) without the `.rdd`, I believe. – ijoseph Jan 27 '20 at 21:40