
I am trying to join two datasets coming from two big text files. Both text files contain two columns, as below:

col1       document
abc            1
aab            1
...           ...
ccd            2
abc            2
...           ...

I join these two files on their first columns and try to find how many col1 values the documents have in common. Both text files are 10 GB in size. When I run my script, Spark creates 6 stages, each with 287 partitions. Of those 6 stages, 4 are distinct, one is a foreach and one is a map. Everything goes well until the map stage, which is the 5th one. At that stage Spark stops making progress on the partitions and instead spills to disk, and after roughly ten thousand spills it fails with an error about not having enough disk space.

I have 4 cores and 8 GB of RAM, and I gave all of it to the JVM with -Xmx8g. I also tried set("spark.shuffle.spill", "true").

My script:

{
  ...
  val conf = new SparkConf()
    .setAppName("ngram_app")
    .setMaster("local[4]")
    .set("spark.shuffle.spill", "false")
  val sc = new SparkContext(conf)

  val emp = sc.textFile("...doc1.txt").map { line =>
    val parts = line.split("\t")
    (parts(5), parts(0))
  }

  val emp_new = sc.textFile("...doc2.txt").map { line =>
    val parts = line.split("\t")
    (parts(3), parts(1))
  }

  val finalemp = emp_new.join(emp)
    .map { case (nk1, (parts1, val1)) => (parts1 + "-" + val1, 1) }
    .reduceByKey((a, b) => a + b)

  finalemp.foreach(println)
}

What should I do to avoid that much spilling?

mlee_jordan

1 Answer


It looks like you need to change the memory setting for Spark. If you use the spark-submit script, you simply add --executor-memory 8G to your command. Setting -Xmx8g affects the JVM, but not Spark (which I believe defaults to 256 MB).
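
For example, the command could look roughly like this (a sketch only: the jar name and main class are made up for illustration, and the master matches the local[4] setting from the question):

spark-submit --class NgramApp --master "local[4]" --executor-memory 8G ngram_app.jar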

Note that, as a rule of thumb, you should not assign more than 75% of the available memory to the Spark job.
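
On the 8 GB machine from the question that works out to roughly 8 GB × 0.75 ≈ 6 GB. Programmatically, a sketch of the same SparkConf chain as in the question with the memory value added could look like this:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("ngram_app")
  .setMaster("local[4]")
  .set("spark.executor.memory", "6g")   // roughly 75% of the 8 GB machine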

Glennie Helles Sindholt
  • Thanks Glennie. I changed the settings to val conf = new SparkConf().setAppName("abdulhay").setMaster("local[4]").set("spark.shuffle.spill", "true").set("spark.shuffle.memoryFraction", "0.6").set("spark.storage.memoryFraction", "0.2").set("spark.executor.memory","6g") and -Xmx6g. As a result, the map stage no longer spills endlessly and at least finishes its tasks. However, spilling takes on average 10 minutes before a task finishes, and I have 1000 partitions. Does that sound normal for a machine with the specs I gave above, or what might be the bottleneck? – mlee_jordan Sep 15 '15 at 09:17
    You may want to take a look at this question and answer http://stackoverflow.com/questions/30797724/how-to-optimize-shuffle-spill-in-apache-spark-application which offers hints on how to avoid or minimize spill. On another note, I would do a `mapValues` instead of a `map` after your `join`. When you do a `mapValues` Spark knows that the keys are not altered and still partitioned in the same way. The following `reduceByKey` will thus run faster :) – Glennie Helles Sindholt Sep 15 '15 at 09:40
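
For reference, a minimal sketch of the mapValues pattern mentioned in that last comment, reusing the emp and emp_new RDDs from the question (the final aggregation is only illustrative; the question's own code re-keys by the document pair, which still forces a shuffle):

val joined = emp_new.join(emp)                       // RDD[(col1, (doc2, doc1))]

// mapValues leaves the key (col1) untouched, so the partitioner
// produced by the join is preserved
val combined = joined.mapValues { case (doc2, doc1) => doc2 + "-" + doc1 }

// a reduceByKey on that same key can then avoid an extra shuffle
val counts = combined.mapValues(_ => 1).reduceByKey(_ + _)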