
I have a dataframe of the following form:

import scala.util.Random
val localData = (1 to 100).map(i => (i,Seq.fill(Math.abs(Random.nextGaussian()*100).toInt)(Random.nextDouble)))
val df = sc.parallelize(localData).toDF("id","data")

|-- id: integer (nullable = false)
|-- data: array (nullable = true)
|    |-- element: double (containsNull = false)


df.withColumn("data_size",size($"data")).show

+---+--------------------+---------+
| id|                data|data_size|
+---+--------------------+---------+
|  1|[0.77845301260182...|      217|
|  2|[0.28806915178410...|      202|
|  3|[0.76304121847720...|      165|
|  4|[0.57955190088558...|        9|
|  5|[0.82134215959459...|       11|
|  6|[0.42193739241567...|       57|
|  7|[0.76381645621403...|        4|
|  8|[0.56507523859466...|       93|
|  9|[0.83541853717244...|      107|
| 10|[0.77955626749231...|      111|
| 11|[0.83721643562080...|      223|
| 12|[0.30546029947285...|      116|
| 13|[0.02705462199952...|       46|
| 14|[0.46646815407673...|       41|
| 15|[0.66312488908446...|       16|
| 16|[0.72644646115640...|      166|
| 17|[0.32210572380128...|      197|
| 18|[0.66680355567329...|       61|
| 19|[0.87055594653295...|       55|
| 20|[0.96600507545438...|       89|
+---+--------------------+---------+

Now I want to apply an expensive UDF whose computation time is roughly proportional to the size of the data array. I wonder how I can repartition my data such that each partition contains approximately the same total "records * data_size" (i.e., the same number of data points, not just the same number of records).

If I just do df.repartition(100), I may get some partitions containing very large arrays, which then become the bottleneck of the entire Spark stage (all other tasks having already finished). Of course I could just choose a huge number of partitions, which will (almost) ensure that each record ends up in its own partition. But is there another way?
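For reference, here is one way to see the imbalance I mean: the total number of data points per partition after a plain repartition can be inspected with spark_partition_id (this check is just an illustration, not part of the actual job):

import org.apache.spark.sql.functions._

// Illustration only: how many data points does each partition hold
// after a plain round-robin repartition?
df.repartition(100)
  .select(spark_partition_id().as("partition_id"), size($"data").as("data_size"))
  .groupBy("partition_id")
  .agg(sum("data_size").as("points_in_partition"))
  .orderBy(desc("points_in_partition"))
  .show()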

Raphael Roth

1 Answer


As you said, you can increase the number of partitions. I usually use a multiple of the number of cores: the Spark context's default parallelism times 2 or 3.
In your case, you could use a bigger multiplier, for example as sketched below.
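A minimal sketch; the multiplier here is just an assumption to tune:

// Repartition to a generous multiple of the default parallelism.
// The multiplier (3 here) is a tuning knob, not a fixed rule.
val numPartitions = sc.defaultParallelism * 3
val repartitioned = df.repartition(numPartitions)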

Another solution would be to split your df in two:

  • df with only bigger arrays
  • df with the rest

You could then repartition each of them, perform the computation, and union them back, as in the sketch below.
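A minimal sketch of that split, assuming a hypothetical size threshold of 100 data points and a hypothetical expensiveUdf (both just for illustration):

import org.apache.spark.sql.functions._

val threshold = 100  // assumption: pick this based on your size distribution

val withSize = df.withColumn("data_size", size($"data"))
val big      = withSize.filter($"data_size" >= threshold)
val rest     = withSize.filter($"data_size" < threshold)

// Give the heavy rows many more partitions so no single task dominates,
// then apply the UDF to each part and union the results back.
// expensiveUdf stands in for your own UDF.
val result = big.repartition(200).withColumn("out", expensiveUdf($"data"))
  .union(rest.repartition(50).withColumn("out", expensiveUdf($"data")))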

Beware that repartitioning may be expensive, since you have large rows to shuffle around.

You could have a look at these slides (slide 27 onwards): https://www.slideshare.net/SparkSummit/custom-applications-with-sparks-rdd-spark-summit-east-talk-by-tejas-patil

They were experiencing very bad data skew and had to handle it in an interesting way.
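If you go further down that road, one option (a sketch only, not necessarily what the talk describes) is a custom RDD Partitioner that places each id according to a precomputed, size-balanced assignment:

import org.apache.spark.Partitioner

// Sketch of a size-aware partitioner: 'assignments' maps each id to a
// partition chosen up front (e.g. by greedy bin-packing on data_size).
class AssignedPartitioner(assignments: Map[Int, Int], parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = assignments(key.asInstanceOf[Int])
}

// usage on the underlying pair RDD (id -> data), with 'assignments' and
// 'numPartitions' computed beforehand:
// val balanced = df.rdd
//   .map(r => (r.getInt(0), r.getSeq[Double](1)))
//   .partitionBy(new AssignedPartitioner(assignments, numPartitions))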

Michel Lemay