
I have a PySpark DataFrame named DF with (K, V) pairs. I would like to apply multiple functions with reduceByKey. For example, I have the following three simple functions:

def sumFunc(a,b): return a+b

def maxFunc(a,b): return max(a,b)

def minFunc(a,b): return min(a,b)

When I apply only one function, each of the following works:

DF.reduceByKey(sumFunc)  #works
DF.reduceByKey(maxFunc)  #works
DF.reduceByKey(minFunc)  #works

But when I pass more than one function, none of the following works:

DF.reduceByKey(sumFunc, maxFunc, minFunc)  #does not work
DF.reduceByKey(sumFunc, maxFunc)  #does not work
DF.reduceByKey(maxFunc, minFunc)  #does not work
DF.reduceByKey(sumFunc, minFunc)  #does not work

I do not want to use groupByKey because it slows down the computation.


1 Answer

reduceByKey takes a single associative binary function, so you cannot pass it several at once. If the input is a DataFrame, just use agg:

import pyspark.sql.functions as sqlf

df = sc.parallelize([
   ("foo", 1.0), ("foo", 2.5), ("bar", -1.0), ("bar", 99.0)
]).toDF(["k", "v"])

df.groupBy("k").agg(sqlf.min("v"), sqlf.max("v"), sqlf.sum("v")).show()

## +---+------+------+------+
## |  k|min(v)|max(v)|sum(v)|
## +---+------+------+------+
## |bar|  -1.0|  99.0|  98.0|
## |foo|   1.0|   2.5|   3.5|
## +---+------+------+------+
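
If you want cleaner column names than min(v), each aggregate can be aliased. A minimal sketch (the v_min/v_max/v_sum names are just illustrative):

df.groupBy("k").agg(
    sqlf.min("v").alias("v_min"),
    sqlf.max("v").alias("v_max"),
    sqlf.sum("v").alias("v_sum")
).show()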

With RDDs you can use StatCounter:

from pyspark.statcounter import StatCounter

rdd = df.rdd  # rows behave like (k, v) pairs here

stats = rdd.aggregateByKey(
    StatCounter(),          # zero value created per key
    StatCounter.merge,      # fold a single value into the accumulator
    StatCounter.mergeStats  # combine accumulators across partitions
).mapValues(lambda s: (s.min(), s.max(), s.sum()))

stats.collect()
## [('bar', (-1.0, 99.0, 98.0)), ('foo', (1.0, 2.5, 3.5))]
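
Since StatCounter tracks count, mean, and variance in the same pass, you can expose those too if you need them; a sketch along the same lines:

stats_full = rdd.aggregateByKey(
    StatCounter(), StatCounter.merge, StatCounter.mergeStats
).mapValues(lambda s: (s.min(), s.max(), s.sum(), s.count(), s.mean()))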

Using your functions you need two separate merge steps, because combineByKey passes a bare value into the first one and a full accumulator into the second:

def mergeValue(acc, v):
    # fold a single value into the [min, max, sum] accumulator
    return [f(a, v) for f, a in zip([minFunc, maxFunc, sumFunc], acc)]

def mergeCombiners(a, b):
    # merge two accumulators element-wise
    return [f(x, y) for f, x, y in zip([minFunc, maxFunc, sumFunc], a, b)]

rdd.combineByKey(lambda x: [x, x, x], mergeValue, mergeCombiners).collect()
## [('bar', [-1.0, 99.0, 98.0]), ('foo', [1.0, 2.5, 3.5])]
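
If you would rather keep reduceByKey itself, a minimal sketch is to pack each value into a tuple up front and reduce element-wise with a single function:

packed = rdd.mapValues(lambda v: (v, v, v))
packed.reduceByKey(
    lambda a, b: (minFunc(a[0], b[0]), maxFunc(a[1], b[1]), sumFunc(a[2], b[2]))
).collect()
## same result: [('bar', (-1.0, 99.0, 98.0)), ('foo', (1.0, 2.5, 3.5))]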
  • Can you also use the RDD method for multiple StatCounter instances at once? For instance if you want min/max for different columns in the same aggregation? – Matthias Jul 08 '16 at 08:04
  • I tried to use your StatCounter example. But when I try it on a key-value RDD with string-float, then I get this error: TypeError: unbound method merge() must be called with NoneType instance as first argument (got StatCounter instance instead) – Matthias Jul 18 '16 at 13:28
  • @Matthias I don't use Python 2 anymore. If you're on 2.x, you'll have to wrap the calls in plain functions that operate on the instances received by the seq-op and merge-op. – zero323 Jul 18 '16 at 14:05
  • Thanks for the hint. After searching for weeks, I finally found a really good [explanation here](http://www.learnbymarketing.com/618/pyspark-rdd-basics-examples/). – Matthias Jul 19 '16 at 07:01