0

As far as I understand, this is the most efficient way to calculate average in Spark: Spark : Average of values instead of sum in reduceByKey using Scala.

My question is: if I use the high-level dataset with a groupby followed by Spark functions' avg(), will I get the same RDD under the hood? Can I trust Catalyst or I should use the low-level RDD? I mean, will writing low-level code yield better results than a dataset?

Example code:

employees
  .groupBy($"employee")
  .agg(
    avg($"salary").as("avg_salary")
  )

Versus:

employees
.mapValues(employee => (employee.salary, 1)) // map entry with a count of 1
.reduceByKey {
  case ((sumL, countL), (sumR, countR)) => 
    (sumL + sumR, countL + countR)
}
.mapValues { 
  case (sum , count) => sum / count 
}
Alon
  • 10,381
  • 23
  • 88
  • 152

3 Answers3

4

I don't see it a black-and-white question. In general, if you have a RDD, especially if it's a PairRDD, and need a result in RDD, it would make sense to settle with reduceByKey. On the other hand, given a DataFrame I would recommend going with groupBy/agg(avg).

A couple of things to consider:

Built-in optimization

While reduceByKey is relatively efficient compared to functions like groupByKey, it does induce stage boundaries since the operation requires repartitioning the data by keys. Depending on the RDD's partitions, the number of tasks in the derived stage may end up to be too small to take advantage of the available cpu cores, potentially resulting in a performance bottleneck. Such performance issue could be addressed, for instance, by manually assigning numPartition in reduceByKey:

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

But the point is that, to fully optimize RDD operations, one might need to put in some manual tweaking effort. In contrary, most operations for DataFrames are automatically optimized by the built-in Catalyst query optimizer.

Memory usage efficiency

Perhaps the other more significant factor to be looked at is related to memory usage for large dataset. When a RDD needs to be distributed across nodes or written to disk, Spark will serialize every row of data into objects, subject to costly Garbage Collection overhead. On the other hand, with knowledge of a DataFrame's schema, Spark doesn't need to serialize the data into objects. The Tungsten execution engine can leverage off-heap memory to store data in binary format for transformations, resulting in more efficient use of memory.

In conclusion, while there may be more knobs for tweaking using low-level code, that does not necessarily result in more performant code due to inadequate optimization, additional cost for serialization, etc.

Leo C
  • 22,006
  • 3
  • 26
  • 39
  • The gain given by the tungsten format in term of garbage collection is because the rows are represented as byte arrays, not because they are stored off-heap (btw, by default off-heap is disabled, need to activate it with `spark.memory.offHeap.enabled`) – bonnal-enzo Jan 07 '20 at 15:27
  • @EnzoBnl, maybe I wasn't clear enough. Have slightly elaborated the answer. – Leo C Jan 07 '20 at 17:50
1

We can conclude this from the plan generated by Spark.

This is the plan for DataFrame syntax-

    val employees = spark.createDataFrame(Seq(("E1",100.0), ("E2",200.0),("E3",300.0))).toDF("employee","salary")

    employees
      .groupBy($"employee")
      .agg(
        avg($"salary").as("avg_salary")
      ).explain(true)

Plan -

== Parsed Logical Plan ==
'Aggregate ['employee], [unresolvedalias('employee, None), avg('salary) AS avg_salary#11]
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
   +- LocalRelation [_1#0, _2#1]

== Analyzed Logical Plan ==
employee: string, avg_salary: double
Aggregate [employee#4], [employee#4, avg(salary#5) AS avg_salary#11]
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
   +- LocalRelation [_1#0, _2#1]

== Optimized Logical Plan ==
Aggregate [employee#4], [employee#4, avg(salary#5) AS avg_salary#11]
+- LocalRelation [employee#4, salary#5]

== Physical Plan ==
*(2) HashAggregate(keys=[employee#4], functions=[avg(salary#5)], output=[employee#4, avg_salary#11])
+- Exchange hashpartitioning(employee#4, 10)
   +- *(1) HashAggregate(keys=[employee#4], functions=[partial_avg(salary#5)], output=[employee#4, sum#17, count#18L])
      +- LocalTableScan [employee#4, salary#5]

As the plan suggests first "HashAggregate" happened with partial average then "exchange hashpartitioning" happened for full average. The conclusion is that catalyst optimized the DataFrame operation as if we programmed with "reduceByKey" syntax. So we needn't take the burden of writing low level code.

Here is how RDD code and plan looks like.

    employees
      .map(employee => ("key",(employee.getAs[Double]("salary"), 1))) // map entry with a count of 1
      .rdd.reduceByKey {
      case ((sumL, countL), (sumR, countR)) =>
        (sumL + sumR, countL + countR)
    }
    .mapValues {
      case (sum , count) => sum / count
    }.toDF().explain(true)

Plan -

== Parsed Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#30, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#31]
+- ExternalRDD [obj#29]

== Analyzed Logical Plan ==
_1: string, _2: double
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#30, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#31]
+- ExternalRDD [obj#29]

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#30, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#31]
+- ExternalRDD [obj#29]

== Physical Plan ==
*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#30, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#31]
+- Scan[obj#29]

The plan is optimized and also involves serialization of data into objects which means extra pressure of memory.

Conclusion

I would use daraframe syntax for its simplicity and possibly better performance.

Salim
  • 2,046
  • 12
  • 13
-5

Debug at println("done") , Go to http://localhost:4040/stages/ ,You will get the result.

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("example")
  .getOrCreate()

val employees = spark.createDataFrame(Seq(("employee1",1000),("employee2",2000),("employee3",1500))).toDF("employee","salary")
import spark.implicits._
import org.apache.spark.sql.functions._
// Spark functions
employees
  .groupBy("employee")
  .agg(
    avg($"salary").as("avg_salary")
  ).show()
// your low-level code

println("done")
does
  • 107
  • 10