I need to write a custom UDAF in PySpark. I came across the example Applying UDFs on GroupedData in PySpark (with functioning python example), and along the same lines as the last part of that thread, I came up with the function below:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType()),
    StructField("bf_signature", BinaryType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    # BloomFilter is the custom aggregator I am asking about; it does not exist yet.
    bloomfilter = BloomFilter(8, 1)
    bloomfilter.set(df.value1)
    p = bloomfilter
    return pd.DataFrame([[gr, x, y, w, z, p]])
df3.groupby("key").apply(g).show()
As shown in the code, I want to create a custom BloomFilter that builds a bloom filter over an entire column, in the same way that mean() aggregates the whole column and produces one result per group.

How can I write this custom UDAF in Python?
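To illustrate the column-level behavior I have in mind, here is a toy, pure-Python sketch (not the real implementation; the class name, bit size, and hashing scheme are all placeholders I made up) of a bloom filter that can absorb an entire pandas column and serialize itself to bytes, so the result could fit the BinaryType field in the schema above:

```python
import hashlib

import pandas as pd


class ToyBloomFilter:
    """Toy bloom filter: size_bits bit positions, num_hashes hash functions."""

    def __init__(self, size_bits, num_hashes):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((size_bits + 7) // 8)

    def _positions(self, value):
        # Derive num_hashes bit positions from one digest of the value
        # (works for num_hashes <= 8 with a 32-byte SHA-256 digest).
        digest = hashlib.sha256(str(value).encode()).digest()
        for i in range(self.num_hashes):
            chunk = int.from_bytes(digest[4 * i:4 * i + 4], "big")
            yield chunk % self.size_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def add_column(self, series):
        # Aggregate a whole pandas column into the filter,
        # analogous to how mean() consumes the entire column.
        for value in series:
            self.add(value)

    def __contains__(self, value):
        return all(
            self.bits[pos // 8] & (1 << (pos % 8))
            for pos in self._positions(value)
        )

    def to_bytes(self):
        # Serialize the bit array so it can be stored in a BinaryType column.
        return bytes(self.bits)


# Example: build a filter over one group's column of values.
bf = ToyBloomFilter(1024, 3)
bf.add_column(pd.Series([1.5, 2.5, 3.5]))
```

After `add_column`, every value that was added tests as a member (with some false-positive rate for values that were not added), and `to_bytes()` gives a per-group binary signature. The open question is how to plug something like this in as a proper UDAF rather than a grouped-map hack.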