From PySpark, I am trying to define a custom aggregator that accumulates state. Is this possible in Spark 2.3?

AFAIK, it is now possible to define a custom UDAF in PySpark since Spark 2.3 (cf. How to define and use a User-Defined Aggregate Function in Spark SQL?) by calling pandas_udf with the PandasUDFType.GROUPED_AGG keyword. However, given that it just takes a function as a parameter, I don't think it is possible to carry state around during the aggregation.
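
For illustration, a minimal sketch of such a grouped-aggregate pandas UDF, adapted from the Spark documentation (assumes a SparkSession named spark is available, e.g. from the PySpark shell); the UDF receives each group's column as a single pandas Series, with no hook for keeping state across calls, which is the limitation described above:

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)],
    ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    # v is a pandas Series holding the whole group's column;
    # nothing persists between invocations
    return v.mean()

df.groupby("id").agg(mean_udf(df["v"])).show()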

From Scala, I see it is possible to have stateful aggregation by extending either UserDefinedAggregateFunction or org.apache.spark.sql.expressions.Aggregator, but is there a similar thing I can do on the Python side only?

lezebulon

1 Answer

You could use an accumulator (the SQL example below demonstrates this).

You could also leverage Spark Streaming's built-in state management; a minimal sketch follows.
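
A minimal sketch of the streaming option (assumes sc from the PySpark shell; the socket host/port and checkpoint path are placeholders): updateStateByKey keeps a running per-key state across micro-batches.

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 5)          # 5-second micro-batches
ssc.checkpoint("/tmp/checkpoint")      # required for stateful operations

# merge each batch's new values into the running total for the key
def update_fn(new_values, running_total):
    return sum(new_values) + (running_total or 0)

counts = ssc.socketTextStream("localhost", 9999) \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .updateStateByKey(update_fn)

counts.pprint()
ssc.start()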

A simple accumulator example for use in SQL:

from pyspark.sql.types import IntegerType

# have some data
df = spark.range(10).toDF("num")

# have a table
df.createOrReplaceTempView("num_table")

# have an accumulator (sc is the SparkContext, i.e. spark.sparkContext)
accSum = sc.accumulator(0)

# have a function that accumulates
def add_acc(int_val):
  accSum.add(int_val)
  return int_val

# register function as udf
spark.udf.register("reg_addacc", add_acc, IntegerType())

# use in sql (note: accumulator updates from a UDF are not guaranteed
# exactly-once if tasks are retried)
spark.sql("SELECT sum(reg_addacc(num)) FROM num_table").show()

# get value from accumulator
print(accSum.value)

# output: 45

Michael West
  • But it seems they are not available through Spark SQL, right? Accumulators seem to be for regular Spark tasks; the "state management" part seems to refer to Spark Streaming. – lezebulon Oct 02 '18 at 17:08
  • I am sure I have seen a UDF that manages state in an accumulator and can be called from SQL. However, I can't find it right now. I'll have to try it out. – Michael West Oct 02 '18 at 19:23
  • Updated the answer with an accumulator/UDF/SQL example. – Michael West Oct 03 '18 at 13:16