2

I'm trying to implement the "moving median" function as a window function to use it in Apache Spark SQL.

I'm trying to implement it as a UDAF in Scala. The version of Spark is 1.6.1.

I tried 2 ways of calling my UDAF ("median"):

1) As an SQL query:

val timeSeries = ... // get a DataFrame
...
timeSeries.registerTempTable("time_series")
timeSeries.sqlContext.udf.register("median", new MedianUDAF)
val timeSeriesWithMovingAverage = timeSeries.sqlContext.sql(s"select *, median(value_column) over (partition by metrics_name order by time_column) from time_series")

The result was:

Failure(org.apache.spark.sql.AnalysisException: Couldn't find window function median;)

2) As a DataFrame API call:

val timeSeriesWithMovingAverage = timeSeries.withColumn("movingAvg", medianFunction(timeSeries("value_column")).over(windowSpec))

The result was:

Failure(java.lang.UnsupportedOperationException: MedianUDAF(value#16) is not supported in a window operation.)

Is there any way to use UDAFs as window functions? For example, to calculate moving median (not moving average but median).

user3791111
  • 1,469
  • 1
  • 15
  • 20
  • In Spark 2.1+, to find (moving) median we can use functions `percentile` and `percentile_approx`. The following answer uses `expr` in PySpark to calculate moving median as a window function - you should be able to work your way into Scala: https://stackoverflow.com/questions/44061553/how-to-calculate-moving-median-in-dataframe#71545401 – ZygD Mar 20 '22 at 09:40

1 Answers1

1

Unfortunately, I believe you cannot use UDFs or UDAFs within windowing functions. The windowing functions are not native to Spark -- they use Hive, which precludes using Spark-defined UDFs and UDAFs.

In theory, I suppose you could create your UDAF directly in Hive, and then call it from Spark. But that's not something I have tried before -- it just seems like it must be possible.

David Griffin
  • 13,677
  • 5
  • 47
  • 65