
If we have a Pandas data frame consisting of a column of categories and a column of values, we can remove the mean in each category by doing the following:

df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))

As far as I understand, Spark dataframes do not directly offer this group-by/transform operation (I am using PySpark on Spark 1.5.0). So, what is the best way to implement this computation?

I have tried using a group-by/join as follows:

df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)

But it is very slow since, as I understand, each category requires a full scan of the DataFrame.

I think (but have not verified) that I can speed this up a great deal if I collect the result of the group-by/mean into a dictionary, and then use that dictionary in a UDF as follows:

import pyspark.sql.functions
import pyspark.sql.types

nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValues", categoryDemeaned(df.Category, df.Values))

Is there an idiomatic way to express this type of operation without sacrificing performance?

Peter Lubans

3 Answers


as I understand, each category requires a full scan of the DataFrame

No, it doesn't. DataFrame aggregations are performed using logic similar to aggregateByKey (see DataFrame groupBy behaviour/optimization). The slower part is the join, which requires sorting/shuffling, but it still doesn't require a scan per group.
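
You can check this yourself (a quick sketch, assuming the same df with "Category" and "Values" columns as in the question) by inspecting the plan Spark generates for the aggregation:

means = df.groupBy("Category").mean("Values")
means.explain(True)  # the extended plan shows one aggregation over a single scan, not a scan per group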

If this is the exact code you use, it is slow because you don't provide a join expression. Because of that it simply performs a Cartesian product, so it is not only inefficient but also incorrect. You want something like this:

from pyspark.sql.functions import col

means = df.groupBy("Category").mean("Values").alias("means")
df.alias("df").join(means, col("df.Category") == col("means.Category"))

I think (but have not verified) that I can speed this up a great deal if I collect the result of the group-by/mean into a dictionary, and then use that dictionary in a UDF

It is possible, although performance will vary on a case-by-case basis. A problem with using Python UDFs is that they have to move data to and from Python. Still, it is definitely worth trying. You should consider using a broadcast variable for nameToMean, though.
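
For example, a rough sketch, assuming nameToMean has already been collected on the driver and that sc is your SparkContext (bMeans is just an illustrative name):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

bMeans = sc.broadcast(nameToMean)  # ship the dict to the executors once
f = lambda category, value: value - bMeans.value[category]
categoryDemeaned = udf(f, DoubleType())
df = df.withColumn("DemeanedValues", categoryDemeaned(df.Category, df.Values))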

Is there an idiomatic way to express this type of operation without sacrificing performance?

In PySpark 1.6 you can use the broadcast function:

df.alias("df").join(
    broadcast(means), col("df.Category") == col("means.Category"))

but it is not available in <= 1.5.

zero323
  • Thanks for the reply. I wasn't aware of the Cartesian product behavior in df.join(); I had assumed incorrectly that the default behavior was to join on any columns that share the same name. Adding an explicit equality test with an alias for the category column from the table of means sped things up massively. – Peter Lubans Dec 25 '15 at 23:08
  • You're welcome. It is always useful to check the extended execution plan (`df.explain(extended=True)`). The most common issues, ignoring configuration, are related to Cartesian products, and even if you provide a join expression it may not be optimized. – zero323 Dec 26 '15 at 05:03

You can use Window to do this

i.e.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

window_var = Window.partitionBy('Category')
df = df.withColumn('DemeanedValues', F.col('Values') - F.mean('Values').over(window_var))
Azuuu

Actually, there is an idiomatic way to do this in Spark, using the Hive OVER expression.

i.e.

df.registerTempTable('df')
with_category_means = sqlContext.sql('select *, mean(Values) OVER (PARTITION BY Category) as category_mean from df')
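
If you want the demeaned values directly, the same window expression can do the subtraction in one query (a sketch against the table registered above; DemeanedValues is just an illustrative column name):

with_demeaned = sqlContext.sql('select *, Values - mean(Values) OVER (PARTITION BY Category) as DemeanedValues from df')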

Under the hood, this is using a window function. I'm not sure whether this is faster than your solution, though.

Jason P