I am using PySpark 2.4.2, so per the docs for this version one can do this to create a GROUPED_MAP pandas UDF:
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
)

@pandas_udf(returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
```
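For reference, with the example data above this should print something like the following (group 1 has mean 1.5 and group 2 has mean 6.0; row order may vary):

```
+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+
```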
This works, but you can no longer call `subtract_mean` as a normal Python function that takes a pandas DataFrame, because the decorator has replaced the name with a Spark UDF object.
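To make the failure concrete, here is a minimal sketch (the `pdf` data is illustrative; the exact exception text depends on the Spark version, but the decorated name is no longer a plain Python function):

```python
import pandas as pd

pdf = pd.DataFrame({"id": [1, 1, 2], "v": [1.0, 2.0, 3.0]})

# subtract_mean is now a Spark UDF wrapper, not the original function,
# so this does not run the pandas logic locally -- it tries to treat the
# argument as a Spark Column and fails instead of returning a DataFrame.
subtract_mean(pdf)
```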
But if you instead build the UDF by calling `pandas_udf` explicitly, it will work:

```python
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

sub_spark = pandas_udf(f=subtract_mean, returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP)

df.groupby("id").apply(sub_spark).show()
```
Now you can call `subtract_mean` directly from Python, passing it a pandas DataFrame.
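For instance (reusing the illustrative `pdf` from the sketch above):

```python
# subtract_mean is still a plain Python function here, so this runs
# locally and returns a pandas DataFrame with the overall column mean
# of v subtracted. (Per-group subtraction only happens when Spark runs
# it inside groupby("id").apply(...).)
print(subtract_mean(pdf))
```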
How does one get this using the decorator approach? It is not clear from the docs. Which function is decorated, and which function is given for the `f` parameter?