I found out that using .map( identity ).cache on an RDD becomes very slow if the items are big, while it is pretty much instantaneous otherwise.

Note: this is probably related to this question, but here I provide a very precise example (that can be executed directly in spark-shell):

// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
  val t = System.nanoTime
  val out = code
  println(s"time = ${(System.nanoTime - t)/1000000}ms")
  out
}

// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )

// create rdd
val n = 1000 // size of the rdd

val rdd = sc.parallelize(1 to n).map( k => bigContent() ).cache
rdd.count // to trigger caching

// profiling
profile( rdd.count )                 // around 12 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 5700 ms !!!

I first assumed that it was the time needed to create a new RDD (container). But if I use an RDD of the same size with little content, there is only a tiny difference in execution time:

val rdd = sc.parallelize(1 to n).cache
rdd.count

profile( rdd.count )                 // around 9 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 15 ms

So, it looks like caching is actually copying the data. I thought it might also lose time serializing it, but I checked that cache is used with default MEMORY_ONLY persistence:

import org.apache.spark.storage.StorageLevel
rdd.getStorageLevel == StorageLevel.MEMORY_ONLY // true

=> So, is caching copying data, or is it something else?

This is really a major limitation for my application because I started with a design that uses something similar to rdd = rdd.map(f: Item => Item).cache, which can be used with many such functions f applied in arbitrary order (an order that I cannot determine beforehand).

I am using Spark 1.6.0

Edit

When I look at the spark ui -> stage tab -> the last stage (i.e. 4), all tasks have pretty much the same data with:

  • duration = 3s (it went down to 3s, but that's still 2.9s too much :-\ )
  • scheduler 10ms
  • task deserialization 20ms
  • gc 0.1s (all tasks have that, but why would gc be triggered???)
  • result serialization 0ms
  • getting result 0ms
  • peak exec mem 0.0B
  • input size 7.0MB/125
  • no errors
Juh_
  • Cache means Spark will copy your data into the Spark cache in columnar format. And your map before the cache may cause a shuffle. You can check the Spark UI to see what is going on during your map and cache. – giaosudau Jun 16 '16 at 14:00
  • What do you mean by columnar data ? Here it is just `RDD[A]` for some `A`. Also, there is no shuffling. This is the most simple map operation. If the map was complex, it would take time in the 2nd profile: `rdd.map(identity).count` – Juh_ Jun 16 '16 at 14:17
  • I'll look at the UI, but I'm not sure what to look for – Juh_ Jun 16 '16 at 14:17
  • It means Spark will store data in columnar format like parquet file (columnar format). In Spark UI go to Stages tab (domain/stages) -> see table Summary Metrics for 35 Completed Tasks – giaosudau Jun 16 '16 at 14:22
  • I added info from spark ui. Note that there is no reason why it would serialize, in any format. Or else I am missing something? – Juh_ Jun 16 '16 at 14:42
  • You should be aware that the first time you call `cache` on an RDD, this will take time as it needs to store it into memory (+ it is lazy evaluated, so the cache operation only happens once you call your `count` on it). You don't seem to store your cached rdd in a variable, are you sure you are not just re-caching it twice ? I am going to test this on my cluster later on. – Jonathan Taws Jun 20 '16 at 15:13
  • There is probably something I don't understand. What I tried to do is to create an rdd that I cache, and call count on it to make sure it has been cached. This initial step is just to start from a cached rdd. Then I try different calls that indicate that `.map(identity).cache` is very slow. From my understanding, it should just create a new rdd with the same content. The test with a small rdd of the same size shows that the rdd creation does not take much time. Thus I can only expect a data copy or serialization to explain this slowness. – Juh_ Jun 20 '16 at 15:20
  • @Juh_ This is normal, as you are asking to cache a new rdd (which has been transformed by the `.map(identity)`). Therefore, if you append cache and then count to it, the first count will trigger the cache mechanism, but you won't benefit from it on this particular count action. However, subsequent actions on your cached rdd will take advantage of it being cached, provided you stored a reference to it in a variable. – Jonathan Taws Jun 21 '16 at 06:31
  • Well, my point is to check how long it takes to cache an array. This test shows that it takes time proportional to the size of the rdd content, for the same rdd.size. And I don't understand why :-( – Juh_ Jun 21 '16 at 07:19
  • @Juh_ Based on [this documentation](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) , `MEMORY_ONLY` stores RDDs as deserialized Java objects in the JVM. Are you sure your data fits into memory ? Otherwise it will be recomputed on the fly. You can see that on the Storage tab in the application detail UI. Internally, `persist` uses a `BlockManager` with a partition iterator as explained [here](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-blockmanager.html). – Jonathan Taws Jun 21 '16 at 14:37
  • @giaosudau RDD.cache doesn't use columnar storage! – zero323 Jun 23 '16 at 13:59

1 Answer

A jstack of the process running the org.apache.spark.executor.CoarseGrainedExecutorBackend during the slow caching reveals the following:

"Executor task launch worker-4" #76 daemon prio=5 os_prio=0 tid=0x00000000030a4800 nid=0xdfb runnable [0x00007fa5f28dd000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:481)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:440)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:251)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:284)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:276)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:260)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


"Executor task launch worker-5" #77 daemon prio=5 os_prio=0 tid=0x00007fa6218a9800 nid=0xdfc runnable [0x00007fa5f34e7000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.put(IdentityHashMap.java:428)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:223)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:223)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

It makes sense that SizeEstimator is one of the main costs of caching something that is ostensibly already in memory, since proper size estimation for unknown objects can be fairly difficult. If you look at the visitSingleObject method, you can see that it relies heavily on reflection, calling getClassInfo to access runtime type information. Not only does the full object hierarchy get traversed, but each nested member is also checked against an IdentityHashMap to detect which references point to the same concrete object instance, which is why the stack traces show so much time in IdentityHashMap operations.
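To make the identity-based bookkeeping concrete, here is a minimal sketch of that kind of traversal. It is not Spark's code: `IdentityWalk` and `countDistinct` are hypothetical names, and instead of reflection over arbitrary fields it only descends into arrays, Scala collections, and tuples/case classes. It does show why every reachable object costs at least one IdentityHashMap lookup and insert.

```scala
import java.util.{ArrayDeque, IdentityHashMap}

object IdentityWalk {
  // Hypothetical helper: counts the distinct object instances reachable from `root`.
  def countDistinct(root: AnyRef): Int = {
    val visited = new IdentityHashMap[AnyRef, AnyRef]()
    val pending = new ArrayDeque[AnyRef]()

    // Queue an object only the first time this exact instance is seen.
    def enqueue(o: Any): Unit = {
      val ref = o.asInstanceOf[AnyRef] // boxes primitives
      if ((ref ne null) && !visited.containsKey(ref)) {
        visited.put(ref, ref)
        pending.push(ref)
      }
    }

    enqueue(root)
    while (!pending.isEmpty) {
      pending.pop() match {
        case arr: Array[AnyRef] => arr.foreach(enqueue)   // array of references
        case _: Array[_]        => ()                     // primitive array: no children
        case it: Iterable[_]    => it.foreach(enqueue)    // Seq, Map, Set, ...
        case p: Product         => p.productIterator.foreach(enqueue) // tuples, case classes
        case _                  => ()                     // leaf object
      }
    }
    visited.size()
  }
}
```

A shared reference is only counted once, which is the whole point of the identity map: `IdentityWalk.countDistinct(List(a, a, a))` visits the list and `a`, not three copies of `a`.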

In the case of your example objects, each item is basically a sequence of maps from wrapped integers to wrapped integers; presumably Scala's implementation of the inner map holds an array as well, which explains the visitSingleObject -> List.foreach -> visitSingleObject -> visitSingleObject call hierarchy. In any case, there are lots of inner objects to visit, and SizeEstimator sets up a fresh IdentityHashMap for each object sampled.
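One way to see this cost in isolation is to call the estimator directly on a single item. This is a sketch under assumptions: it assumes `org.apache.spark.util.SizeEstimator.estimate` is accessible in your Spark version (it is marked as a developer API), and `timeEstimate` is a hypothetical helper. `bigContent2` is an array-of-primitive-arrays variant used for comparison. The caching path does not call the estimator exactly like this (it samples a growing vector of items), so treat the numbers as indicative only.

```scala
import org.apache.spark.util.SizeEstimator

// Item shape from the question: a sequence of 1000 maps with 1000 entries each.
def bigContent() = (1 to 1000).map(i => (1 to 1000).map(j => (i, j)).toMap)

// Comparison shape: arrays of primitive Ints.
def bigContent2() = (1 to 1000).map(i => (1 to 1000).toArray).toArray

// Hypothetical helper: returns (estimated bytes, elapsed ms) for one estimator pass.
def timeEstimate(obj: AnyRef): (Long, Long) = {
  val t = System.nanoTime
  val bytes = SizeEstimator.estimate(obj)
  (bytes, (System.nanoTime - t) / 1000000)
}

val (mapBytes, mapMs) = timeEstimate(bigContent())
val (arrBytes, arrMs) = timeEstimate(bigContent2())
println(s"Seq[Map[Int,Int]]: $mapBytes bytes estimated in $mapMs ms")
println(s"Array[Array[Int]]: $arrBytes bytes estimated in $arrMs ms")
```

This can be pasted into spark-shell with `:paste`; no SparkContext is needed for the estimator itself.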

In the case where you measure:

profile( rdd.cache.count )

this doesn't count as exercising the caching logic since the RDD has already been successfully cached, so Spark is smart enough not to re-run the caching logic. You can actually isolate out the exact cost of the caching logic independently of the extra "map(identity)" transformation by profiling your fresh RDD creation and caching directly; here's my Spark session continuing from your last few lines:

scala> profile( rdd.count )
time = 91ms
res1: Long = 1000

scala> profile( rdd.map(identity).count )
time = 112ms
res2: Long = 1000

scala> profile( rdd.cache.count )
time = 59ms
res3: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6564ms                                                                   
res4: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).count )
time = 14990ms                                                                  
res5: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).cache.count )
time = 22229ms                                                                  
res6: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).map(identity).cache.count )
time = 21922ms                                                                  
res7: Long = 1000

So you can see that the slowness didn't come from running a map transformation per se. Rather, the ~6s appears to be the fundamental cost of running the caching logic over 1000 objects when each object has something like ~1,000,000 to ~10,000,000 inner objects (depending on how the Map implementation is laid out; e.g. the extra visitArray nesting in the top stack trace hints that the HashMap implementation has nested arrays, which makes sense for a typical dense linear-probing data structure inside each hashtable entry).

For your concrete use case, you should err on the side of lazy caching if possible, since there's overhead associated with caching intermediate results that's not a good tradeoff if you're not really going to reuse the intermediate results for lots of separate downstream transformations. But as you mention in your question, if you're indeed using one RDD to branch out into multiple different downstream transformations, you might indeed need the caching step if the original transformations are at all expensive.
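As a rough illustration of caching only where results are reused, the sketch below chains several transformations lazily, persists only the final result of the batch, and releases the previous cached generation. `step` is a hypothetical helper, not something from your code or from Spark; it assumes the functions in one batch do not need intermediate statistics between them.

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper: apply a batch of transformations without caching in between,
// cache only the result that will be reused, then drop the previous cache.
def step[T: ClassTag](prev: RDD[T])(fs: (T => T)*): RDD[T] = {
  val next = fs.foldLeft(prev)((r, f) => r.map(f)).cache()
  next.count()                      // materialize while `prev` is still cached
  prev.unpersist(blocking = false)  // harmless if `prev` was never persisted
  next
}
```

In spark-shell this could be used as `var rdd = sc.parallelize(1 to 10).cache(); rdd = step(rdd)(_ + 1, _ * 2)`, paying the size-estimation cost once per batch rather than once per function.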

The workaround is to try to have inner data structures which are more amenable to constant-time calculations (e.g. arrays of primitives), where you can save a lot of cost on avoiding iterating over huge numbers of wrapper objects and depending on reflection for them in the SizeEstimator.
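For data shaped like your maps, one possible layout is a pair of parallel primitive arrays per map. This is only a sketch: `PrimitiveLayout`, `IntPairs`, and `pack` are hypothetical names, and whether such a layout fits your real structure is an open question.

```scala
object PrimitiveLayout {
  // Hypothetical compact form of one Map[Int, Int]: parallel key and value arrays.
  final case class IntPairs(keys: Array[Int], values: Array[Int]) {
    // Rebuild the original map when map semantics are needed inside a task.
    def toMap: Map[Int, Int] = keys.zip(values).toMap
  }

  // Copy the entries into two primitive arrays (no boxed Integers are retained).
  def pack(m: Map[Int, Int]): IntPairs = {
    val keys = new Array[Int](m.size)
    val values = new Array[Int](m.size)
    var i = 0
    for ((k, v) <- m) {
      keys(i) = k
      values(i) = v
      i += 1
    }
    IntPairs(keys, values)
  }
}
```

An item would then be something like `bigContent().map(PrimitiveLayout.pack).toArray`, so that the estimator sees a few thousand arrays per item instead of millions of boxed entries.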

I tried things like Array[Array[Int]] and even though there's still nonzero overhead, it's 10x better for a similar data size:

scala> def bigContent2() = (1 to 1000).map( i => (1 to 1000).toArray ).toArray
bigContent2: ()Array[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent2() ).cache
rdd: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[23] at map at <console>:28

scala> rdd.count // to trigger caching
res16: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 29ms
res17: Long = 1000

scala> profile( rdd.map(identity).count )
time = 42ms
res18: Long = 1000

scala> profile( rdd.cache.count )
time = 34ms
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 763ms                                                                    
res20: Long = 1000

To illustrate just how bad the cost of reflection on any fancier objects is, if I remove the last toArray there and end up with each bigContent being a scala.collection.immutable.IndexedSeq[Array[Int]], the performance goes back to being within ~2x of the slowness of the original IndexedSeq[Map[Int,Int]] case:

scala> def bigContent3() = (1 to 1000).map( i => (1 to 1000).toArray )
bigContent3: ()scala.collection.immutable.IndexedSeq[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent3() ).cache
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.IndexedSeq[Array[Int]]] = MapPartitionsRDD[27] at map at <console>:28

scala> rdd.count // to trigger caching
res21: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 27ms
res22: Long = 1000

scala> profile( rdd.map(identity).count )
time = 39ms
res23: Long = 1000

scala> profile( rdd.cache.count )
time = 37ms
res24: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 2781ms                                                                   
res25: Long = 1000

As discussed in the comment section, you can also consider using the MEMORY_ONLY_SER StorageLevel: as long as there's an efficient serializer, it can quite possibly be cheaper than the recursive reflection used in SizeEstimator. To do that, you'd just replace cache() with persist(StorageLevel.MEMORY_ONLY_SER); as mentioned in this other question, cache() is conceptually the same thing as persist(StorageLevel.MEMORY_ONLY).

import org.apache.spark.storage.StorageLevel
profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )

I actually tried this on both Spark 1.6.1 and Spark 2.0.0-preview running with everything else about the cluster configuration exactly the same (using Google Cloud Dataproc's "1.0" and "preview" image-versions, respectively). Unfortunately the MEMORY_ONLY_SER trick didn't appear to help in Spark 1.6.1:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 6709ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6126ms                                                                   
res20: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 6214ms                                                                   
res21: Long = 1000

But in Spark 2.0.0-preview it seemed to improve performance by 10x:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 500ms
res18: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 5353ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 5927ms                                                                   
res20: Long = 1000

This could vary depending on your objects though; speedup would only be expected if serialization itself isn't using tons of reflection anyway; if you're able to effectively use the Kryo serialization then it's likely you can see improvement using MEMORY_ONLY_SER for these large objects.
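I didn't test Kryo myself, so the following is only a sketch of how it could be switched on for a standalone application; the app name and the registered classes are placeholders to adapt to your own item types. In spark-shell, where `sc` already exists, the same setting would instead be passed at launch with `--conf spark.serializer=org.apache.spark.serializer.KryoSerializer`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("kryo-cache-sketch") // placeholder name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registration is optional; it lets Kryo avoid writing full class names per object.
  .registerKryoClasses(Array(classOf[Array[Int]], classOf[Array[Array[Int]]]))

val sc = new SparkContext(conf) // master is expected to come from spark-submit

// Cached blocks are stored as serialized bytes, produced by the configured serializer.
val rdd = sc.parallelize(1 to 1000)
  .map(k => Array.fill(1000)(Array.range(0, 1000)))
  .persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count() // triggers caching
```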

Dennis Huo
  • But the conclusion is disappointing :-\ The design I was aiming for is to run many `rdd = rdd.map(f).cache` followed by collecting some statistics (such as `rdd.map( _.map(_.sum) ).collect`). The following `f` are then arbitrarily selected, partly w.r.t. these collected values. Some f are simple, some are long... Good thing you showed that using arrays improves efficiency, though. But I don't know if I can use it. My real structure is more complex. I'll need to do some tests. – Juh_ Jun 24 '16 at 07:45
  • I also need to look at the SizeEstimator system of Spark. At first glance, it seems I could make a home-made class that implements it to bypass the full traversal. Any knowledge about that? – Juh_ Jun 24 '16 at 07:52
  • Not sure whether it's possible to do dynamically, though I've certainly built customizations of the Spark assembly before, the instructions are pretty good if you just want to `git clone` and build your own Spark: https://github.com/apache/spark. – Dennis Huo Jun 24 '16 at 14:23
  • But before diving into the Spark layer too much, you might also consider changing the storage level to MEMORY_ONLY_SER instead of MEMORY_ONLY; I haven't had time to test that yet but in theory storing the caching layer in serialized form would at least bypass the heavyweight size-estimation logic, effectively being the same as if you manually kept everything in a byte-array format to reconstitute inside task logic. And in my past experience, deserialization cost can often be cheaper than doing too much reflection. – Dennis Huo Jun 24 '16 at 14:23
  • Interesting, I tried MEMORY_ONLY_SER in both Spark 1.6.1 and Spark 2.0.0-preview using [Google Cloud Dataproc](https://cloud.google.com/dataproc/) and Spark 1.6.1 showed MEMORY_ONLY_SER being the same performance as cache or persist(MEMORY_ONLY), while in Spark 2.0.0-preview, MEMORY_ONLY_SER was 10x faster. – Dennis Huo Jun 24 '16 at 17:18
  • Very interesting indeed. Are you using a specific serializer (kryo) ? – Juh_ Jun 27 '16 at 12:28
  • In my case I was actually using default Java serializers, didn't have time to try Kryo. I'd expect Kryo to show better speedup even in Spark 1.6. – Dennis Huo Jun 27 '16 at 21:11