48

I am trying to find a reliable way to compute the size (in bytes) of a Spark dataframe programmatically.

The reason is that I would like to have a method to compute an "optimal" number of partitions ("optimal" could mean different things here: it could mean having an optimal partition size, or resulting in an optimal file size when writing to Parquet tables - but both can be assumed to be some linear function of the dataframe size). In other words, I would like to call coalesce(n) or repartition(n) on the dataframe, where n is not a fixed number but rather a function of the dataframe size.
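
To illustrate, the pattern I have in mind is roughly the following sketch, where sizeInBytes is the quantity I don't know how to obtain reliably and targetPartitionBytes is an arbitrary target I would pick:

// illustrative sketch only: sizeInBytes would come from some reliable size estimate
val targetPartitionBytes = 128L * 1024 * 1024  // e.g. aim for ~128MB per partition
val n = math.max(1, math.ceil(sizeInBytes.toDouble / targetPartitionBytes).toInt)
val repartitioned = df.repartition(n)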

Other topics on SO suggest using SizeEstimator.estimate from org.apache.spark.util to get the size in bytes of the dataframe, but the results I'm getting are inconsistent.

First of all, I'm persisting my dataframe to memory:

df.cache().count 

The Spark UI shows a size of 4.8GB in the Storage tab. Then, I run the following command to get the size from SizeEstimator:

import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)

This gives a result of 115'715'808 bytes =~ 116MB. However, applying SizeEstimator to different objects leads to very different results. For instance, I try computing the size separately for each row in the dataframe and summing them:

df.map(row => SizeEstimator.estimate(row.asInstanceOf[AnyRef])).reduce(_ + _)

This results in a size of 12'084'698'256 bytes =~ 12GB. Or, I can try to apply SizeEstimator to every partition:

df.mapPartitions(
    iterator => Seq(SizeEstimator.estimate(
        iterator.toList.map(row => row.asInstanceOf[AnyRef]))).toIterator
).reduce(_ + _)

which results again in a different size of 10'792'965'376 bytes =~ 10.8GB.

I understand there are memory optimizations / memory overhead involved, but after performing these tests I don't see how SizeEstimator can be used to get a sufficiently good estimate of the dataframe size (and consequently of the partition size, or resulting Parquet file sizes).

What is the appropriate way (if any) to apply SizeEstimator in order to get a good estimate of a dataframe size or of its partitions? If there isn't any, what is the suggested approach here?

hiryu
  • 1,308
  • 1
  • 14
  • 21

4 Answers

42

Unfortunately, I was not able to get reliable estimates from SizeEstimator, but I could find another strategy - if the dataframe is cached, we can extract its size from queryExecution as follows:

df.cache.foreach(_ => ())
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(
    catalyst_plan).optimizedPlan.stats.sizeInBytes

For the example dataframe, this gives exactly 4.8GB (which also corresponds to the file size when writing to an uncompressed Parquet table).

This has the disadvantage that the dataframe needs to be cached, but it is not a problem in my case.
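
For convenience, the same logic can be wrapped in a small helper (the name cachedSizeInBytes is mine, not a Spark API; calling .stats without arguments requires Spark 2.3+, on older versions it takes the SQLConf as an argument):

import org.apache.spark.sql.{DataFrame, SparkSession}

// helper name is mine; it just wraps the trick above
def cachedSizeInBytes(df: DataFrame, spark: SparkSession): BigInt = {
  df.cache.foreach(_ => ())                     // force full materialization of the cache
  val catalystPlan = df.queryExecution.logical  // logical plan of the dataframe
  // re-optimize the plan: with the data cached, the size statistic is exact
  spark.sessionState.executePlan(catalystPlan).optimizedPlan.stats.sizeInBytes
}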

EDIT: Replaced df.cache.foreach(_=>_) by df.cache.foreach(_ => ()), thanks to @DavidBenedeki for pointing it out in the comments.

hiryu
  • 1,308
  • 1
  • 14
  • 21
  • can you please help with corresponding method in java? I am having hard time figuring it out. I tried below code. Its gives incorrect values. `LogicalPlan logicalPlan = dataSet.queryExecution().optimizedPlan(); Statistics stats = sparkSession.sessionState().executePlan(logicalPlan).analyzed().stats(dataSet.sqlContext().conf()); ` – Saiteja Parsi Oct 22 '19 at 14:47
  • I'm not familiar with the Java API, but in your code, when you assign `logicalPlan`, it seems you are using `optimizedPlan()`, which is a different plan. In my code, I access the `logical` property and extract it from there. – hiryu Nov 12 '19 at 09:30
  • 3
    How would I do this in pyspark? – Bob Jun 03 '20 at 12:59
  • this is giving me a really big unrealistic figure. – ss301 Oct 10 '20 at 04:22
  • 17
    For pyspark, you have to access the hidden `_jdf` and `_jsparkSession` variables, since the Python objects do not expose the needed attributes directly... `df.cache().foreach(lambda x: x)` `catalyst_plan = df._jdf.queryExecution().logical()` `spark._jsparkSession.sessionState().executePlan(catalyst_plan).optimizedPlan().stats().sizeInBytes()` – hiryu Nov 17 '20 at 16:47
  • can this be used in pyspark? – thentangler Jan 27 '21 at 18:13
  • see the comment just before yours, that code is in pyspark. – hiryu Jan 28 '21 at 14:51
  • Just wondering why caching is the 'must' here?.. Could you please elaborate more on this? Thank you in advance. – RafalK May 31 '21 at 19:54
  • As you see, we are extracting the byte size of the dataframe from the stats of the optimized plan (more on that: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Statistics.html). These stats are usually just an estimation. However, when we cache the dataset, the optimized plan realizes that it can simply access the dataframe in memory, hence the stats become exact. The `.foreach(_=>_)` bit is only an action that forces execution and hence the caching. – hiryu Jun 02 '21 at 16:45
  • @hiryu should it also work to do a `localCheckpoint()` before? – scravy Jun 27 '21 at 02:28
  • Little fix for the cache line, as `foreach` doesn't have a return type: `df.cache.foreach(_ => Unit)` – David Benedeki Oct 19 '21 at 15:01
  • df.count() is an action, so why do we need to use df.cache.foreach()? – Emma Y Nov 05 '21 at 22:02
  • 1
    @DavidBenedeki thanks, not sure why it worked previously? however, it should be `df.cache.foreach(_ => ())`. – hiryu Nov 08 '21 at 13:03
  • 1
    @EmmaY the reason I don't use `count` is that, because of Spark Catalyst optimizations, `count` is not guaranteed to materialize all rows: https://stackoverflow.com/questions/42714291/how-to-force-dataframe-evaluation-in-spark Now according to the same link, using it in conjunction with cache should actually materialize the data... but I didn't want to take any chance. `foreach` will always materialize the dataframe, full stop. – hiryu Nov 08 '21 at 13:06
  • @scravy I never used `localCheckpoint`, so you will need to test for yourself. – hiryu Nov 08 '21 at 13:07
  • @hiryu. Thanks for sharing, but from this link, it seems that df.rdd.count() or df.cache().count() should do the trick? – Emma Y Nov 08 '21 at 18:54
  • 1
    As long as the action called after `cache` guarantees the materialization of all rows and columns, it will do the trick. However, I could ask you the reverse question: what's wrong with using `foreach`? It guarantees materialization of all rows, while there are legit concerns with `df.count` - the link tells us that adding `cache` is sufficient, but is it confirmed by official docs/code? Moreover, `foreach` does not return anything, so it won't pollute stdout (the call to `count` will) and it will be marginally more efficient (no need to send local counts to the driver for aggregation). – hiryu Nov 09 '21 at 15:56
  • how to use this : df.cache.foreach(_ => ()) val catalyst_plan = df.queryExecution.logical val df_size_in_bytes = spark.sessionState.executePlan( catalyst_plan).optimizedPlan.stats.sizeInBytes In Pyspark? – Carlos Eduardo Bilar Rodrigues Jan 05 '22 at 15:10
  • @CarlosEduardoBilarRodrigues see comment from Nov 17 2020. – hiryu Jan 07 '22 at 15:30
  • @Hiryu, I tried it, but it crashes when I use df.cache.foreach (_ => ()), it only works with df.cache(), but the result is not satisfactory – Carlos Eduardo Bilar Rodrigues Jan 07 '22 at 15:46
  • if you look in the comment I pointed you to (Nov 17 2020), it uses `.foreach(lambda x: x)`, not `.foreach(_ => ())`. The former is Python code, the latter is Scala. If `foreach(lambda x: x)` does not work (but it should, just tested), try `.foreach(lambda x: None)`. – hiryu Jan 13 '22 at 09:14
  • @hiryu, this process returns a very large number for a small group of data, can you tell me why? example 9MB translates a number of 100 characters – Carlos Eduardo Bilar Rodrigues Jan 17 '22 at 13:13
  • you will need to post a separate question with a minimal reproducible example to get more help, you are not providing enough context. What is described in the answer is a workaround that served me well in the past, but it's by no means an official solution nor a widely tested one. – hiryu Jan 17 '22 at 15:14
  • I would like to ask how it would be in Java. Would it be correct to write df.cache(); System.out.println(df.logicalPlan().stats().sizeInBytes()) ?? – Des0lat0r Feb 17 '22 at 11:16
8

SizeEstimator returns the number of bytes an object takes up on the JVM heap. This includes objects referenced by the object, so the actual data size will almost always be much smaller than the estimate.

The discrepancies in the sizes you've observed are because, when you create new objects on the JVM, the references take up memory too, and this is being counted.

Check out the docs here
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.util.SizeEstimator$
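
To get a feel for the overhead, you can apply SizeEstimator to plain JVM objects (a rough sketch; the exact numbers depend on the JVM version and settings):

import org.apache.spark.util.SizeEstimator

// SizeEstimator walks the object graph on the heap, so object headers,
// references and boxing are all counted (numbers below are indicative only)
SizeEstimator.estimate(Int.box(1))                        // ~16 bytes for a boxed Int, not 4
SizeEstimator.estimate("abc")                             // ~48 bytes for a 3-character String
SizeEstimator.estimate(Array.tabulate(1000)(_.toString))  // far more than the raw character data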

Steven Black
  • 1,988
  • 1
  • 15
  • 25
  • 3
    Thanks, I have seen the docs but tbh they confuse me even more. See for instance the 1st example where I apply the `estimate` method directly to the dataframe - the resulting size (116MB) is only a small fraction of the size shown in the UI when caching the object (4.8GB), not more. Moreover, the doc for `estimate` states that `This is useful for determining (...) or the amount of space each object will take when caching objects in deserialized form.` However, I don't see that happening in my tests... – hiryu Mar 26 '18 at 15:34
7

Apart from SizeEstimator, which you have already tried (good insight), below is another option:

RDDInfo[] getRDDStorageInfo()

Returns information about which RDDs are cached, whether they are in memory or on disk, how much space they take, etc.

This is actually what the Spark UI's Storage tab uses. Spark docs
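
For example, a minimal usage sketch (assuming the dataframe has already been cached and materialized, e.g. via df.cache().count()):

val rddInfos = spark.sparkContext.getRDDStorageInfo  // Array[RDDInfo] for all cached RDDs
rddInfos.foreach { info =>
  println(s"RDD ${info.id} '${info.name}': ${info.numCachedPartitions} cached partitions, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}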

Below is the implementation from Spark:

 /**
   * :: DeveloperApi ::
   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
   * they take, etc.
   */
  @DeveloperApi
  def getRDDStorageInfo: Array[RDDInfo] = {
    getRDDStorageInfo(_ => true)
  }

  private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
    assertNotStopped()
    val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
    rddInfos.foreach { rddInfo =>
      val rddId = rddInfo.id
      val rddStorageInfo = statusStore.asOption(statusStore.rdd(rddId))
      rddInfo.numCachedPartitions = rddStorageInfo.map(_.numCachedPartitions).getOrElse(0)
      rddInfo.memSize = rddStorageInfo.map(_.memoryUsed).getOrElse(0L)
      rddInfo.diskSize = rddStorageInfo.map(_.diskUsed).getOrElse(0L)
    }
    rddInfos.filter(_.isCached)
  }

yourRDD.toDebugString from RDD also uses this (code here).


General Note:

In my opinion, to get the optimal number of records in each partition and to check that your repartitioning is correct and the data is uniformly distributed, I would suggest trying something like the below and then adjusting your repartition number and measuring the partition sizes. That is a more sensible way to address this kind of problem:

yourdf.rdd.mapPartitionsWithIndex{case (index,rows) => Iterator((index,rows.size))}
  .toDF("PartitionNumber","NumberOfRecordsPerPartition")
  .show

or, using existing Spark functions (depending on your Spark version):

import org.apache.spark.sql.functions._ 

df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show
Ram Ghadiyaram
  • 28,239
  • 13
  • 95
  • 121
-3

My suggestion is

from sys import getsizeof

def compare_size_two_object(one, two):
    '''compare the size of two objects in bytes'''
    print(getsizeof(one), 'versus', getsizeof(two))