I have the following data in a pyspark dataframe called end_stats_df:

values     start    end    cat1   cat2
10          1        2      A      B
11          1        2      C      B
12          1        2      D      B
510         1        2      D      C
550         1        2      C      B
500         1        2      A      B
80          1        3      A      B

And I want to aggregate it in the following way:

  • I want to use the "start" and "end" columns as the aggregate keys
  • For each group of rows, I need to do the following:
    • Compute the number of unique values across cat1 and cat2 for that group. E.g., for the group start=1, end=2, this number would be 4 because the categories are A, B, C, D. This number will be stored as n (n=4 in this example).
    • For the values field, for each group I need to sort the values and then select every (n-1)th value, where n is the number computed in the first step above.
    • At the end of the aggregation, I don't really care what is in cat1 and cat2 after the operations above.

An example output from the example above is:

values     start    end    cat1   cat2
12          1        2      D      B
550         1        2      C      B
80          1        3      A      B

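To make the selection rule concrete, here is a plain-Python sketch (no Spark) of one reading of "every n-1 value" that reproduces the rows above for the start=1, end=2 group; the variable names are just for illustration:

# plain Python, only to illustrate the rule for the group start=1, end=2
values = [10, 11, 12, 510, 550, 500]
cat1 = ['A', 'C', 'D', 'D', 'C', 'A']
cat2 = ['B', 'B', 'B', 'C', 'B', 'B']

n = len(set(cat1) | set(cat2))           # unique categories {'A', 'B', 'C', 'D'} -> n = 4
picked = sorted(values)[n - 2::n - 1]    # every (n-1)th value of the sorted list
print(picked)                            # [12, 550]
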
How do I accomplish this using PySpark dataframes? I assume I need to use a custom UDAF, right?


1 Answer

PySpark does not support UDAFs directly, so we have to do the aggregation manually: collect each group's values with collect_list/collect_set and then apply an ordinary UDF to the collected arrays.

from pyspark.sql import functions as f
from pyspark.sql.types import StringType

def func(values, cat1, cat2):
    # n = number of distinct categories across cat1 and cat2 in this group
    n = len(set(cat1 + cat2))
    # the CSV is read without a schema, so the values arrive as strings: sort them numerically
    return sorted(values, key=int)[n - 2]


df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt', format='csv', sep='\t', header=True)
df = df.groupBy(df['start'], df['end']).agg(f.collect_list(df['values']).alias('values'),
                                            f.collect_set(df['cat1']).alias('cat1'),
                                            f.collect_set(df['cat2']).alias('cat2'))
df = df.select(df['start'], df['end'],
               f.udf(func, StringType())(df['values'], df['cat1'], df['cat2']).alias('values'))
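
If it helps, running the above on the sample data should print something like this (exact formatting and row order may differ); note that this approach keeps a single value per (start, end) group:

df.show()
# +-----+---+------+
# |start|end|values|
# +-----+---+------+
# |    1|  2|    12|
# |    1|  3|    80|
# +-----+---+------+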
  • That is great, I will try that out and report back to you, thanks. – makansij Sep 13 '17 at 02:42
  • What is `f` in your example? – makansij Sep 13 '17 at 16:41
  • Never mind, I see that it is "functions" from pyspark import. – makansij Sep 13 '17 at 18:21
  • 2
    PySpark now supports UDAFs with help from pandas: see [this answer](https://stackoverflow.com/a/47497815/376390). – Marco Jan 24 '19 at 22:20
  • 1
    This method uses `collect` which brings the data to the driver. This defeats the purpose of spark. In which case, the pandas enabled UDAF mentioned y @Marco might be slightly better. But these solutions would not work for me, since my UDF needs to be applied to pyspark dataframes and not pandas. Is there any other way to apply aggregated UDF on pyspark dataframes? – thentangler Mar 05 '21 at 19:53
  • 1
    @thentangler: UDAFs are aggregate UDFs for.. *pyspark*. https://www.google.com/search?q=pyspark+UDAF+2.4.4 – Marco Mar 07 '21 at 10:03
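
Following up on the pandas comments above: on Spark 3.0+ (with pyarrow installed) the same grouped aggregation can be sketched with applyInPandas, which avoids collecting per-group arrays into a single row. This is only a sketch, assuming end_stats_df has the columns from the question with numeric start/end/values; the helper name pick_values is just for illustration, and it follows the question's "every (n-1)th sorted value" rule rather than the single value returned above:

import pandas as pd

# Each (start, end) group arrives as a pandas DataFrame; return the selected rows for that group.
def pick_values(pdf: pd.DataFrame) -> pd.DataFrame:
    n = pd.concat([pdf['cat1'], pdf['cat2']]).nunique()       # distinct categories across cat1 and cat2
    picked = pdf['values'].sort_values().iloc[n - 2::n - 1]   # every (n-1)th value of the sorted column
    return pd.DataFrame({'start': pdf['start'].iloc[0],
                         'end': pdf['end'].iloc[0],
                         'values': picked})

result = (end_stats_df
          .groupBy('start', 'end')
          .applyInPandas(pick_values, schema='start long, end long, values long'))
result.show()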