
I'm running a Spark app on an AWS EMR (Elastic MapReduce) cluster. My master node has: 8 vCores, 15 GiB memory, 80 GB SSD storage. My executor nodes have: 8 vCores, 15 GiB memory, 80 GB SSD storage.

I have a CSV input file (inputFile) with a size of 600 MB. I am trying to read it into a JavaRDD and then use collect() to convert it to a List of objects.

Here is my code:

JavaRDD<WebLabPurchasesDataObject> allRecords = context.textFile(inputFile).map(
        data -> {
            // Split each CSV line, keeping trailing empty fields
            String[] fields = data.split(",", -1);

            String hitDay = fields[0];
            String treatmentName = fields[1];
            String sessionId = fields[2];

            // Convert each line into a domain object
            return new WebLabPurchasesDataObject(hitDay, treatmentName, sessionId);
        });

allRecords.cache();

List<WebLabPurchasesDataObject> webLabRddAllRecordsList = allRecords.collect();

Every time I try to run this code, I get java.lang.OutOfMemoryError: Java heap space. As far as I understand, Spark performs the collect() operation on my master node. So is there any way to increase the memory so that the program can run?

18/03/15 16:35:48 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 5, ip-1443-405-18-1544.us-west-2.compute.internal): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
18/03/15 16:35:49 ERROR cluster.YarnScheduler: Lost executor 1 on ip-43ew55-154.us-west-2.compute.internal: remote Akka client disassociated



As you have identified, since the results are collected on the driver, you need to increase the driver memory. The default value is 1 GB, which is turning out to be insufficient in your case.

Add this config when you are creating the SparkSession/SparkContext: set spark.driver.memory to a larger value such as 2g or 3g. If you are using spark-shell, pass it as an additional option when starting spark-shell: --driver-memory 3g for 3 GB of memory.
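
As a minimal sketch with Spark's Java API (the app name is a placeholder, and note that in client mode the driver JVM has already started by the time SparkConf is read, so there the --driver-memory flag or spark-defaults.conf is the reliable route):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: raise the driver heap above the 1g default before creating the context.
SparkConf conf = new SparkConf()
        .setAppName("WebLabPurchasesApp")        // placeholder app name
        .set("spark.driver.memory", "3g");       // ignored in client mode; use --driver-memory there
JavaSparkContext context = new JavaSparkContext(conf);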

I also suggest you read more about the configurations described here: https://spark.apache.org/docs/latest/configuration.html

  • Hi! Thanks for the help! When I try to set "spark.driver.memory" = "5g", I get a different error: Exception while getting task result io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory – WomenWhoCode Mar 15 '18 at 17:34
  • Sorry, I missed including another conf along with driver memory: `spark.driver.maxResultSize`. You should set it to at least the size of the objects you are going to collect. I suggest you set it to `0` (unlimited), which means the driver can use as much memory as it has available to collect results; see the sketch after this thread. – Lokesh Yadav Mar 15 '18 at 19:09
  • Let me know if this works, I'll update the answer in that case. – Lokesh Yadav Mar 15 '18 at 19:10
  • Thanks! Will try with maxResultSize. Do I need to cache() my JavaRDD before performing collect()? If yes, what StorageLevel do I need to set in my persist() function? – WomenWhoCode Mar 15 '18 at 19:13
  • Cache/persist won't affect this OOM error, as it is due to driver memory. But caching would certainly improve performance, given you have sufficient memory available on the executors. – Lokesh Yadav Mar 15 '18 at 21:16
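
Following up on the spark.driver.maxResultSize comment above, a hedged sketch of how both driver settings might be combined in one SparkConf (values are illustrative, and the spark.driver.memory caveat about client mode still applies):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: allow larger serialized task results to be pulled back to the driver.
SparkConf conf = new SparkConf()
        .setAppName("WebLabPurchasesApp")             // placeholder app name
        .set("spark.driver.memory", "3g")             // ignored in client mode; pass --driver-memory instead
        .set("spark.driver.maxResultSize", "0");      // 0 = no limit on the total size of collected results
JavaSparkContext context = new JavaSparkContext(conf);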