I am trying to find a reliable way to compute the size (in bytes) of a Spark dataframe programmatically.
The reason is that I would like a method to compute an "optimal" number of partitions. "Optimal" could mean different things here: an optimal partition size, or an optimal file size when writing to Parquet tables. Both can be assumed to be some linear function of the dataframe size. In other words, I would like to call coalesce(n) or repartition(n) on the dataframe, where n is not a fixed number but a function of the dataframe size.
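Since the question assumes the optimal partition count is a linear function of the dataframe size, the last step (from a byte estimate to n) can be sketched in plain Scala. This is a minimal sketch under that assumption: the object PartitionSizing, the method numPartitions and the 128 MB target are hypothetical choices for illustration, not part of Spark's API, and the sketch does not address the harder problem of obtaining a reliable byte estimate in the first place.

```scala
// Hypothetical helper: derive a partition count from an estimated size in bytes.
object PartitionSizing {
  // Assumed target of ~128 MB per partition; tune for your cluster and file format.
  val DefaultTargetBytes: Long = 128L * 1024 * 1024

  /** Ceiling division of sizeBytes by targetBytes, never less than 1 and clamped to Int range. */
  def numPartitions(sizeBytes: Long, targetBytes: Long = DefaultTargetBytes): Int = {
    require(sizeBytes >= 0, "sizeBytes must be non-negative")
    require(targetBytes > 0, "targetBytes must be positive")
    val n = (sizeBytes + targetBytes - 1) / targetBytes
    math.min(math.max(1L, n), Int.MaxValue.toLong).toInt
  }

  def main(args: Array[String]): Unit = {
    // Example input: 4.8 GB, assuming the UI's "GB" means 2^30 bytes.
    val cachedBytes = (4.8 * (1L << 30)).toLong
    println(numPartitions(cachedBytes))
  }
}
```

Given some estimate bytes, the result would be passed to the existing Dataset methods, e.g. df.repartition(PartitionSizing.numPartitions(bytes)), or to coalesce when only reducing the number of partitions. Ceiling division keeps every partition at or below the target on average, at the cost of one possibly undersized partition.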
Other topics on SO suggest using SizeEstimator.estimate from org.apache.spark.util to get the size in bytes of the dataframe, but the results I'm getting are inconsistent.
First of all, I'm persisting my dataframe to memory:
df.cache().count
The Spark UI shows a size of 4.8GB in the Storage tab. Then I run the following command to get the size from SizeEstimator:
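import org.apache.spark.util.SizeEstimator
// Estimates the heap size of the object graph reachable from the DataFrame reference in the driver JVM
SizeEstimator.estimate(df)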
This gives a result of 115,715,808 bytes ≈ 116MB. However, applying SizeEstimator to different objects leads to very different results. For instance, I tried computing the size of each row in the dataframe separately and summing them:
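// Per-row estimates, summed (outside spark-shell, Spark 2+ needs import spark.implicits._ for the Long encoder)
df.map(row => SizeEstimator.estimate(row.asInstanceOf[AnyRef])).reduce(_ + _)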
This results in a size of 12,084,698,256 bytes ≈ 12GB. Alternatively, I can apply SizeEstimator to every partition:
// Estimate each partition as a single List of rows, then sum across partitions
df.mapPartitions(iterator =>
  Iterator(SizeEstimator.estimate(iterator.toList.map(row => row.asInstanceOf[AnyRef])))
).reduce(_ + _)
which again gives a different size: 10,792,965,376 bytes ≈ 10.8GB.
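To make the inconsistency concrete, the three SizeEstimator results can be compared against the 4.8GB cached size from the Storage tab. This small sketch assumes the UI's "GB" means 2^30 bytes and that the displayed value is rounded to one decimal, so the ratios are only approximate; EstimateComparison is a hypothetical name.

```scala
// Compares the three SizeEstimator results quoted above with the Storage tab figure.
object EstimateComparison {
  // Assumption: the UI's "4.8GB" is binary (4.8 * 2^30 bytes) and rounded to one decimal.
  val storageTabBytes: Double = 4.8 * (1L << 30)

  val estimates: Seq[(String, Long)] = Seq(
    "SizeEstimator.estimate(df)" -> 115715808L,
    "sum over rows" -> 12084698256L,
    "sum over partitions" -> 10792965376L
  )

  /** Ratio of an estimate to the cached size reported in the Storage tab. */
  def ratio(bytes: Long): Double = bytes / storageTabBytes

  def main(args: Array[String]): Unit =
    estimates.foreach { case (label, bytes) =>
      println(f"$label%-28s ${ratio(bytes)}%.3f x the Storage tab size")
    }
}
```

Under that assumption, estimating the DataFrame object itself comes out roughly 45 times smaller than the cached size, while the two summed estimates come out about 2.1 to 2.3 times larger.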
I understand there are memory optimizations and memory overhead involved, but after performing these tests I don't see how SizeEstimator can be used to get a sufficiently good estimate of the dataframe size (and consequently of the partition size, or of the resulting Parquet file sizes).
What is the appropriate way (if any) to apply SizeEstimator in order to get a good estimate of a dataframe's size or of its partitions? If there isn't one, what is the suggested approach?