3

I am working on a cluster composed by 4 instances EC2 r3.2xlarge. I use spark 1.3.

val test = clt.rdd.groupBy { r: Row =>
  val clt = r.get(0)
  clt
}

clt is a DataFrame and it comes from a csv file of 8.5Go composed by 200million of lines.

On the Spark interface I can see that my groupBy run over 220 partitions and I can also see "Shuffle spill (memory)" is more than 4TB. VM options : -Xms80g -Xmx80g

My questions are :

  • Why is the spill memory so large?

  • How can I optimize this?

I already tried to clt.rdd.repartition(1200) and I get the same result but this time on repartition task (shuffle spill memory really large and query really slow).


EDIT

I found something "weird" :

I have a DataFrame name test which contains 5 columns.

This code run in 5/10mins :

 val test1 = test.rdd.map {
  row =>
    val a = row.get(0)
    val b = row.get(1)
    val c = row.get(2)
    val d = row.get(3)
    val e = row.get(5)
    (a, Array(a, b, c, d, e))
}.groupByKey

This code run in 3/5hours (and generate large amount of shuffle spill memory) :

val test1 = test.rdd.map {
  row =>
    val a = row.get(0)
    (a, row)
}.groupByKey

Any idea why?

GermainGum
  • 1,349
  • 3
  • 15
  • 40
  • 1
    Similar in content to this question: http://stackoverflow.com/questions/30797724/how-to-optimize-shuffle-spill-in-apache-spark-application – Alister Lee Jun 15 '15 at 23:16
  • Yes I saw your solution. I upvoted the question and your solution. My question adds some metrics details and also cover two bullets point of your solution. Now I am trying to upgrade spark to 1.4 to see if it changes something. – GermainGum Jun 16 '15 at 07:42
  • I think the questions you ask are the same ("how can I optimise this?"). I think an interesting new question would be: "once I've maximised my executor memory, cores and memoryFraction settings, how do I find the optimum number of partitions?" – Alister Lee Jun 16 '15 at 09:57
  • Indeed really interesting question, it should have its own post. In my specific case I optimize it by doing a small change, the result his the same but I don't understand why in one case it is really slow and in the other it is not. I updated my post. – GermainGum Jun 19 '15 at 09:01
  • 1
    In your weird stuff, if you're sure you've actually isolated the only difference down to the code you show, then it seems that there is a difference in the memory consumed to represent each row between whatever it was before, and the Array[T], because that's the only change. That seems unlikely though. – Alister Lee Jun 20 '15 at 05:36

0 Answers0