
I am trying to repartition a DataFrame by a column. The DataFrame has N (say N = 3) distinct values in the partition column x, e.g.:

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data

What I would like to achieve is to repartition myDF by x without producing empty partitions. Is there a better way than doing this?

val numParts = myDF.select($"x").distinct().count.toInt
myDF.repartition(numParts,$"x")

(If I don't specify numParts in repartition, most of my partitions are empty, because repartition creates 200 partitions by default.)

mrsrinivas
Raphael Roth
  • According to http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options, the 200 partitions are created because of the default value of the config option `spark.sql.shuffle.partitions` – UninformedUser Jan 25 '17 at 18:24
  • The answer can be found at http://stackoverflow.com/questions/41854818/spark-dataframe-repartition-number-of-partition-not-preserved?noredirect=1#comment70893687_41854818 – FaigB Jan 26 '17 at 12:42

1 Answer


One solution is to iterate over the DataFrame's partitions and check each one for records, which gives the number of non-empty partitions.

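import org.apache.spark.sql.Row

// Accumulator that counts the partitions holding at least one row
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")

// hasNext checks for a first row without traversing the whole partition
df.foreachPartition { (partition: Iterator[Row]) =>
  if (partition.hasNext) nonEmptyPart.add(1)
}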

Once we have the number of non-empty partitions (nonEmptyPart), we can reduce the DataFrame to that many partitions with coalesce() (see coalesce() vs repartition()).

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type

It may or may not be the best approach, but it avoids a shuffle because it uses coalesce() rather than repartition().
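As an alternative to the accumulator, the number of non-empty partitions can also be obtained from the partition ids of the rows themselves. The following is a minimal sketch, not the answer's method. It assumes Spark 2.x or later (for `SparkSession`); the object name `NonEmptyPartitions`, the helper `nonEmptyPartitionCount`, and the local session are hypothetical and only for illustration. `spark_partition_id()` is the built-in function from `org.apache.spark.sql.functions`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.spark_partition_id

object NonEmptyPartitions {
  // Number of partitions holding at least one row: empty partitions emit no rows,
  // so their ids never appear in the distinct set of partition ids.
  def nonEmptyPartitionCount(df: DataFrame): Long =
    df.select(spark_partition_id().as("pid")).distinct().count()

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for illustration; in spark-shell, `spark` already exists
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("non-empty-partitions")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(1, 1, 2, 2, 3, 3).toDF("x").repartition(10, $"x")

    val n = nonEmptyPartitionCount(df).toInt
    val finalDf = df.coalesce(n) // narrows to n partitions without a full shuffle

    println(s"non-empty partitions: $n")
    println(s"partitions after coalesce: ${finalDf.rdd.getNumPartitions}")

    spark.stop()
  }
}
```

Like the accumulator version, this runs an extra job over the data before coalescing, so it is probably only worthwhile when the DataFrame is cached or cheap to recompute.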


Example to address the comment

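import org.apache.spark.sql.Row

// Without an explicit count, repartition($"x") creates spark.sql.shuffle.partitions partitions
val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x")
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")

// Count the partitions that hold at least one row
df1.foreachPartition { (partition: Iterator[Row]) =>
  if (partition.hasNext) nonEmptyPart.add(1)
}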

val finalDf = df1.coalesce(nonEmptyPart.value.toInt)

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}")
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}")
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")

Output

nonEmptyPart => 3
df.rdd.partitions.length => 200
finalDf.rdd.partitions.length => 3
mrsrinivas
  • `val df = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x").repartition(10, $"x").coalesce(3)`. Now it narrows the number of partitions down from 10 to 3. – mrsrinivas Feb 13 '17 at 10:55
  • And now do `finalDf.foreachPartition(p => println(p.size))`. I get `0 0 6`, i.e. 2 partitions are empty and 1 contains all the rows. That's not what I wanted (I'm using Spark 1.6.3) – Raphael Roth Feb 13 '17 at 11:16
  • It could be because `coalesce` does not shuffle. Try `repartition` instead: it shuffles all the data according to the `HashPartitioner`, so there is a chance that every partition receives some data. If you are strict about removing empty partitions, you may need to run it (**finding non-empty partitions and applying coalesce/repartition**) iteratively. – mrsrinivas Feb 13 '17 at 14:46
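To illustrate why hash partitioning only gives "a chance" that every partition is filled, here is a toy model in plain Scala (no Spark needed). It assumes the rule used by the RDD `HashPartitioner`, where the partition index is the non-negative remainder of `key.hashCode` divided by the number of partitions. DataFrame repartitioning may use a different hash function internally, so the actual assignment in Spark can differ. The object and method names are hypothetical.

```scala
// Toy model of hash partitioning: partition = nonNegativeMod(key.hashCode, numPartitions).
// Assumption: mirrors the RDD HashPartitioner rule, not necessarily what DataFrames use.
object HashPartitionSketch {
  // Modulo that always lands in [0, mod), even for negative hash codes
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Number of keys that land in each partition
  def partitionSizes(keys: Seq[Int], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(nonNegativeMod(k.hashCode, numPartitions)) += 1)
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    // Three distinct keys spread over three partitions
    println(partitionSizes(Seq(1, 1, 2, 2, 3, 3), 3)) // Vector(2, 2, 2)
    // Three distinct keys that all collide in partition 0
    println(partitionSizes(Seq(3, 3, 6, 6, 9, 9), 3)) // Vector(6, 0, 0)
  }
}
```

In this model, setting the partition count to the number of distinct keys does not by itself rule out empty partitions, because several keys can hash to the same index.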