
Can anyone explain how RDD storage levels work?

I get a Java heap space error when I call persist with StorageLevel.MEMORY_AND_DISK_2(). However, my code works fine when I use the cache method.

According to the Spark documentation, cache persists the RDD with the default storage level (MEMORY_ONLY).

The code where I get the heap error:

JavaRDD<String> rawData = sparkContext
        .textFile(inputFile.getAbsolutePath())
        .setName("Input File")
        .persist(SparkToolConstant.rdd_stroage_level); // replacing persist(...) with cache() avoids the error

String[] headers = new String[0];
String headerStr = null;
if (headerPresent) {
    // Treat the first line as the header and remove it from the data.
    headerStr = rawData.first();
    headers = headerStr.split(delim);
    List<String> headersList = new ArrayList<String>();
    headersList.add(headerStr);
    JavaRDD<String> headerRDD = sparkContext.parallelize(headersList);
    JavaRDD<String> filteredRDD = rawData.subtract(headerRDD)
            .setName("Raw data without header")
            .persist(StorageLevel.MEMORY_AND_DISK_2());
    rawData = filteredRDD;
}

Stack trace

 Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 10, localhost): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1176)
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1185)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:846)
    at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:668)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:176)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:

Spark version: 1.3.0

hayat
  • Can you give your cluster details? – Tinku May 06 '15 at 12:41
  • I'm running on my local system in local mode, with 4 cores and 2 GB of driver memory, using the spark-submit command. – hayat May 06 '15 at 13:11
  • Worker memory? This exception comes from the worker. – Tinku May 07 '15 at 03:51
  • Where can I set worker memory if I run in local mode (standalone mode)? – hayat May 07 '15 at 08:27
  • spark.executor.memory=2g – Tinku May 08 '15 at 05:58
  • OK, thanks for sharing your knowledge. When I set driver memory to 2 GB, the executor memory is set automatically to 1068 MB based on the driver memory. I did not set executor memory because I'm running in local mode. – hayat May 08 '15 at 10:50
  • This is a memory issue on the workers. Try running again after increasing worker memory. – Tinku May 08 '15 at 11:17
  • As mentioned in the question, when the storage level is MEMORY_ONLY it works fine for 258 file, but when I set the storage level to MEMORY_AND_DISK_2 it throws the error. This looks strange because, as the name suggests, I would expect MEMORY_AND_DISK_2 to need less memory. Can you suggest any docs about RDD storage levels? – hayat May 08 '15 at 12:20
  • Did you ever get this figured out? I would think this is a difficult question, without knowing what your application does it's hard to answer, I could see that once you persist an RDD to memory and try to bring it back into memory, there's not enough left. It would totally depend on the details, interested what you learned. Probably not helpful but for docs you can see [Spark Programming - Persist](http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose) and [Tuning]() – JimLohse Jun 20 '16 at 01:28
  • Also I am not totally convinced that there is a separate executor and worker in local[] mode? But I have not run it in so long I would have to check. Doesn't local[] mode simply use multiple threads in the same JVM? "Note that we can have more than 1 thread in local mode, and in cases like Spark Streaming, we may actually require more than 1 thread to prevent any sort of starvation issues." http://spark.apache.org/docs/latest/configuration.html#spark-properties – JimLohse Jun 20 '16 at 01:33

1 Answer


Seeing this go unanswered for so long, I post this for general info and for those like me whose searches lead here.

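This type of question is hard to answer without more specifics about your application. In general, it does seem upside down that you'd get a memory error with a storage level that can spill to disk. Note, though, that the stack trace shows the failure inside BlockManager.dataSerialize, where the block is being serialized into an in-memory byte array, so that step needs heap on top of the cached objects. The _2 suffix also asks Spark to keep a second replica of each block, which is probably why the block gets serialized here at all. I suggest you try Kryo serialization and, if you have a lot of extra memory somewhere, use Alluxio (the software formerly known as Tachyon) for "disk serialization"; this will speed things up.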

More from Spark docs on Tuning Data Storage, Serialized RDD Storage and (maybe helpful) GC Tuning:

When your objects are still too large to efficiently store despite this tuning, a much simpler way to reduce memory usage is to store them in serialized form, using the serialized StorageLevels in the RDD persistence API, such as MEMORY_ONLY_SER. Spark will then store each RDD partition as one large byte array. The only downside of storing data in serialized form is slower access times, due to having to deserialize each object on the fly. We highly recommend using Kryo if you want to cache data in serialized form, as it leads to much smaller sizes than Java serialization (and certainly than raw Java objects).
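To get a feel for what "one large byte array" means for the heap, here is a minimal sketch in plain Java (no Spark involved) that pushes a list of strings through the same JDK classes that show up in the stack trace, ObjectOutputStream on top of ByteArrayOutputStream. The class and method names (SerializedPartitionDemo, serializeAll, deserializeAll) and the sample rows are made up for illustration; this is not how Spark is implemented, only the same building blocks under simplified assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of Spark.
public class SerializedPartitionDemo {

    // Serialize every record into a single in-memory byte array.
    // ByteArrayOutputStream grows by allocating a larger array and copying
    // the old contents (the Arrays.copyOf frame in the stack trace), so the
    // old and new buffers briefly coexist on the heap.
    public static byte[] serializeAll(List<String> records) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            for (String record : records) {
                out.writeObject(record);
            }
        }
        // toByteArray() returns a copy of the internal buffer.
        return buffer.toByteArray();
    }

    // Read back exactly `count` records from the serialized bytes.
    public static List<String> deserializeAll(byte[] bytes, int count)
            throws IOException, ClassNotFoundException {
        List<String> result = new ArrayList<>(count);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            for (int i = 0; i < count; i++) {
                result.add((String) in.readObject());
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for one partition of a text file (made-up rows).
        List<String> partition = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            partition.add("row-" + i + ",some,delimited,values");
        }
        byte[] bytes = serializeAll(partition);
        System.out.println("records: " + partition.size());
        System.out.println("serialized bytes: " + bytes.length);
    }
}
```

While serializeAll runs, the original objects and the growing byte buffer are both live, so peak heap use is higher than for the deserialized data alone. If that is what is happening in your job, more heap, smaller partitions, or a more compact serializer such as Kryo are the things I would try first.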

JimLohse