There are a number of questions about how to obtain the number of partitions of an RDD and/or a DataFrame: the answers are invariably `rdd.getNumPartitions` or `df.rdd.getNumPartitions`. Unfortunately, that is an expensive operation on a DataFrame, because `df.rdd` requires conversion from the DataFrame to an RDD. It takes on the order of the time it takes to run `df.count`.
I am writing logic that optionally `repartition`s or `coalesce`s a DataFrame, based on whether the current number of partitions is within a range of acceptable values or is instead below or above that range.
def repartition(inDf: DataFrame, minPartitions: Option[Int],
                maxPartitions: Option[Int]): DataFrame = {
  val inputPartitions = inDf.rdd.getNumPartitions // EXPENSIVE!
  val outDf = minPartitions.flatMap { minp =>
    if (inputPartitions < minp) {
      info(s"Repartition the input from $inputPartitions to $minp partitions..")
      Option(inDf.repartition(minp))
    } else {
      None
    }
  }.getOrElse(maxPartitions.map { maxp =>
    if (inputPartitions > maxp) {
      info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
      inDf.coalesce(maxp)
    } else inDf
  }.getOrElse(inDf))
  outDf
}
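As an aside, the repartition-vs-coalesce decision in the helper above can be isolated from the expensive partition-count lookup into a pure function. This is just a sketch with names of my own (`Action`, `decide`, etc. are not Spark API); it reproduces the same branching as the nested `flatMap`/`getOrElse` logic:

```scala
// Hypothetical refactoring: the decision logic factored out as a pure
// function, so it can be reasoned about (and tested) without touching
// inDf.rdd at all. All names here are illustrative, not Spark API.
sealed trait Action
case class Repartition(n: Int) extends Action
case class Coalesce(n: Int) extends Action
case object Keep extends Action

def decide(current: Int,
           minPartitions: Option[Int],
           maxPartitions: Option[Int]): Action =
  minPartitions match {
    // below the acceptable range: widen via repartition (full shuffle)
    case Some(minp) if current < minp => Repartition(minp)
    case _ =>
      maxPartitions match {
        // above the acceptable range: narrow via coalesce (no shuffle)
        case Some(maxp) if current > maxp => Coalesce(maxp)
        // within range (or no bounds given): leave the DataFrame alone
        case _ => Keep
      }
  }
```

The caller would then apply `inDf.repartition(n)`, `inDf.coalesce(n)`, or return `inDf` unchanged depending on the `Action` — but the costly `inDf.rdd.getNumPartitions` call to obtain `current` is still unavoidable, which is the point of this question.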
But we cannot afford to incur the cost of `rdd.getNumPartitions` for every DataFrame in this manner. Is there not some way to obtain this information, e.g. by querying the online/temporary catalog for the registered table?
Update: The Spark GUI showed the `DataFrame.rdd` operation taking as long as the longest SQL in the job. I will re-run the job and attach the screenshot in a bit here.

The following is just a test case: it uses a small fraction of the data size used in production. The longest SQL is only five minutes, and this operation is on its way to spending that amount of time as well (note that the SQL is not helped out here: it also has to execute subsequently, thus effectively doubling the cumulative execution time).

We can see that the `.rdd` operation at `DataFrameUtils` line 30 (shown in the snippet above) takes 5.1 minutes, and yet the `save` operation still took 5.2 minutes afterwards, i.e. doing the `.rdd` first did not save any time in the execution of the subsequent `save`.