15

In PySpark, I have a variable-length array of doubles for which I would like to find the mean. However, the avg function requires a single numeric type.

Is there a way to find the average of an array without exploding the array out? I have several different arrays and I'd like to be able to do something like the following:

df.select(col("Segment.Points.trajectory_points.longitude"))

DataFrame[longitude: array<double>]

df.select(avg(col("Segment.Points.trajectory_points.longitude"))).show()
org.apache.spark.sql.AnalysisException: cannot resolve
'avg(Segment.Points.trajectory_points.longitude)' due to data type
mismatch: function average requires numeric types, not
ArrayType(DoubleType,true);;

If I have 3 unique records with the following arrays, I'd like the mean of each array as the output: three mean longitude values.

Input:

[Row(longitude=[-80.9, -82.9]),
 Row(longitude=[-82.92, -82.93, -82.94, -82.96, -82.92, -82.92]),
 Row(longitude=[-82.93, -82.93])]

Output:

-81.9,
-82.931,
-82.93

I am using Spark version 2.1.3.
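For reference, the three sample rows above can be reproduced with something like the following (a minimal sketch assuming an active SparkSession named spark; the nested Segment.Points.trajectory_points structure is flattened into a single longitude column for brevity):

# Hypothetical reproduction of the sample input (flat schema for brevity;
# the real column is nested under Segment.Points.trajectory_points).
sample_df = spark.createDataFrame(
    [
        ([-80.9, -82.9],),
        ([-82.92, -82.93, -82.94, -82.96, -82.92, -82.92],),
        ([-82.93, -82.93],),
    ],
    ["longitude"],
)

sample_df.printSchema()
# root
#  |-- longitude: array (nullable = true)
#  |    |-- element: double (containsNull = true)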


Explode Solution:

So I've got this working by exploding, but I was hoping to avoid that step. Here's what I did:

from pyspark.sql.functions import col
import pyspark.sql.functions as F

longitude_exp = df.select(
    col("ID"), 
    F.posexplode("Segment.Points.trajectory_points.longitude").alias("pos", "longitude")
)

longitude_reduced = longitude_exp.groupBy("ID").agg(F.avg("longitude"))

This successfully took the mean. However, since I'll be doing this for several columns, I'll have to explode the same DF several different times. I'll keep working through it to find a cleaner way to do this.
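For illustration, repeating that pattern for each array column would look roughly like the sketch below (the second column name, latitude, is hypothetical; only longitude appears in the example above):

from functools import reduce
import pyspark.sql.functions as F

# Hypothetical sketch: the posexplode/aggregate pattern repeated once per
# array column, with the per-column averages joined back together on ID.
array_cols = ["longitude", "latitude"]  # "latitude" is made up for illustration

per_col_means = [
    df.select(
        F.col("ID"),
        F.posexplode(
            "Segment.Points.trajectory_points.{}".format(c)
        ).alias("pos", c),
    )
    .groupBy("ID")
    .agg(F.avg(c).alias("avg_" + c))
    for c in array_cols
]

result = reduce(lambda a, b: a.join(b, on="ID"), per_col_means)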


2 Answers

11

In your case, your options are to use explode or a udf. As you've noted, explode is unnecessarily expensive. Thus, a udf is the way to go.

You can write your own function to take the mean of a list of numbers, or just piggyback off of numpy.mean. If you use numpy.mean, you'll have to cast the result to a float (because Spark doesn't know how to handle numpy.float64s).

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

array_mean = udf(lambda x: float(np.mean(x)), FloatType())
df.select(array_mean("longitude").alias("avg")).show()
#+---------+
#|      avg|
#+---------+
#|    -81.9|
#|-82.93166|
#|   -82.93|
#+---------+
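If you'd rather not depend on numpy, a plain Python udf works the same way. A minimal sketch (it returns None for empty or null arrays, and no float() cast is needed because sum()/len() already yields a Python float):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Plain-Python alternative to numpy.mean; returns None for empty/null arrays.
array_mean_py = udf(lambda x: sum(x) / len(x) if x else None, DoubleType())

df.select(array_mean_py("longitude").alias("avg")).show()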
pault
  • This worked great! Interestingly, I did get different results. However this turned out to be known behavior for numpy (https://stackoverflow.com/questions/17945295/numpy-to-weak-to-calculate-a-precise-mean-value). The results were the same up to the fourth decimal place, which is accurate enough for me to use for my use-case. Thanks! – Aaron Faltesek Apr 05 '19 at 21:14
7

In recent Spark versions (2.4 or later), the most efficient solution is to use the aggregate higher-order function:

from pyspark.sql.functions import expr

query = """aggregate(
    `{col}`,
    CAST(0.0 AS double),
    (acc, x) -> acc + x,
    acc -> acc / size(`{col}`)
) AS  `avg_{col}`""".format(col="longitude")

df.selectExpr("*", query).show()
+--------------------+------------------+
|           longitude|     avg_longitude|
+--------------------+------------------+
|      [-80.9, -82.9]|             -81.9|
|[-82.92, -82.93, ...|-82.93166666666667|
|    [-82.93, -82.93]|            -82.93|
+--------------------+------------------+

See also Spark Scala row-wise average by handling null

user10938362
  • Since 3.1.1, it's also available in the Python API, no need to use `expr`: `F.aggregate('my_array', 0, lambda x, y: x+y, lambda acc: acc / F.size('my_array'))` – Ron Serruya Oct 03 '22 at 13:04
  • @RonSerruya's solution was close but threw a "data type mismatch" error. I had to use `F.aggregate('my_array', F.lit(0.0), lambda x, y: x + y, lambda acc: acc / F.size('my_array'))` – Oliver Angelil May 23 '23 at 19:48
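Putting those two comments together, a minimal sketch of the pure Python API form (Spark 3.1.1 or later, per the comment above; F.lit(0.0) keeps the accumulator a double so the types match the array elements):

import pyspark.sql.functions as F

# Requires a recent Spark (3.1.1+ per the comment above); the double literal
# avoids the data type mismatch that an integer initial value triggers.
df.select(
    "longitude",
    F.aggregate(
        "longitude",
        F.lit(0.0),
        lambda acc, x: acc + x,
        lambda acc: acc / F.size("longitude"),
    ).alias("avg_longitude"),
).show()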