
We all know Spark does not support partitions bigger than 2 GB (we get java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE when that limit is hit).

So, how do we increase the number of partitions when partitioning by an expression with df.repartition(num, $"col")? Here, no value of num prevents blowing past the 2 GB limit when one value of col is frequent enough, since all of its rows hash to the same partition.
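To make the skew concrete, a minimal sketch (the dominant key, sizes and names like demoDF are made up for illustration) showing that raising num does nothing for a hot key:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("skew-demo").master("local[*]").getOrCreate()
import spark.implicits._

// One dominant key: every one of these rows hashes to the same partition,
// no matter how many partitions we ask for.
val demoDF = (Seq.fill(1000000)("hotKey") ++ Seq("a", "b", "c")).toDF("col")
val parts = demoDF.repartition(200, $"col")   // num = 200, but "hotKey" still lands in a single partition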

DataFrames cannot take a custom hash partitioner the way RDDs can. Some effort has been made toward a range partitioner, but that does not help here.
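For contrast, this is the kind of thing the RDD API allows but the DataFrame API does not (the class name and the salting shown in the comments are hypothetical); note that the salt still has to be baked into the key before the partitioner sees it:

import org.apache.spark.Partitioner

// A custom Partitioner, as accepted by partitionBy on pair RDDs.
// getPartition only sees the key, so spreading a hot key requires
// salting the key itself beforehand.
class SaltedKeyPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = math.abs(key.hashCode % numPartitions)
}

// val keyed  = df.rdd.map(row => (row.getAs[String]("col") + "-" + scala.util.Random.nextInt(16), row))
// val spread = keyed.partitionBy(new SaltedKeyPartitioner(200))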

What I've found so far is to use window operations to generate a custom partitioning column:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Number the rows within each value of "col", then bucket them by maxRecords,
// so a hot key is split across several salted values.
val wSpec = Window.partitionBy("col").orderBy($"col")
val maxRecords = 100000

val tempDF = df.withColumn(
  "hash_part",
  concat_ws("-", $"col", (row_number().over(wSpec) / lit(maxRecords)).cast("int"))
)

// Repartition on the salted column, then drop it.
val out = tempDF.repartition($"hash_part").drop("hash_part")
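As a rough sanity check (a sketch, using spark_partition_id() from the functions import above), one can count rows per resulting partition to confirm no single partition is still dominated by one key:

out
  .groupBy(spark_partition_id().as("pid"))
  .count()
  .orderBy(desc("count"))
  .show(20)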

Do you have a better solution? Something more elegant? More performant?

Related SO posts (which do not answer this):

Michel Lemay
