
I have a Spark DataFrame with a single column and a large number of rows (in the billions). I am trying to calculate the sum of the values in that column using the code shown below, but it is very slow. Is there a more efficient way to compute the sum?

val df = sc.parallelize(Array(1,3,5,6,7,10,30)).toDF("colA")
df.show()
df.agg(sum("colA")).first().get(0) //very slow 

A similar question was posted here: How to sum the values of one column of a dataframe in spark/scala. The focus of this question, however, is efficiency.
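For reference, here is a minimal self-contained version of the snippet above. It assumes a Spark 1.6-style shell where `sc` and `sqlContext` are already in scope; the imports are the only additions.

import org.apache.spark.sql.functions.sum
import sqlContext.implicits._ // enables .toDF on an RDD of Ints

val df = sc.parallelize(Array(1, 3, 5, 6, 7, 10, 30)).toDF("colA")
df.show()

// agg(sum(...)) yields a single-row DataFrame; first().get(0) pulls the value out
// (Spark widens the sum of an integer column, so the result is typically a Long).
val total = df.agg(sum("colA")).first().get(0) // still a full scan, hence slow on billions of rows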

Anhata
  • You could try to use the underlying rdd api: `df.rdd.map { case org.apache.spark.sql.Row(v: Int) => v }.sum`. There is no other way I can think of that might be faster. The answer at https://stackoverflow.com/questions/37032025/how-to-sum-the-values-of-one-column-of-a-dataframe-in-spark-scala says the rdd api is faster (a runnable sketch of this approach is given after the comments). – Steffen Schmitz May 25 '17 at 16:43
  • Possible duplicate of [How to sum the values of one column of a dataframe in spark/scala](https://stackoverflow.com/questions/37032025/how-to-sum-the-values-of-one-column-of-a-dataframe-in-spark-scala) – Steffen Schmitz May 25 '17 at 16:46
  • Thanks for pointing out the duplicate. However, I would like to focus on efficiency in this question. I will update my question accordingly. – Anhata May 25 '17 at 16:52
  • The efficiency part is also discussed in the comments on the other thread. For a single column the rdd api is fastest, otherwise the dataframe aggregation. – Steffen Schmitz May 25 '17 at 16:58
  • I doubt the rdd version would be faster, at least in Spark 2.0 and onwards. – Assaf Mendelson May 25 '17 at 17:23
  • @SteffenSchmitz What makes you think the RDD aggregation would be faster than with Dataset? – Jacek Laskowski May 25 '17 at 17:25
  • What's the Spark version? – Jacek Laskowski May 25 '17 at 17:26
  • @JacekLaskowski I would think that on a narrow DataFrame with few transformations, the serialization and query optimization cause more overhead than the performance gain over the RDD API. In this small use case the basic API should be faster. – Steffen Schmitz May 25 '17 at 17:33
  • I am using 1.6.2 – Anhata May 25 '17 at 17:35
  • @SteffenSchmitz Beware of DataFrame.rdd though - I've had weird runtime errors using it - for stability reasons I would recommend avoiding it, unless the gain is significant. Another important factor regarding speed is data partitioning and parallelism. If possible, pre-split your data according to the number of tasks you can run in parallel on your cluster (or more, if RAM is tight), or use .repartition(Int) to shuffle the data out (see the sketch after these comments). Although this will cost some time due to network overhead, you may gain some efficiency if you have high-bandwidth interconnects in your cluster and skewed splits. – Rick Moritz May 25 '17 at 18:55
  • @RickMoritz Do you have an example of when this occurs, or any other reference? I don't see this issue anywhere in the Spark JIRA. Just being curious here... – Steffen Schmitz May 25 '17 at 19:08
  • @SteffenSchmitz Not yet - but I ran into a reproducible issue the other day where I had to call DataFrame.rdd twice so that the second call would not throw an exception at run time (the first call being technically useless...). Since I managed to work around it, I haven't gotten around to filing a Jira / extracting a reproducible example yet. – Rick Moritz May 25 '17 at 19:26
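Below is a runnable sketch of the RDD-based approach from Steffen Schmitz's first comment, assuming the same `df` as in the question. It pattern-matches each `Row` to unwrap the Int, then uses the numeric RDD `sum()`:

import org.apache.spark.sql.Row

// Drop to the underlying RDD, unwrap the single Int column, and sum.
// RDD[Int].sum() returns a Double via the numeric RDD operations.
val rddSum = df.rdd.map { case Row(v: Int) => v }.sum()

// Equivalent alternative using the positional accessor instead of pattern matching:
// val rddSum2 = df.rdd.map(_.getInt(0)).sum()

As Rick Moritz cautions above, measure this against df.agg(sum("colA")) on your own data before committing to it.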
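And a sketch of the pre-partitioning idea from Rick Moritz's comment. The partition count here is a placeholder, not a recommendation; set it to roughly the number of tasks your cluster can run in parallel:

import org.apache.spark.sql.functions.sum

// Placeholder value: tune to your cluster's parallelism (or a small multiple of it if RAM is tight).
val numPartitions = 200

// repartition() shuffles the data into evenly sized partitions, which can help when the
// input splits are skewed; the shuffle itself costs network time, so it only pays off
// for badly skewed data or work that is repeated on the same DataFrame.
val total = df.repartition(numPartitions).agg(sum("colA")).first().get(0)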

0 Answers