3

Good day to everyone. I will try to explain my problem, so you can understand me.

In several places I have found that it holds that Scala is faster than Python:

In addition, it is said that Scala is the most suitable programming language to run applications in Apache Spark:

https://www.dezyre.com/article/scala-vs-python-for-apache-spark/213

However, on this site another user (@Mrityunjay) asked a question similar to the one I am proposing here:

Spark performance for Scala vs Python

In this post the reply from @zero323 highlights the following:

  1. @zero323 shows the large differences in performance in the programs written in Scala with respect to those written in Python.
  2. @zero323 explains how the use of operations such as ReduceByKey can dramatically impact the performance of Spark applications.
  3. @zero323 replaces the ReduceByKey operation by the GroupByKey one, so he can improve the performance of the program proposed by @Mrityunjay.

In general, the reply explanation is exceptional and very similar execution times are achieved with the @zero323's modification between both Scala and Python.

Taking into account this information, I gave myself the task of writing a simple program that would allow me to explain a similar situation that is happening to me with my application, highlighting that my code in Scala is slower than the one written in Python. For this, I avoided the use of ReduceByKey operations and only used map operations.

I will try to do any super-complex operation to maximize the cluster occupancy (96 cores, 48 GB RAM) and achieve large latencies. To this end, the code generates a set of 1 million artificial data (for the sole purpose of calculating the execution time of the processing of 1 million data, no matter if they are replicated) that contain an identifier ID, a length-10 vector of DoubleS.

Due to my application is implemented using DataFrame, I made two programs in Scala, one using RDDs and another using DataFrame, with the intention of observing if the problem is the use of DataFrame. Likewise, an equivalent program was made in Python.

In general, an operation is applied to each RDD/DataFrame record whose result is placed in an additional field, resulting a new RDD/DataFrame containing the original fields and a new field with the result.

This is the code in Scala:

import org.apache.spark.sql.SparkSession
import scala.math.BigDecimal

object RDDvsDFMapComparison {
  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("Test").getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    val parts = 96
    val repl = 1000000
    val rep = 60000000

    val ary = (0 until 10).toArray
    val m = Array.ofDim[Int](repl, ary.length)
    for (i <- 0 until repl)
      m(i) = ary

    val t1_start = System.nanoTime()
    if (args(0).toInt == 0) {
      val a1 = sc.parallelize(m, parts)
      val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1)).toDF("Name", "Data")
      val c1 = b1.map { x =>
        val name = x.getString(0)
        val data = x.getSeq[Int](1).toArray
        var mean = 0.0
        for (i <- 0 until rep)
          mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
        (name, data, mean)
      }.toDF("Name", "Data", "Mean")
      val d1 = c1.take(5)
      println(d1.deep.mkString(","))
    } else {
      val a1 = sc.parallelize(m, parts)
      val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1))
      val c1 = b1.map { x =>
        val name = x._1
        val data = x._2
        var mean = 0.0
        for (i <- 0 until rep)
          mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
        (name, data, mean)
      }
      val d1 = c1.take(5)
      println(d1.deep.mkString(","))
    }
    val t1_end = System.nanoTime()
    val t1 = t1_end - t1_start
    println("Map operation elapses: " + BigDecimal(t1.toDouble / 1000000000).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble.toString + " seconds.")
  }
}

This is the code in Python (much simpler):

#!/usr/bin/python
# -*- coding: latin-1 -*-

import sys
import time
import math
from pyspark import SparkContext, SparkConf

def myop(key, value):
  s = 0.0
  for j in range(r):
    s += math.exp(math.log(sum(value)) / math.log(float(len(value))))
  return (key, value, s)

if __name__ == "__main__":
  conf = SparkConf().setAppName("rddvsdfmapcomparison")
  sc = SparkContext(conf=conf)
  parts = 96
  repl = 1000000
  r = 60000000
  ary = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  m = []
  for i in range(repl): m.append(ary)
  start = time.time()
  a2 = sc.parallelize(m, parts)
  b2 = a2.zipWithIndex().map(lambda (value, key): (key, value))
  c2 = b2.map(lambda (key, value): myop(key, value))
  c2.count
  d2 = c2.take(5)
  print '[%s]' % ', '.join(map(str, d2))
  end = time.time()
  print 'Elapsed time is', round(end - start, 2), 'seconds'
  sc.stop()

The results are very clear. The program implemented in Python is faster than any implemented in Scala, either using RDDs or DataFrame. It could also be observed that the program in RDD is slightly faster than the program in DataFrame, which is consistent due to the use of decoders that extract the data type of each field of a DataFrame record.

