4

Let's say I have the following DataFrame:

[Row(user='bob', values=[0.5, 0.3, 0.2]),
Row(user='bob', values=[0.1, 0.3, 0.6]),
Row(user='bob', values=[0.8, 0.1, 0.1])]

I would like to groupBy user and do something like avg(values) where the average is taken over each index of the array values like this:

[Row(user='bob', averages=[0.466667, 0.233333, 0.3])]

How can I do this in PySpark?

Evan Zamir

1 Answer

14

You can expand the array and compute the average for each index.

Python

from pyspark.sql.functions import array, avg, col

# Length of the arrays (assumes every row has an array of the same length)
n = len(df.select("values").first()[0])

# Average each index separately, then reassemble the results into one array column
df.groupBy("user").agg(
    array(*[avg(col("values")[i]) for i in range(n)]).alias("averages")
)
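
Chaining .show(truncate=False) on the aggregation prints the result; with the three rows from the question, the single row for bob should hold averages of roughly [0.466667, 0.233333, 0.3], matching the expected output above.

df.groupBy("user").agg(
    array(*[avg(col("values")[i]) for i in range(n)]).alias("averages")
).show(truncate=False)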

Scala

import spark.implicits._
import org.apache.spark.sql.functions.{avg, size}

val df = Seq(
  ("bob", Seq(0.5, 0.3, 0.2)),
  ("bob", Seq(0.1, 0.3, 0.6))
).toDF("user", "values")

// Length of the arrays (assumes every row has an array of the same length)
val n = df.select(size($"values")).as[Int].first

// One column per array index
val values = (0 until n).map(i => $"values"(i))

// Average each index column per user
df.select($"user" +: values: _*).groupBy($"user").avg()
zero323
  • What does the * do in this case? Also, is there a way like in Pandas where I could pass each group to a user-defined function and do the operation in there? Thanks. – Evan Zamir Aug 17 '16 at 01:25
  • `*` is standard Python argument unpacking. No, Python doesn't support UDAFs. You can use RDDs directly or define a JVM one. – zero323 Aug 17 '16 at 01:34
  • Thanks! I think RDD makes sense here. – Evan Zamir Aug 17 '16 at 16:07
  • If you want to give RDD a try you can use a subset (`compute_stats` without collect) of [this answer](http://stackoverflow.com/a/36361546/1560062). – zero323 Aug 17 '16 at 17:36
  • @Gevorg Here you are. You may also find interesting http://stackoverflow.com/q/41731865/1560062 – zero323 Apr 25 '17 at 23:31
  • @zero323 would it be possible to use agg() and have the end result in a single column (of array type containing the averages)? – Marsellus Wallace Apr 26 '17 at 14:15
  • @Gevorg Yup, just wrap it with `o.a.s.sql.functions.array`. – zero323 Apr 26 '17 at 15:34
  • `n = len(df.select("values").first()[0])` how could this be done where the length is variable? – con Nov 22 '19 at 18:34
  • @con `n = len(df.select(max("values")).first()[0])`? Or maybe `df.select("user", posexplode("values")).groupBy("user", "pos").avg()`? – 10465355 Nov 23 '19 at 19:38
  • My aim is to use aggregated data in different columns, for which I am using: `AggregateDF = TimeWindowDF.groupBy('col1','col2').agg( f.array(*[f.avg(f.col("value.count").getItem(0))]).alias("avg_count"), f.array(*[f.avg(f.col("value.occ").getItem(0))]).alias("avg_occ"), f.array(*[f.avg(f.col("value.speed").getItem(0))]).alias("avg_speed"))` But the columns I am having are of datatype array, however I want them to be in type double. How that can be achieved? – Pardeep Naik Jan 30 '20 at 08:52
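
The comments above mention falling back to RDDs, since PySpark has no user-defined aggregate functions. A minimal sketch of that route, assuming every values array has the same length (variable and helper names are illustrative, not from the original answer):

# Pair each row as (user, (values, 1)) so sums and counts can be reduced together
pairs = df.rdd.map(lambda row: (row["user"], (row["values"], 1)))

# Element-wise sum of the arrays plus a running row count per user
sums_and_counts = pairs.reduceByKey(
    lambda a, b: ([x + y for x, y in zip(a[0], b[0])], a[1] + b[1])
)

# Divide each element-wise sum by that user's row count
averages = sums_and_counts.mapValues(
    lambda sum_count: [s / sum_count[1] for s in sum_count[0]]
)

# averages.collect() should give something like [('bob', [0.466..., 0.233..., 0.3])]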
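
For the variable-length case raised in the later comments, a sketch of the posexplode variant suggested there: each array element is exploded together with its position, so no fixed length n is needed. posexplode yields columns named pos and col by default, and the result is one averaged row per (user, index) rather than a single array column.

from pyspark.sql.functions import posexplode

# Explode each array element with its index, then average per (user, index)
df.select("user", posexplode("values")).groupBy("user", "pos").avg("col")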