
I have 40 GB of CSV files. After reading them, I need to perform a series of transformations, one of which is exploding a column. After that transformation I get the shuffle spill depicted below. I understand why that happens: the explode is driven by a broadcast variable lookup that returns a very skewed set of results.

My question is: how can I mitigate the spill? I tried repartitioning before the explode by tuning the spark.sql.shuffle.partitions configuration parameter to ensure that the shuffle partitions are of equal size, but that didn't help.
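Roughly, the shape of the pipeline and the repartition attempt looked like this (a sketch only; the broadcast map, the UDF, the column names, and the partition count are placeholders, not my real code):

import org.apache.spark.sql.functions.{col, explode, udf}

// Stand-in for the real broadcast lookup; "lookupKey" is a placeholder column name.
val broadcastMap = spark.sparkContext.broadcast(Map.empty[String, Seq[String]])
val lookup = udf((k: String) => broadcastMap.value.getOrElse(k, Seq.empty[String]))

spark.conf.set("spark.sql.shuffle.partitions", "2000") // tuned value, example only

val exploded = df
  .repartition(col("lookupKey"))                         // shuffles into spark.sql.shuffle.partitions partitions
  .withColumn("item", explode(lookup(col("lookupKey"))))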

Any suggestions or literature on the topic would be greatly appreciated!

[Screenshot: shuffle spill]

zaxme

1 Answer


If you are working with RDDs you can use Spark's built-in partitioners to customise how your data is distributed across partitions. You can choose between a HashPartitioner and a RangePartitioner: the former works for discrete values, the latter for continuous values.

Example HashPartitioner:

import org.apache.spark.HashPartitioner

val rdd = df.rdd              // convert the DataFrame to a low-level RDD
val keyedRDD = rdd.keyBy(...) // define your custom key
val partitionedRDD = keyedRDD.partitionBy(new HashPartitioner(n)) // n = desired number of partitions
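
A RangePartitioner can be plugged in the same way. This is a sketch only, assuming the key type has an Ordering (e.g. String or a numeric type); it samples the keyed RDD to compute the range boundaries:

import org.apache.spark.RangePartitioner

// n = desired number of partitions, as above
val rangePartitionedRDD = keyedRDD.partitionBy(new RangePartitioner(n, keyedRDD))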

Example custom Partitioner:

import org.apache.spark.Partitioner

class DomainPartitioner(n: Int) extends Partitioner {
  override def numPartitions: Int = n
  override def getPartition(key: Any): Int = {
    // your custom partition logic; placeholder: non-negative modulo of the key's hash code
    ((key.hashCode % numPartitions) + numPartitions) % numPartitions
  }
}

keyedRDD.partitionBy(new DomainPartitioner(n)).map(_._1).glom().map(_.toSet.size)
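
Calling .collect() on that last expression returns the number of distinct keys that landed in each partition, which is a quick way to check how evenly the custom partitioner spreads the skewed keys.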
Michael Heil
  • Thanks @mike! I was successful with my computation even though I still get several TBs of memory spill. For others reading this - it is not so straightforward to implement your own custom partitioner if you are coming from a DataFrame. This answer (the pre-Spark-1.6 approach) helped me get it right - https://stackoverflow.com/a/32920122/1450817 – zaxme Apr 02 '20 at 16:10
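
For anyone attempting the same round trip, a rough sketch of going from a DataFrame to a custom-partitioned RDD and back (placeholder names and partition count; not taken from the linked answer):

import org.apache.spark.HashPartitioner

// "lookupKey" stands in for the skewed column; 2000 is an example partition count.
val keyed = df.rdd.keyBy(row => row.getAs[String]("lookupKey"))
val repartitioned = keyed.partitionBy(new HashPartitioner(2000)).values
val repartitionedDF = spark.createDataFrame(repartitioned, df.schema)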