By using df.explain you can get a lot of information about these operations.
I'm using this DataFrame for the examples:
df = spark.createDataFrame([(i, f"value {i}") for i in range(1, 22, 1)], ["id", "value"])
Repartition
The partitioning method differs depending on whether a key expression (column) is specified. It is not always hash partitioning, as your question assumes.
df.repartition(3).explain(True)
== Parsed Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false
== Analyzed Logical Plan ==
id: bigint, value: string
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false
== Optimized Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false
== Physical Plan ==
Exchange RoundRobinPartitioning(3)
+- Scan ExistingRDD[id#0L,value#1]
We can see in the generated physical plan that RoundRobinPartitioning is used:
Represents a partitioning where rows are distributed evenly across
output partitions by starting from a random target partition number
and distributing rows in a round-robin fashion. This partitioning is
used when implementing the DataFrame.repartition() operator.
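To make the round-robin idea concrete, here is a minimal pure-Python sketch (not Spark's code). It assumes a single input partition and a fixed seed for the random starting point; Spark picks a starting point per input partition, so real row placement will differ. The function name round_robin_partition is hypothetical.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def round_robin_partition(rows: Sequence[T], num_partitions: int, seed: int = 0) -> List[List[T]]:
    """Toy model of round-robin partitioning: pick a random starting
    partition, then send each subsequent row to the next partition."""
    # Assumption: one input partition and a fixed seed, so the sketch is reproducible.
    rng = random.Random(seed)
    start = rng.randrange(num_partitions)
    partitions: List[List[T]] = [[] for _ in range(num_partitions)]
    for offset, row in enumerate(rows):
        partitions[(start + offset) % num_partitions].append(row)
    return partitions


rows = [(i, f"value {i}") for i in range(1, 22)]
parts = round_robin_partition(rows, 3)
print([len(p) for p in parts])  # [7, 7, 7]
```

With 21 rows and 3 partitions every partition gets exactly 7 rows, whatever the starting point; when the row count is not a multiple of the partition count, sizes differ by at most one. Note that the key values play no role in where a row ends up.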
When repartitioning by a column expression:
df.repartition(3, "id").explain(True)
== Parsed Logical Plan ==
'RepartitionByExpression ['id], 3
+- LogicalRDD [id#0L, value#1], false
== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false
== Optimized Logical Plan ==
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false
== Physical Plan ==
Exchange hashpartitioning(id#0L, 3)
+- Scan ExistingRDD[id#0L,value#1]
Now the chosen partitioning method is hashpartitioning.
With hash partitioning, Spark computes a Murmur3 hash of the key expression(s) for every row (the same hash the SQL hash function returns) and determines the destination partition id with a non-negative modulo: pmod(hash(keys), numPartitions).
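The following pure-Python sketch illustrates hash partitioning under one loud assumption: zlib.crc32 stands in for the real hash function, so the partition ids it produces will not match Spark's. What it does show is the hash-then-non-negative-modulo step and the key property that rows with equal keys always land in the same partition. All helper names are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple


def signed32(x: int) -> int:
    """Reinterpret an unsigned 32-bit value as a signed int, like a JVM int."""
    return x - (1 << 32) if x >= (1 << 31) else x


def toy_hash(key: object) -> int:
    # Stand-in hash (NOT Spark's): CRC32 of the key's repr, as a signed 32-bit int.
    return signed32(zlib.crc32(repr(key).encode("utf-8")))


def pmod(a: int, n: int) -> int:
    # Non-negative modulo. This is the formula a JVM needs because `a % n`
    # can be negative there; in Python `a % n` alone is already non-negative for n > 0.
    return ((a % n) + n) % n


def hash_partition(rows: List[Tuple[int, str]], num_partitions: int) -> Dict[int, List[Tuple[int, str]]]:
    partitions: Dict[int, List[Tuple[int, str]]] = defaultdict(list)
    for row in rows:
        key = row[0]  # partition by the "id" column
        partitions[pmod(toy_hash(key), num_partitions)].append(row)
    return dict(partitions)


rows = [(i, f"value {i}") for i in range(1, 22)]
parts = hash_partition(rows, 3)
print({pid: len(rs) for pid, rs in sorted(parts.items())})
```

Unlike round-robin, nothing here guarantees balanced partitions: sizes depend on how the keys hash, and a very frequent key sends all of its rows to a single partition.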
RepartitionByRange
This partitioning method creates numPartitions consecutive, non-overlapping ranges of values based on the partitioning key. Thus, at least one key expression is required, and it must be orderable.
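A minimal pure-Python sketch of the range idea: estimate split points from a sorted sample of the keys, then route each key to its range with a binary search. Spark also estimates the bounds by sampling (the PySpark documentation for repartitionByRange notes this, with the sample size controlled by spark.sql.execution.rangeExchange.sampleSizePerPartition), so real partitions are only approximately equal in size. The helpers range_bounds and range_partition are hypothetical, and treating each bound as the inclusive upper end of its range is a choice made for this sketch.

```python
import bisect
import random
from typing import List, Sequence


def range_bounds(keys: Sequence[int], num_partitions: int,
                 sample_size: int = 100, seed: int = 0) -> List[int]:
    """Estimate num_partitions - 1 split points from a sorted random sample."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(list(keys), min(sample_size, len(keys))))
    step = len(sample) / num_partitions
    # Each bound is the last sampled key of its range (inclusive upper end).
    return [sample[max(int(step * i) - 1, 0)] for i in range(1, num_partitions)]


def range_partition(keys: Sequence[int], bounds: List[int]) -> List[List[int]]:
    partitions: List[List[int]] = [[] for _ in range(len(bounds) + 1)]
    for k in keys:
        # bisect_left sends a key equal to a bound into that bound's range.
        partitions[bisect.bisect_left(bounds, k)].append(k)
    return partitions


keys = list(range(1, 22))
bounds = range_bounds(keys, 3)
parts = range_partition(keys, bounds)
print(bounds)                          # [7, 14]
print([(p[0], p[-1]) for p in parts])  # [(1, 7), (8, 14), (15, 21)]
```

Because the sample here covers every key, the bounds are exact; with a smaller sample_size they become estimates and the partition sizes drift, which is the trade-off sampling makes.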
df.repartitionByRange(3, "id").explain(True)
== Parsed Logical Plan ==
'RepartitionByExpression ['id ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false
== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false
== Optimized Logical Plan ==
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false
== Physical Plan ==
Exchange rangepartitioning(id#0L ASC NULLS FIRST, 3)
+- Scan ExistingRDD[id#0L,value#1]
Looking at the generated physical plan, we can see that rangepartitioning differs from the other two methods described above by the presence of an ordering clause in the partitioning expression. When no explicit sort order is specified in the expression, ascending order (with nulls first) is used by default.
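To see why the ordering clause matters, the hypothetical helpers below encode a sort direction and null placement as a Python sort key and reuse a binary search over the range bounds. This is an illustration only, not how Spark compares rows. In Spark SQL, ascending order puts nulls first by default and descending order puts them last; in PySpark an ordered column such as col("id").desc() can be passed to repartitionByRange to request the descending variant.

```python
import bisect
from typing import Callable, List, Optional, Tuple

SortKey = Tuple[int, int]


def asc_nulls_first(v: Optional[int]) -> SortKey:
    # Rank 0 for nulls sorts them before every non-null value.
    return (0, 0) if v is None else (1, v)


def desc_nulls_last(v: Optional[int]) -> SortKey:
    # Negating the value reverses the order; rank 1 puts nulls after all values.
    return (1, 0) if v is None else (0, -v)


def partition_ids(
    values: List[Optional[int]],
    bounds: List[int],
    key: Callable[[Optional[int]], SortKey],
) -> List[int]:
    # Bounds must already be sorted under `key`; each bound is the
    # inclusive upper end of its partition (bisect_left).
    encoded = [key(b) for b in bounds]
    return [bisect.bisect_left(encoded, key(v)) for v in values]


values = [None, 1, 10, 21]
print(partition_ids(values, [7, 14], asc_nulls_first))  # [0, 0, 1, 2]
print(partition_ids(values, [14, 7], desc_nulls_last))  # [2, 2, 1, 0]
```

With the same values, flipping the direction reverses which partition holds the small and large ids, and the null moves from the first partition to the last.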