
I run the "join" operation on Apache Spark and see that there is no weak scalability. I would be grateful if anyone could explain this.

I create two dataframes, ("a", "b") and ("a", "c"), and join them by the first column. I generate the dataframe values for a one-to-one join. I also use the same partitioner to avoid a shuffle.

The number of rows in each dataframe is 1024 * 1024 * 16 * cores_total (cores_total is the total number of cores on which the program is launched). Column "a" consists of random Int values, all values of the "b" column equal 1, and all values of the "c" column equal 2.

Theoretically, when the data size and the number of cores both increase 64 times, the execution time should remain the same, but in practice it grows slightly. I obtain the following execution times:

[table of execution times attached as an image]

Apache Spark version: 2.1.0. We use 8 cluster nodes connected by 1 Gbit Ethernet; each node has 2x Intel Xeon E5-2630 and 64 GB RAM.

/* join perf: weak-scaling benchmark for a DataFrame join */
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._

object joinPerf {

    // Generate n random Int values.
    def get_array(n: Int): Array[Int] = Array.fill(n)(Random.nextInt)

    def main(args: Array[String]): Unit = {
        val start_time = System.nanoTime
        val conf = new SparkConf().setAppName("joinPerf")
        val sc = new SparkContext(conf)
        val cores_total = sc.getConf.get("spark.cores.max").toInt
        val partitions_total = sc.getConf.get("spark.default.parallelism").toInt
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._
        println("start")
        // Scale the data with the number of cores: 16M rows per core.
        val elems_total = 1024 * 1024 * 16 * cores_total
        val start_cnt = 1024 * 1024
        Random.setSeed(785354)

        // Seed collection: 1M random Ints generated on the driver.
        var vals = Vector[Int]()
        for (x <- 1 to start_cnt) {
            vals :+= Random.nextInt
        }

        var test_rdd = sc.parallelize(vals)
        println(test_rdd.count)
        // Expand to elems_total values in parallel, then drop duplicates.
        test_rdd = test_rdd.flatMap(x => get_array(elems_total / start_cnt)).distinct

        println("test_rdd count = " + test_rdd.count)
        println("partitions count = " + test_rdd.getNumPartitions)

        // Both sides are repartitioned by the join column "a" and cached.
        var test_rdd1 = test_rdd.map(x => (x, 1)).toDF("a", "b").repartition(partitions_total, $"a").cache
        var test_rdd2 = test_rdd.map(x => (x, 2)).toDF("a", "c").repartition(partitions_total, $"a").cache

        println("test_rdd1 count = " + test_rdd1.count)
        println("test_rdd2 count = " + test_rdd2.count)

        // Time only the join; count forces its execution.
        var start_test_time = System.nanoTime
        var test_res = test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a"))
        println(test_res.count)
        print("join time = ")
        println((System.nanoTime - start_test_time) / 1e9d + " sec. ")

        print("all time = ")
        println((System.nanoTime - start_time) / 1e9d + " sec. ")
        sc.stop()
    }
}

config parameters:

spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max  1024
spark.kryo.unsafe                true
spark.kryo.referenceTracking     false
spark.driver.memory              22g
spark.executor.memory            22g
spark.driver.maxResultSize       22g
spark.rpc.message.maxSize        2047
spark.memory.fraction            0.8
spark.memory.storageFraction     0.5
spark.executor.extraJavaOptions  "-XX:+UseParallelGC"

Partitions per core - 4.

Example of launching the program:

./bin/spark-submit --class "joinPerf" --conf spark.executor.cores=8 --conf spark.cores.max=64 --conf spark.default.parallelism=256 ./joinPerf.jar

1 Answer
Theoretically, when the data size and the number of cores both increase 64 times, the execution time should remain the same, but in practice it grows slightly

It shouldn't. While one could expect linear scalability when performing strictly local operations on uniformly distributed data, assuming no IO bottlenecks, this is no longer the case when transformations require data exchange (RDD shuffles, Dataset Exchanges). Among wide transformations, joins belong to the most expensive category (next to groupByKey-like operations), due to their non-reducing nature and their use of large, local, supporting collections.
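
To see where the exchanges actually happen, a minimal sketch (using the test_res DataFrame from the question's code) is to print the physical plan and look at the operators around the join:

// Sketch: inspect the physical plan of the join from the question's code.
// Exchange operators mark shuffles; Sort operators belong to a sort-merge join.
test_res.explain()
// If an Exchange appears directly above the cached inputs, the data is shuffled
// again for the join; if not, the pre-join repartitioning is being reused.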

Shuffles not only have higher-than-linear complexity (at least O(N log N) for sort-based methods), but can also induce non-uniform distribution of data and require significant disk and network IO.
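
As a rough back-of-the-envelope illustration of that O(N log N) term, using the nominal row counts from the question: with 1 core there are about 2^24 rows and with 64 cores about 2^30 rows, so N log N grows by roughly 64 * (30 / 24) = 80 times while the core count grows only 64 times. Even with perfectly uniform data and zero communication cost, each core ends up with about 25% more sorting work.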

This is even more severe in the case of your code, which shuffles the data twice: once to repartition the RDDs and once to join the Datasets (a HashPartitioner for RDDs is not compatible with Dataset partitioning).
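
If the goal is to pay the partitioning cost only once, one option (not something the original code does, so treat this purely as a sketch; it assumes a SparkSession named spark and a writable table catalog for saveAsTable) is to bucket both sides by the join key with the same number of buckets, which lets a sort-merge join read them without an additional Exchange:

// Sketch only: bucket both tables on the join key "a" with the same bucket count.
// Table names t1/t2 are placeholders.
test_rdd1.write.bucketBy(partitions_total, "a").sortBy("a").saveAsTable("t1")
test_rdd2.write.bucketBy(partitions_total, "a").sortBy("a").saveAsTable("t2")
val bucketed_res = spark.table("t1").join(spark.table("t2"), "a")
bucketed_res.explain()  // check that no Exchange appears above the two table scans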

Finally, increasing the cluster size has its own performance impact, related to increased communication and synchronization overhead and decreased data locality.

Overall you'll rarely see truly linear scalability, and even if you do, you can expect the slope to be < 1.

On a side note, I wouldn't depend on the cache + count idiom when working with Datasets. It is likely to be unreliable.
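
If the intention of cache + count is simply to make sure the inputs are fully materialized before the join is timed, one alternative (again only a sketch; the /tmp/join_perf path is a placeholder) is to write the generated data out and read it back:

// Sketch: materialize the inputs to stable storage instead of relying on cache + count.
// Point the path at storage reachable from all executors (e.g. HDFS).
test_rdd1.write.mode("overwrite").parquet("/tmp/join_perf/t1")
test_rdd2.write.mode("overwrite").parquet("/tmp/join_perf/t2")
val in1 = sqlContext.read.parquet("/tmp/join_perf/t1")
val in2 = sqlContext.read.parquet("/tmp/join_perf/t2")
// Note: the read-back data has no known partitioning, so a join over in1/in2
// will shuffle; the bucketing sketch above preserves a join-friendly layout.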

See also Spark: Inconsistent performance number in scaling number of cores

zero323