
I always thought that Spark does not allow defining user-defined window functions. I just tested the "Geometric Mean" UDAF example from the Databricks docs (https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) as a window function, and it seems to work just fine, e.g.:

import org.apache.spark.sql.expressions.Window

// GeometricMean is the UDAF defined in the linked Databricks example
val geomMean = new GeometricMean

(1 to 10).map(i => (i, i.toDouble))
  .toDF("i", "x")
  .withColumn("geom_mean", geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1, 1)))
  .show()

+---+----+------------------+
|  i|   x|         geom_mean|
+---+----+------------------+
|  1| 1.0|1.4142135623730951|
|  2| 2.0|1.8171205928321397|
|  3| 3.0|2.8844991406148166|
|  4| 4.0|3.9148676411688634|
|  5| 5.0|  4.93242414866094|
|  6| 6.0| 5.943921952763129|
|  7| 7.0| 6.952053289772898|
|  8| 8.0| 7.958114415792783|
|  9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+

I've never seen the Spark docs mention using a UDAF as a window function. Is this allowed, i.e. are the results correct? I'm using Spark 2.1, by the way.

EDIT:

What confuses me is that in a standard aggregation (i.e. followed by a groupBy), data is only ever added to the buffers, i.e. they always grow and never shrink. With a window function (especially in conjunction with rowsBetween()), data also needs to be removed from the buffer, as "old" elements drop out of the window as it moves along the rows defined by the ordering. I think of window functions as moving along the ordering with a state, so I assumed there must be something like a "remove" method to implement.
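For reference, the buffer lifecycle of the linked GeometricMean UDAF can be modelled in plain Scala (no Spark dependency; `GeomBuffer` and the standalone method definitions below are illustrative stand-ins for the `UserDefinedAggregateFunction` callbacks, not Spark API). Notably, there is no "remove" step in this lifecycle: a window frame can always be evaluated by building a fresh buffer from the rows currently in the frame.

```scala
// Plain-Scala model of the UDAF buffer lifecycle (illustrative names,
// following the Databricks GeometricMean design: running product + count).
final case class GeomBuffer(product: Double, count: Long)

def initialize: GeomBuffer = GeomBuffer(1.0, 0L)

def update(b: GeomBuffer, x: Double): GeomBuffer =
  GeomBuffer(b.product * x, b.count + 1)

def merge(a: GeomBuffer, b: GeomBuffer): GeomBuffer =
  GeomBuffer(a.product * b.product, a.count + b.count)

def evaluate(b: GeomBuffer): Double = math.pow(b.product, 1.0 / b.count)

// The frame rowsBetween(-1, 1) on the first row contains x = 1.0 and 2.0;
// it can be evaluated with a fresh buffer, no removal needed:
val result = evaluate(Seq(1.0, 2.0).foldLeft(initialize)(update))
println(result) // ≈ 1.4142135623730951 (sqrt(2)), matching the first row above
```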

Raphael Roth
  • Can you please help me with revised code? I am using Spark 2.3 and I have copied your code, but it is not allowing me to pass col("x") like this: geomMean($"x"). It says: `org.apache.commons.math3.stat.descriptive.moment.GeometricMean does not take parameters` – Lingaraj Feb 20 '22 at 14:35
  • @Lingaraj You seem to have the wrong import. GeometricMean is not from Apache Commons Math; you have to define it yourself (code given here: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) – Raphael Roth Feb 23 '22 at 10:31

1 Answer


I am not sure what exactly your question is.

Can every Spark UDAF be used with Window?

Yes

Here is my personal experience in this topic:

I have lately been working a lot with Spark window functions and UDAFs (Spark 2.0.1), and I can confirm they work very well together. The results are correct (assuming your UDAF is correctly written). UDAFs are a bit of a pain to write, but once you get it, it goes fast for the next ones.

I didn't test all of them, but the built-in aggregation functions from org.apache.spark.sql.functions._ also worked for me. Search for "Aggregate" in functions. I was working mostly with classical aggregators like sum, count, avg, and stddev, and they all returned correct values.
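As a sanity check on the numbers shown in the question, the geom_mean column can be reproduced without Spark by recomputing the geometric mean over each rowsBetween(-1, 1) frame in plain Scala. This is just a sketch: `geometricMean` here takes the n-th root of the product, as the linked UDAF does, and the frame-slicing mimics how the window clips at the first and last row.

```scala
// Recompute the question's geom_mean column by hand: for each row i,
// the frame rowsBetween(-1, 1) is the slice [i-1, i+1] clipped to the data.
def geometricMean(xs: Seq[Double]): Double =
  math.pow(xs.product, 1.0 / xs.size)

val xs = (1 to 10).map(_.toDouble)

val geomMeans = xs.indices.map { i =>
  val frame = xs.slice(math.max(0, i - 1), math.min(xs.size, i + 2))
  geometricMean(frame)
}

geomMeans.foreach(println)
// First value ≈ 1.4142135623730951 and last ≈ 9.486832980505138,
// matching the show() output in the question.
```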

astro_asz
  • The thing that confuses me is that I always assumed that window functions also need to implement a function to remove an element from the buffer (think of a moving/rolling average using `rowsBetween(-1,1)`). But apparently this is not the case – Raphael Roth Feb 15 '18 at 10:16
  • These kinds of things are taken care of in the implementation of the UDAF, where you have to define your buffer, how to update it, how to merge it, and so on. The engine behind `class UserDefinedAggregateFunction` then knows how to deal with the buffer for a given range of rows. – astro_asz Feb 15 '18 at 10:22
  • Yes, I know which methods to implement, but as I said, I was expecting window functions to have a method which defines how to remove an element. Think of `collect_list(..)` in conjunction with `rowsBetween(-1,1)`. As the window moves forward, I expected that the oldest element is removed and the new element is added. But I'm now pretty sure that this is not the case; collect_list seems to build up the buffer from scratch for every row in the dataframe, although this seems quite inefficient – Raphael Roth Feb 15 '18 at 10:27
  • Sorry, maybe we are confusing terms. What do you mean by window function? For me, in Spark there is a `Window` plus an aggregator applied to that window, and that is what I call a window function. I have never encountered an official term "user-defined window function". You asked *Can every Spark UDAF be used with Window?* and the answer is yes :) – astro_asz Feb 15 '18 at 10:31
  • I think we mean the same thing, sorry that I'm unable to explain what I mean. – Raphael Roth Feb 15 '18 at 10:42
  • "Is this allowed?" The question has been changed after the answer was provided. This is a different question now. – astro_asz Feb 15 '18 at 10:43
  • No, I did not change the question; the question is still the same. I just removed a secondary question because I want to focus on UDAFs – Raphael Roth Feb 15 '18 at 10:46
  • @astro_asz If you try to use the built-in `countDistinct` with a rolling window it fails. Are you sure that the answer is "yes" and always "yes"? – Marsellus Wallace Apr 24 '18 at 13:55
  • Hi @Gevorg, UDAFs are user-defined aggregate functions, so if you write your own UDAF correctly it should work. spark.sql.functions.countDistinct is not a UDAF. As I said in my answer, I know that some aggregates in spark.sql.functions will work, but I didn't test them all. – astro_asz Apr 24 '18 at 14:12