
It looks like median as a window function is not supported, but the error message does not say that explicitly. Is there another way to calculate the median over a rolling window?

import pyspark # 3.4.1
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (4, 5),
        (3, 10),
        (2, 15),
        (1, 20)
    ],
    ('id1', 'v1')
)
df.createOrReplaceTempView("x")
spark.sql('select median(v1) over (order by id1 rows between 2 preceding and current row) as v1 from x').collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/jan/.local/lib/python3.8/site-packages/pyspark/sql/session.py", line 1440, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
  File "/home/jan/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/jan/.local/lib/python3.8/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: Cannot specify order by or frame for 'median'.; line 1 pos 7;
Project [v1#4]
+- Project [v1#1L, id1#0L, v1#4, v1#4]
   +- Window [median(v1#1L) windowspecdefinition(id1#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS v1#4], [id1#0L ASC NULLS FIRST]
      +- Project [v1#1L, id1#0L]
         +- SubqueryAlias x
            +- View (`x`, [id1#0L,v1#1L])
               +- LogicalRDD [id1#0L, v1#1L], false

Removing order by id1, which is obviously not desirable, does not make any difference.

jangorecki
    while `percentile_approx` should be the first choice, you could employ other methods as [here](https://stackoverflow.com/q/73865423/8279585) as well. – samkart Jul 25 '23 at 11:34

1 Answer


Calculating an exact median over a window in PySpark is not directly supported. The main reason is that an exact median requires a full sort of the values in each frame, which does not parallelize well and is therefore expensive to compute in a distributed environment such as Spark.

However, there are workarounds. You could use the native PySpark function percentile_approx with a percentage of 0.5, which computes an approximate median:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (4, 5),
        (3, 10),
        (2, 15),
        (1, 20)
    ],
    ('id1', 'v1')
)


# frame of the current row and the 2 preceding rows, ordered by id1
window = Window.orderBy('id1').rowsBetween(-2, 0)

df = df.withColumn('median_approx', F.expr('percentile_approx(v1, 0.5)').over(window))

df.show()

This calculates an approximate median over the window for each row. The result is an approximation, but for many practical purposes it is close enough. According to the documentation, percentile_approx also accepts an optional accuracy parameter (default 10000); larger values give better accuracy at the cost of memory, so set it according to your accuracy requirements and available resources.
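For example, a minimal sketch of passing a larger accuracy value, reusing the window defined above (the value 100000 and the column name median_approx_precise are illustrative choices, not recommendations):

df = df.withColumn(
    'median_approx_precise',
    # third argument is the accuracy; higher means more precise but more memory
    F.expr('percentile_approx(v1, 0.5, 100000)').over(window)
)
df.show()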

Spark does not support window aggregates that require a total ordering or a complete scan of the frame data, such as medians or other rank-based statistics. Even by employing vectorised (pandas) UDFs you will face the same issue.
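If an exact rolling median is required, one workaround (along the lines of the approach linked in the comment above) is to collect the frame values into an array and pick the middle element(s) yourself. A minimal sketch, reusing df and window from above; vals, median_exact and df_exact are hypothetical names chosen here for illustration:

# Collect and sort the values of each frame, then take the middle element
# (or the mean of the two middle elements when the frame size is even).
df_exact = df.withColumn(
    'vals', F.sort_array(F.collect_list('v1').over(window))
).withColumn(
    'median_exact',
    F.expr("""
        case when size(vals) % 2 = 1
             then cast(element_at(vals, (size(vals) + 1) div 2) as double)
             else (element_at(vals, size(vals) div 2)
                   + element_at(vals, size(vals) div 2 + 1)) / 2.0
        end
    """)
).drop('vals')
df_exact.show()

This gives an exact median but materialises every frame as an array, so it is only practical when the window frames are small.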

mamonu