
I have a requirement to write a PySpark custom UDAF. I came across the example Applying UDFs on GroupedData in PySpark (with functioning python example). Along the same lines as the last part of that thread, I came up with the function below:

import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType()),
    StructField("bf_signature", Binary())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    # Receives one pandas DataFrame per group; returns one aggregated row
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    # Pseudocode: BloomFilter/.set() is the custom aggregator I want to build
    bloomfilter = BloomFilter(8, 1)
    bloomfilter.set(df.value1)
    p = bloomfilter
    return pd.DataFrame([[gr, x, y, w, z, p]])

df3.groupby("key").apply(g).show()

As shown in the code, I want to create a custom BloomFilter that builds a Bloom filter over the entire column, just as mean() aggregates the entire column and produces one result per group.
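For illustration, a minimal hand-rolled filter along these lines could look like the sketch below; the class name, bit size, and hashing scheme are my own assumptions, not an existing library:

import hashlib
import math

class SimpleBloomFilter:
    """Minimal illustrative Bloom filter; a real library would replace this."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(math.ceil(num_bits / 8))

    def _positions(self, item):
        # Derive num_hashes bit positions from salted SHA-256 digests
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def to_bytes(self):
        # Serialize so the result fits the BinaryType column in the schema
        return bytes(self.bits)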

How can I write this custom UDAF in python?

Raj
  • I didn't find any solution for this, so I switched to Java's UserDefinedAggregateFunction – Raj Apr 09 '19 at 02:19
  • pyspark now supports UDAF with pandas, check this out https://stackoverflow.com/questions/40006395/applying-udfs-on-groupeddata-in-pyspark-with-functioning-python-example/47497815#47497815 – Kushagra Verma Aug 19 '19 at 09:21

1 Answer


Maybe this blog can be useful. It is not a real UDAF in Python, but it is a hack that achieves similar functionality.

The hack is:

  1. Apply groupBy to the DataFrame
  2. Apply collect_list() inside the agg() function
  3. Apply a normal Python UDF to the list produced by collect_list(), as shown in the sketch after this list
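A minimal sketch of that hack, assuming a DataFrame df3 with columns key, value1, and value2, and reusing the illustrative SimpleBloomFilter from the question (any picklable filter with an add()/serialize step would do):

from pyspark.sql import functions as F
from pyspark.sql.types import BinaryType

# Step 3: a normal Python UDF that folds the collected list into one value
@F.udf(returnType=BinaryType())
def build_bloomfilter(values):
    bf = SimpleBloomFilter()  # illustrative filter sketched in the question
    for v in values:
        bf.add(v)
    return bf.to_bytes()

result = (
    df3.groupBy("key")                                      # step 1
       .agg(F.collect_list("value1").alias("values"),       # step 2
            F.avg("value1").alias("avg_value1"),
            F.avg("value2").alias("avg_value2"))
       .withColumn("bf_signature", build_bloomfilter("values"))
       .drop("values")
)
result.show()

Note that collect_list() pulls each group's values onto a single row, so this only works when every group fits in memory on one executor.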