Let's try to understand this by looking at the source code. When you call `df.repartition(someInteger)` in PySpark, this line gets executed:

```python
return DataFrame(self._jdf.repartition(numPartitions), self.sparkSession)
```

This brings us to the JVM-side `repartition` function, which we can find in Dataset.scala:
```scala
def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
  Repartition(numPartitions, shuffle = true, logicalPlan)
}
```
So that adds a Repartition operation to our query plan:
```scala
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends RepartitionOperation {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override def partitioning: Partitioning = {
    require(shuffle, "Partitioning can only be used in shuffle.")
    numPartitions match {
      case 1 => SinglePartition
      case _ => RoundRobinPartitioning(numPartitions)
    }
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): Repartition =
    copy(child = newChild)
}
```
In there, we see that in case `numPartitions > 1`, the partitioning used is a `RoundRobinPartitioning(numPartitions)`. Let's have a look at this `RoundRobinPartitioning` in action in `ShuffleExchangeExec`'s `prepareShuffleDependency` method. There are two interesting `val`s in there:
- The partitioner creates a `HashPartitioner`, which uses Java's `.hashCode()` and the modulo operator to determine the partitioning:

```scala
val part: Partitioner = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
  case HashPartitioning(_, n) =>
    ...
}
```
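To make the hash-and-modulo step concrete, here is a minimal stand-alone sketch of what `HashPartitioner` does with a key. The helper name `nonNegativeMod` mirrors a Spark internal utility, but treat the exact names and signatures here as illustrative:

```scala
// Illustrative sketch of HashPartitioner's partition computation:
// the key's hashCode modulo the number of partitions, adjusted so
// negative hash codes still map to a valid partition index.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

def getPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case k    => nonNegativeMod(k.hashCode, numPartitions)
}

// A negative hashCode still lands in a valid partition:
println(getPartition(-7, 3)) // -7 % 3 == -1, so partition 2
```

Note that in Java/Scala the `%` operator can return a negative number, which is why the extra adjustment is needed before the result can be used as a partition index.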
- The `rddWithPartitionIds` val adds the following documentation:

```scala
// [SPARK-23207] Have to make sure the generated RoundRobinPartitioning is deterministic,
// otherwise a retry task may output different rows and thus lead to data loss.
//
// Currently we following the most straight-forward way that perform a local sort before
// partitioning.
//
// Note that we don't perform local sort if the new partitioning has only 1 partition, under
// that case all output rows go to the same partition.
```
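The determinism problem that this comment describes can be sketched as follows: round-robin keys depend on the *position* of each row, so a retried task that iterates its rows in a different order would assign them to different partitions. Sorting locally first pins the iteration order down. This is a hypothetical simplification, not Spark's actual implementation:

```scala
// Hypothetical simplification of round-robin key assignment: each row
// gets an incrementing position, and the target partition is that
// position modulo numPartitions. The assignment therefore depends on
// the order in which rows are iterated.
def roundRobinAssign[T](rows: Seq[T], numPartitions: Int): Seq[(Int, T)] = {
  var position = 0
  rows.map { row =>
    position += 1
    (position % numPartitions, row)
  }
}

val rows = Seq("b", "a", "c")

// Two runs over the same rows in a different order assign different
// partitions to the same row...
val run1 = roundRobinAssign(rows, 2)
val run2 = roundRobinAssign(rows.reverse, 2)

// ...but sorting locally first makes the assignment order-independent:
val stable1 = roundRobinAssign(rows.sorted, 2)
val stable2 = roundRobinAssign(rows.reverse.sorted, 2)
```

This is why the local sort makes a retried task reproduce exactly the same row-to-partition mapping as the original attempt.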
Conclusion
- When we do `df.repartition(someInteger)`, Java's `.hashCode` and a modulo operation are used to determine the partition a record will end up in. This applies the `.hashCode` method to your Java object, which is apparently not necessarily deterministic from one Java application to another. I tried to find situations where I would not always get the same results (on a very small scale), but did not find any case where the partitioning was not identical. For this, I used this testing code:
```scala
import spark.implicits._

val df = Seq(
  ("Alex", 4.0, 3.2, 3.0),
  ("Cathy", 2.0, 4.2, 1.2),
  ("Alice", 1.0, 5.0, 3.5),
  ("Mark", 3.0, 3.5, 0.5),
  ("Jenny", 3.0, 3.5, 0.5),
  ("Bob", 3.0, 3.5, 0.5),
  ("John", 3.0, 3.5, 0.5),
  ("Chloe", 3.0, 3.5, 0.5)
).toDF("Name", "Test A", "Test B", "Test C")
 .repartition(3)

val output = df
  .rdd
  .mapPartitionsWithIndex {
    (index, itr) => itr.toList.map(x => x + "#" + index).iterator
  }
  .collect()
```
- To make sure this repartitioning operation is deterministic within a job (and therefore robust against task failures), a local sort has been added (which makes your repartitioning a tiny bit slower).
Hope this helps!