I run the "join" operation on the Apache Spark and see that there is no weak scalability. It will be grateful if anyone can explain this.
I create two DataFrames, ("a", "b") and ("a", "c"), and join them on the first column. I generate the values so that the join is one-to-one. I also repartition both DataFrames with the same partitioner to avoid a shuffle during the join.
The number of rows in each DataFrame is 1024 * 1024 * 16 * cores_total (where cores_total is the total number of cores the program is launched on). Column "a" consists of random Int values, all values of column "b" equal 1, and all values of column "c" equal 2.
Theoretically, when the data size and the number of cores both increase by a factor of 64, the execution time should remain the same, but the execution time grows slightly. I obtain the following execution times:
Apache Spark version: 2.1.0. We use 8 cluster nodes connected by 1 Gbit Ethernet; each node has 2x Intel Xeon E5-2630 CPUs and 64 GB RAM.
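To double-check the "no shuffle" assumption, one option (not part of the original run) is to print the physical plan of the join and look for Exchange nodes; a minimal sketch using the DataFrame names from the code below:

// Sketch: test_rdd1 and test_rdd2 are the cached, co-partitioned DataFrames from the
// benchmark below. An Exchange node in the printed plan would indicate a shuffle.
test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a")).explain()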
/* join perf: weak-scaling benchmark for a one-to-one DataFrame join */
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import scala.util.Random

object joinPerf {
  // Generate n random Ints (appending with :+= copies the array on every step).
  def get_array(n: Int): Array[Int] = {
    var res = Array[Int]()
    for (x <- 1 to n) {
      res :+= Random.nextInt
    }
    res
  }

  def main(args: Array[String]): Unit = {
    val start_time = System.nanoTime
    val conf = new SparkConf().setAppName("joinPerf")
    val sc = new SparkContext(conf)
    val cores_total = sc.getConf.get("spark.cores.max").toInt
    val partitions_total = sc.getConf.get("spark.default.parallelism").toInt
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    println("start")
    val elems_total = 1024 * 1024 * 16 * cores_total
    val start_cnt = 1024 * 1024
    Random.setSeed(785354)

    // Seed values are generated on the driver, then expanded on the executors.
    var vals = Vector[Int]()
    for (x <- 1 to start_cnt) {
      vals :+= Random.nextInt
    }
    var test_rdd = sc.parallelize(vals)
    println(test_rdd.count)

    // Expand to elems_total random values and drop duplicates.
    test_rdd = test_rdd.flatMap(x => get_array(elems_total / start_cnt)).distinct
    println("test_rdd count = " + test_rdd.count)
    println("partitions count = " + test_rdd.getNumPartitions)

    // Two DataFrames with identical join keys, co-partitioned by column "a" and cached.
    val test_rdd1 = test_rdd.map(x => (x, 1)).toDF("a", "b").repartition(partitions_total, $"a").cache
    val test_rdd2 = test_rdd.map(x => (x, 2)).toDF("a", "c").repartition(partitions_total, $"a").cache
    println("test_rdd1 count = " + test_rdd1.count)  // forces evaluation and caching
    println("test_rdd2 count = " + test_rdd2.count)

    // Time only the join itself; count forces execution.
    val start_test_time = System.nanoTime
    val test_res = test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a"))
    println(test_res.count)
    print("join time = ")
    println((System.nanoTime - start_test_time) / 1e9d + " sec. ")
    print("all time = ")
    println((System.nanoTime - start_time) / 1e9d + " sec. ")
    sc.stop()
  }
}
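A side note on get_array: appending with :+= copies the array on every iteration, so generating n values costs O(n^2) element copies. If only n random Ints are needed, the same result can be produced with a single allocation; a sketch (not what the benchmark above actually runs):

import scala.util.Random

// Sketch: one-pass variant of get_array; Array.fill allocates the array once
// and evaluates Random.nextInt for each of the n slots.
def get_array(n: Int): Array[Int] = Array.fill(n)(Random.nextInt)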
config parameters:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 1024
spark.kryo.unsafe true
spark.kryo.referenceTracking false
spark.driver.memory 22g
spark.executor.memory 22g
spark.driver.maxResultSize 22g
spark.rpc.message.maxSize 2047
spark.memory.fraction 0.8
spark.memory.storageFraction 0.5
spark.executor.extraJavaOptions "-XX:+UseParallelGC"
Partitions per core: 4.
Example of launching the program:
./bin/spark-submit --class "joinPerf" --conf spark.executor.cores=8 --conf spark.cores.max=64 --conf spark.default.parallelism=256 ./joinPerf.jar
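Given the 4 partitions per core, the other weak-scaling launches presumably scale spark.default.parallelism and the row count with spark.cores.max; a small sketch (hypothetical helper, not part of the benchmark) that spells out the relationship:

// Hypothetical helper: derive spark.default.parallelism (4 partitions per core)
// and the total row count (1024 * 1024 * 16 * cores) for a given spark.cores.max.
def scalingParams(coresMax: Int): (Int, Long) = {
  val defaultParallelism = 4 * coresMax
  val rowsTotal = 1024L * 1024L * 16L * coresMax
  (defaultParallelism, rowsTotal)
}
// scalingParams(64) == (256, 1073741824L), matching the spark-submit example above.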