28

I have an RDD[Row] that needs to be persisted to a third-party repository. But this third-party repository accepts a maximum of 5 MB in a single call.

So I want to create partitions based on the size of the data in the RDD, not on the number of rows.

How can I find the size of an RDD and create partitions based on it?

Ram Ghadiyaram
sag

5 Answers

14

As Justin and Wang mentioned, it is not straightforward to get the size of an RDD. We can only make an estimate.

We can sample an RDD and then use SizeEstimator to get the size of the sample. As Wang and Justin mentioned, based on data sampled offline (say, X rows used Y GB), Z rows at runtime may take about Z*Y/X GB.

Here is sample Scala code to estimate the size of an RDD.

I am new to Scala and Spark, so the sample below may be written in a better way.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.util.SizeEstimator

def getTotalSize(rdd: RDD[Row]): Long = {
  // This can be a parameter
  val NO_OF_SAMPLE_ROWS = 10L
  val totalRows = rdd.count()
  var totalSize = 0L
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    // sample() expects a fraction between 0 and 1, not a row count
    val fraction = NO_OF_SAMPLE_ROWS.toDouble / totalRows
    val sampleRDD = rdd.sample(withReplacement = true, fraction)
    val sampledRows = sampleRDD.count()
    // Extrapolate from the sampled rows to the full RDD
    totalSize = if (sampledRows > 0) getRDDSize(sampleRDD) * totalRows / sampledRows else 0L
  } else {
    // The RDD is smaller than the sample size, so just measure the whole thing
    totalSize = getRDDSize(rdd)
  }

  totalSize
}

def getRDDSize(rdd: RDD[Row]): Long = {
  var rddSize = 0L
  val rows = rdd.collect()
  for (i <- 0 until rows.length) {
    // Cast each value to AnyRef; calling toString can NPE when a value is null
    rddSize += SizeEstimator.estimate(rows(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
  }

  rddSize
}
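
To tie this back to the original question (at most 5 MB per call), a possible follow-up, my own sketch rather than part of the answer, is to turn the estimate into a partition count and repartition accordingly:

// Sketch only: targetBytesPerPartition is chosen to stay under the 5 MB limit
val targetBytesPerPartition = 5L * 1024 * 1024
val estimatedSize = getTotalSize(rdd)
val numPartitions = math.max(1, math.ceil(estimatedSize.toDouble / targetBytesPerPartition).toInt)
val sizedRdd = rdd.repartition(numPartitions)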
sag
  • How is YARN getting the RDD size? I am running jobs and have estimates of my RDD sizes in GB, but I am unable to access this info inside of my Spark code. – Glenn Strycker Aug 28 '15 at 20:40
  • Sorry, I am not using Spark on YARN. I use Databricks. I don't know much about YARN. – sag Aug 29 '15 at 04:52
  • I found that value.asInstanceOf[AnyRef] works better than toString for estimating; also, value.toString can throw a null pointer exception if value is null, and it seems casting does not have this problem, so it is also safer. – lockwobr Mar 29 '16 at 19:11
  • @lockwobr - Could you please update the answer accordingly? – sag Mar 30 '16 at 07:00
  • @lockwobr Unfortunately I don't have enough reputation to accept this edit. So let me edit it with your content – sag Mar 31 '16 at 04:44
  • I am not sure I am following; is there something you need me to do so you can accept the edit? – lockwobr Mar 31 '16 at 16:34
  • @SamuelAlexander `rdd.sample(true, NO_OF_SAMPLE_ROWS)` will return the full RDD; the second argument should be a number between 0 and 1 – Victor P. Aug 02 '16 at 13:11
  • Thanks for the link to `SizeEstimator` as well as your code - which seems useful. I am trying it out now. – WestCoastProjects Dec 07 '16 at 15:59
  • Thanks for the answer, I would love to see this in python instead of scala, for what it's worth. If I translate it from scala myself is there any way to include the pyspark code in this answer? – TheProletariat Feb 08 '18 at 15:27
  • @TheProletariat - Yes you can edit the answer with your code as well – sag Feb 09 '18 at 11:09
  • this example is wrong. As @VictorP. pointed out, NO_OF_SAMPLE_ROWS must be fractional – WamBamBoozle Feb 28 '18 at 00:48
7

One straightforward way is to call one of the following, depending on whether you want to store your data in serialized form or not, and then go to the Spark UI "Storage" page; there you should be able to figure out the total size of the RDD (memory + disk):

rdd.persist(StorageLevel.MEMORY_AND_DISK)

or

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
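
If checking the UI by hand is not an option, one variation (my own sketch, not part of the original answer) is to read the same storage statistics programmatically via SparkContext.getRDDStorageInfo, after the RDD has been persisted and materialized; `sc` here is assumed to be your SparkContext:

import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.count() // force materialization so the storage info is populated

// getRDDStorageInfo is a developer API; it lists only persisted RDDs
sc.getRDDStorageInfo
  .find(_.id == rdd.id)
  .foreach(info => println(s"memSize=${info.memSize} bytes, diskSize=${info.diskSize} bytes"))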

It is not easy to calculate the exact memory size at runtime. You may try to do an estimation at runtime, though: based on data sampled offline, say, X rows used Y GB offline, then Z rows at runtime may take Z*Y/X GB; this is similar to what Justin suggested earlier.

Hope this helps.

Haiying Wang
  • Thanks for the answer. Yes, this will help to find the size. But I want to check this during my pipeline/code execution itself, so manually checking in the Spark UI is not an option for me. – sag Jul 18 '15 at 06:46
  • I think it is not easy to calculate accurate memory size at runtime. You may try do an estimation at runtime though: based on the size data sampled offline, say, X rows used Y GB offline, Z rows at runtime may take Z*Y/X GB; this is similar to Justin suggested earlier. – Haiying Wang Jul 20 '15 at 15:47
  • Random question: when I do rdd.cache() I don't see it in the UI. Is memory-only storage not displayed? – zengr Sep 16 '15 at 22:18
6

I think RDD.count() will give you the number of elements in the RDD.
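
count() only gives a row count, not bytes; a minimal sketch (my own addition, with a purely hypothetical average row size) of how it could still feed a rough byte estimate:

// Hypothetical: avgRowSizeBytes would come from offline measurement or sampling
val avgRowSizeBytes = 512L
val approxTotalBytes = rdd.count() * avgRowSizeBytes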

Yiying Wang
  • Hello @Yiying, and welcome to StackOverflow. The poster is asking for the size of the RDD, not just the number of rows. Perhaps you could expand your answer so that the poster does not require any further clarification. Once you have enough reputation, you will be able to leave comments if you prefer. – buruzaemon Sep 08 '16 at 23:17
  • The question asks for the size in information units (bytes), supposedly. But `count` is also a measure of size -- this answer doesn't really answer the question, but does add information to what would be an ideal answer. – ribamar Jan 27 '17 at 15:51
3

This is going to depend on factors such as serialization, so it is not cut and dried. However, you could take a sample set, run some experimentation on that sample data, and extrapolate from there.
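
A minimal sketch of that idea (my own illustration, assuming an RDD[String] named stringRdd and a 10% sample):

// Sample a fraction, measure the bytes of the sample, extrapolate to the full RDD
val fraction = 0.1
val sample = stringRdd.sample(withReplacement = false, fraction)
val sampleBytes = sample.map(_.getBytes("UTF-8").length.toLong).fold(0L)(_ + _)
val estimatedTotalBytes = (sampleBytes / fraction).toLong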

Justin Pihony
  • Consider I have an RDD with strings in it. Do I need to iterate through the whole RDD and use String.size() to get the size? – sag Jul 15 '15 at 06:56
  • @sag that's one way of doing it, but it would add to execution time. You can go this way if your RDD is not extremely big. – BJC Mar 16 '18 at 02:41
0

This is the version to use if you are actually working with big data on a cluster -- i.e. it eliminates the collect.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def calcRDDSize(rdd: RDD[Row]): Long = {
  // Measure each row as the byte length of its UTF-8 string form
  rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
     .reduce(_ + _) // add the sizes together
}

def estimateRDDSize(rdd: RDD[Row], fraction: Double): Long = {
  val sampleRDD = rdd.sample(withReplacement = true, fraction)
  val sampleRDDsize = calcRDDSize(sampleRDD)
  println(s"sampleRDDsize is ${sampleRDDsize / (1024 * 1024)} MB")

  val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
  println(s"sampleAvgRowSize is $sampleAvgRowSize")

  val totalRows = rdd.count()
  println(s"totalRows is $totalRows")

  val estimatedTotalSize = totalRows * sampleAvgRowSize
  val formatter = java.text.NumberFormat.getIntegerInstance
  val estimateInMB = formatter.format(estimatedTotalSize / (1024 * 1024))
  println(s"estimatedTotalSize is $estimateInMB MB")

  estimatedTotalSize
}

// estimate using 15% of the data
val size = estimateRDDSize(df.rdd, 0.15)
warrens
  • I think there's probably a solution that avoids the use of `collect` for the answer above by sag (which you do in this solution), but still uses spark's `SizeEstimator.estimate` which is presumably more accurate than running a `mkString` on the row and looking at the string length. Presumably this answer as-is would only work if they are stored as strings, and would depend on how the RDD is persisted (serialized as String, serialized as a Java object, etc.) – Marcus May 30 '19 at 16:35
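
Following up on the comment above, a possible variation (my own sketch, not from the answer) keeps the distributed reduce but measures each Row with SizeEstimator instead of its string form; note the result includes Row object overhead and so depends on the concrete Row implementation:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.util.SizeEstimator

// Runs SizeEstimator on the executors, so nothing is collected to the driver
def calcRDDSizeWithEstimator(rdd: RDD[Row]): Long = {
  rdd.map(row => SizeEstimator.estimate(row))
     .fold(0L)(_ + _)
}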