The question is, what am I doing wrong? Is not Scala code faster than Python? Could someone explain to me what I am doing wrong in my code? The response from @zero323 is very good and illustrative, but I can not understand how a simple code like this can be slower in Scala than in Python.

Thank you very much for taking the time to read my question.

Vitrion
  • 405
  • 5
  • 14
  • 1
    Well... Spark is written in Scala and Python just provides you an API. Which means that the problem lies with the way you are writing your program. The thing is that Scala (somehow and for some strange reasons) hides a lot of performance traps for you to jump in. – sarveshseri Oct 09 '18 at 05:59
  • 1
    Can you explain what your `myop` function is doing? It looks rather odd because it doesn't use the loop value in the body of the loop. – Tim Oct 09 '18 at 07:15
  • 1
    Your Python code is so much faster because you never evaluate it `myop` calls. `c2.count` is just a method getter. If you want to evaluate the code you should call it: `c2.count()`. I am voting to close this as not reproducible, as your Python code is in fact slower, however the difference can be partially attributed to the unfair advantage you give Scala version. It should be perfectly possible to optimize Python code here, to get comparable times if you remove it, but that's a different problem. – zero323 Oct 09 '18 at 12:58
  • Actually, what I do is make fictitious processing, to make the thread take more time to process a task. It's true that it seems weird what I'm trying to do. Maybe I could have calculated a Fourier transform or a maybe a prediction by using a Machine Learning method. Thanks for your comments. – Vitrion Oct 09 '18 at 13:38
  • Thank you very much for your comments. I made a mistake. Actually, I updated the answer. The count() method should not be there. Sorry – Vitrion Oct 09 '18 at 13:52
  • 1
    If you remove the `count` from all calls, none of the examples will evaluate beyond small sample of data, so time should be negligible in Python and Scala alike. – zero323 Oct 09 '18 at 14:36

1 Answers1

7

Try this implementation in Scala. It is faster:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("Test").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

val parts = 96
val repl = 1000000
val rep = 20000

val m = Vector.tabulate(repl, 10)((_,i) => i)

val myop = udf( (value: Seq[Int]) =>
  (0 until rep).foldLeft(0.0) {(acc,_)=>
    acc + Math.exp(Math.log(value.sum) / Math.log(value.length))
  }
)

val c1 = sc.parallelize(m, parts)
  .toDF("Data")
  .withColumn("Name",monotonically_increasing_id())
  .withColumn("Mean",myop('Data))

c1.count()
val d1 = c1.take(5)
println(d1.deep.mkString(","))

It could be even cleaner I think if I understood what function myop is actually doing.

Edit:

As @user6910411 mentioned in the comment, this implementation is faster only because it's doing exactly the same as Python's code (skipping most of the computation). Original Scala and Python implementations provided in the question are not equal.

zero323
  • 322,348
  • 103
  • 959
  • 935
Bogdan Vakulenko
  • 3,380
  • 1
  • 10
  • 25
  • 1
    How about using `Vector.tabulate(repl, 10)((_,i) => i)` to create `m`? It should be more efficient because the shape of the result is known in advance. – Tim Oct 09 '18 at 07:35
  • 1
    "_It is faster_" primarily because it doesn't do the same thing as the OP's code. In fact, if you structure your code like this, `myop` is invoked exactly 5 times. For `count` `myopp` applications will be simply discarded from the execution plan. Change `count` to `c1.foreach(_ => ())` (or you want to squeeze as much as possible `c1.queryExecution.toRdd.foreach(_ => ())`) and measure the time to see what I mean. – zero323 Oct 09 '18 at 12:24
  • Actually I used take(5) to start the transformation using the myop () function. Note that the time I am measuring does not include the creation of the "m" array, but does include the time it takes to: 1. create the RDD/DF, 2. perform myop (this function performs fictitious processing and its purpose is to increase the execution time of each task) with each record of the RDD/DF 3. Take only 5 elements of the RDD/DF and print them, using take(5). What you want to say then is that the use of df's can help improve the performance of the sequential tasks performed by each thread? – Vitrion Oct 09 '18 at 14:12
  • @Vitrion Aside of the possible improvements of individual methods suggested here, your code (originally only Python code, now both Scala and Python) just doesn't fully evaluate the transformation. In other words the timings you get are meaningless and don't measure what you want. Note that `count()` in Bogdan's answer, and `count()` in your original Scala snippets are not equivalent. Here execution plan can be optimized, in case of "strongly" typed `Dataset` or RDD cannot. Overall the premise of your question is wrong, and if you correctly measure the time, Scala is actually somewhat faster. – zero323 Oct 09 '18 at 14:43