15

I have a Spark DataFrame with the following data (I use spark-csv to load it in):

key,value
1,10
2,12
3,0
1,20

Is there anything similar to the Spark RDD reduceByKey that can return a Spark DataFrame like the one below? (Basically, summing up the values for the same key.)

key,value
1,30
2,12
3,0

(I can transform the data to an RDD and do a reduceByKey operation, but is there a more DataFrame-API way to do this?)
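For context, the RDD workaround I have in mind looks roughly like this (just a sketch in PySpark; the df variable and column names are from my example above):

# convert each Row to a (key, value) pair so reduceByKey can sum per key
rdd = df.rdd.map(lambda row: (row['key'], row['value']))
result = rdd.reduceByKey(lambda a, b: a + b).toDF(['key', 'value'])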

zero323
Carson Pun

3 Answers

25

If you don't care about column names, you can use groupBy followed by sum:

df.groupBy($"key").sum("value")

Otherwise, it is better to replace sum with agg:

df.groupBy($"key").agg(sum($"value").alias("value"))
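For reference, a rough PySpark equivalent of the agg version could look like this (a sketch, assuming the DataFrame is named df):

from pyspark.sql import functions as F

# groupBy + agg keeps control over the output column name
df.groupBy("key").agg(F.sum("value").alias("value"))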

Finally, you can use raw SQL:

df.registerTempTable("df")
sqlContext.sql("SELECT key, SUM(value) AS value FROM df GROUP BY key")

See also DataFrame / Dataset groupBy behaviour/optimization

zero323
  • In the RDD API, I use `reduceByKey` since `groupByKey` collects all values for a key into memory - if a key is associated with many values, a worker could run out of memory. Does `groupBy` have that limitation too? – jeffreyveon Aug 04 '16 at 04:02
  • @jeffreyveon http://stackoverflow.com/q/32902982/1560062 but a) there is more than one mechanism behind groupBy in Spark, and b) even with aggregate-like operations it is still possible to get OOMs for different reasons. – zero323 Aug 04 '16 at 07:47
  • Could you comment further on "If you don't care about column names..."? What happens to the column names in this case, exactly? – justanotherbrain Oct 03 '16 at 14:55
  • @justanotherbrain The column name would be something along the lines of _c1, _c2 etc., instead of "value" - the case when you use "alias". – Sai Kiriti Badam Nov 16 '17 at 07:20
2

I think user goks missed a step in the code, and it is not tested code.

.map should have been used to convert the RDD to a pair RDD first, e.g. .map(lambda x: (x, 1)), before calling .reduceByKey(...).

reduceByKey is not available on a single-value or regular RDD, only on a pair RDD.
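As a rough illustration (a sketch in PySpark, assuming a DataFrame df with columns key and value), a pair RDD that actually sums the values would map each row to a (key, value) tuple rather than (x, 1), since the goal here is summing, not counting:

# build (key, value) pairs, then sum per key and go back to a DataFrame
df.select('key', 'value').rdd \
  .map(lambda r: (r['key'], r['value'])) \
  .reduceByKey(lambda a, b: a + b) \
  .toDF(['key', 'value'])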

Thx

Ans u man
0

How about this? I agree this still converts to an RDD and then back to a DataFrame.

df.select('key','value').map(lambda x: x).reduceByKey(lambda a,b: a+b).toDF(['key','value'])
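Note that DataFrame.map was removed from the Python API in Spark 2.x, so on newer versions this would presumably need an explicit .rdd, e.g. (a sketch under that assumption):

# go through the underlying RDD explicitly, then sum per key
df.select('key','value').rdd.map(lambda x: (x[0], x[1])).reduceByKey(lambda a, b: a + b).toDF(['key','value'])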
goks