Good day, everyone. I will try to explain my problem as clearly as I can.
In several places I have found claims that Scala is faster than Python:
- https://www.quora.com/Which-one-is-faster-Scala-or-Python
- https://github.com/archivesunleashed/aut/issues/215
In addition, it is said that Scala is the most suitable programming language for running applications on Apache Spark:
https://www.dezyre.com/article/scala-vs-python-for-apache-spark/213
However, on this site another user (@Mrityunjay) asked a question similar to the one I am asking here:
Spark performance for Scala vs Python
In this post the reply from @zero323 highlights the following:
- @zero323 shows the large performance differences between the programs written in Scala and those written in Python.
- @zero323 explains how the use of operations such as ReduceByKey can dramatically affect the performance of Spark applications.
- @zero323 replaces the ReduceByKey operation with GroupByKey, improving the performance of the program proposed by @Mrityunjay.
In general, the explanation in that reply is excellent, and with @zero323's modification very similar execution times are achieved in Scala and Python.
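Just to make the point about those key-based operations concrete, here is a standalone sketch (mine, not taken from either post) of the same per-key sum written both ways:

// Standalone sketch (mine, not from either post): the same per-key sum
// written with groupByKey and with reduceByKey.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val viaGroup  = pairs.groupByKey().mapValues(_.sum)  // ships every value across the shuffle
val viaReduce = pairs.reduceByKey(_ + _)             // combines values per partition before the shuffle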
Taking this information into account, I set myself the task of writing a simple program that reproduces a similar situation to the one happening in my application, namely that my Scala code is slower than the equivalent Python code. To this end, I avoided ReduceByKey operations and used only map operations.
The idea is to perform a computation heavy enough to maximize cluster occupancy (96 cores, 48 GB RAM) and produce large latencies. To this end, the code generates 1 million artificial records (solely to measure the time it takes to process 1 million records; it does not matter that they are all replicas of the same values), each containing an identifier and a length-10 vector (an Array[Int] in the code below).
Because my application is implemented with DataFrames, I wrote two programs in Scala, one using RDDs and the other using DataFrames, with the intention of observing whether the problem lies in the use of DataFrames. Likewise, an equivalent program was written in Python.
In both cases, an operation is applied to each RDD/DataFrame record and its result is placed in an additional field, producing a new RDD/DataFrame that contains the original fields plus a new field with the result.
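In isolation, the per-record operation used in both programs below is the following (this is just the loop from the code, pulled out for readability; rep is the repetition count defined there):

// The per-record computation used in both programs below, isolated for clarity.
def myop(data: Array[Int], rep: Int): Double = {
  var mean = 0.0
  for (_ <- 0 until rep)
    mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
  mean
}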
This is the code in Scala (args(0) == 0 selects the DataFrame variant; any other value selects the RDD variant):
import org.apache.spark.sql.SparkSession
import scala.math.BigDecimal

object RDDvsDFMapComparison {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("Test").getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    val parts = 96        // number of partitions
    val repl = 1000000    // number of artificial records
    val rep = 60000000    // repetitions of the per-record operation
    val ary = (0 until 10).toArray
    val m = Array.ofDim[Int](repl, ary.length)
    for (i <- 0 until repl)
      m(i) = ary

    val t1_start = System.nanoTime()
    if (args(0).toInt == 0) {
      // DataFrame variant: fields are extracted from each Row
      val a1 = sc.parallelize(m, parts)
      val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1)).toDF("Name", "Data")
      val c1 = b1.map { x =>
        val name = x.getString(0)
        val data = x.getSeq[Int](1).toArray
        var mean = 0.0
        for (i <- 0 until rep)
          mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
        (name, data, mean)
      }.toDF("Name", "Data", "Mean")
      val d1 = c1.take(5)
      println(d1.deep.mkString(","))
    } else {
      // RDD variant: records are plain tuples
      val a1 = sc.parallelize(m, parts)
      val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1))
      val c1 = b1.map { x =>
        val name = x._1
        val data = x._2
        var mean = 0.0
        for (i <- 0 until rep)
          mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
        (name, data, mean)
      }
      val d1 = c1.take(5)
      println(d1.deep.mkString(","))
    }
    val t1_end = System.nanoTime()
    val t1 = t1_end - t1_start
    println("Map operation elapses: " + BigDecimal(t1.toDouble / 1000000000).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble.toString + " seconds.")
  }
}
This is the code in Python (much simpler):
#!/usr/bin/python
# -*- coding: latin-1 -*-
import sys
import time
import math

from pyspark import SparkContext, SparkConf

# Per-record operation: repeat the exp/log computation r times
def myop(key, value):
    s = 0.0
    for j in range(r):
        s += math.exp(math.log(sum(value)) / math.log(float(len(value))))
    return (key, value, s)

if __name__ == "__main__":
    conf = SparkConf().setAppName("rddvsdfmapcomparison")
    sc = SparkContext(conf=conf)

    parts = 96       # number of partitions
    repl = 1000000   # number of artificial records
    r = 60000000     # repetitions of the per-record operation

    ary = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    m = []
    for i in range(repl):
        m.append(ary)

    start = time.time()
    a2 = sc.parallelize(m, parts)
    b2 = a2.zipWithIndex().map(lambda (value, key): (key, value))
    c2 = b2.map(lambda (key, value): myop(key, value))
    c2.count
    d2 = c2.take(5)
    print '[%s]' % ', '.join(map(str, d2))
    end = time.time()
    print 'Elapsed time is', round(end - start, 2), 'seconds'
    sc.stop()
The results are very clear: the program implemented in Python is faster than either Scala program, whether it uses RDDs or DataFrames. It can also be observed that the RDD program is slightly faster than the DataFrame program, which seems consistent with the overhead of decoding each field of a DataFrame Row back into its data type.
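By that overhead I mean the difference between extracting typed fields from a generic Row and reading a plain tuple directly. A minimal standalone sketch of the two access patterns (not part of my benchmark):

// Minimal sketch, not part of my benchmark: with a DataFrame, each field
// is pulled out of a generic Row and cast back to its type; with an RDD
// of tuples the values are already statically typed.
import org.apache.spark.sql.Row

val row: Row = Row("0", Seq(0, 1, 2))
val nameFromRow = row.getString(0)            // runtime lookup + cast
val dataFromRow = row.getSeq[Int](1).toArray  // runtime lookup + cast

val tuple: (String, Array[Int]) = ("0", Array(0, 1, 2))
val nameFromTuple = tuple._1                  // statically typed access
val dataFromTuple = tuple._2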
The question is: what am I doing wrong? Isn't Scala code supposed to be faster than Python? Could someone explain what I am doing wrong in my code? @zero323's response is very good and illustrative, but I cannot understand how a piece of code as simple as this can be slower in Scala than in Python.
Thank you very much for taking the time to read my question.