
I am trying to fix an OutOfMemoryError in my Spark setup, and at this point I cannot come to a concrete conclusion about why it happens. I always see the issue when writing a DataFrame to Parquet or Kafka. The DataFrame has 5000 rows and the following schema:

root

     |-- A: string (nullable = true)
     |-- B: string (nullable = true)
     |-- C: string (nullable = true)
     |-- D: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- E: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- F: double (nullable = true)
     |-- G: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- H: integer (nullable = true)
     |-- I: double (nullable = true)
     |-- J: double (nullable = true)
     |-- K: array (nullable = true)
     |    |-- element: double (containsNull = false)

Of these, column G can have a cell size of up to 16 MB. The DataFrame's total size is about 10 GB, split into 12 partitions. Before writing, I try to create 48 partitions out of it using repartition(), but the issue appears even if I write without repartitioning. At the time of the exception, I have only one DataFrame cached, with a size of about 10 GB. My driver has 19 GB of free memory and the 2 executors have 8 GB of free memory each. The Spark version is 2.1.0.cloudera1 and the Scala version is 2.11.8.
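
For reference, the write is essentially the sketch below (the output path is a placeholder, not the real one); the same error occurs whether or not the repartition(48) call is present:

    // df is the 5000-row DataFrame described above
    val out = df.repartition(48)        // error also occurs without this call
    out.write
      .mode("overwrite")
      .parquet("hdfs:///tmp/output_parquet")   // placeholder output path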

I have the following settings:

spark.driver.memory     35G
spark.executor.memory   25G
spark.executor.instances    2
spark.executor.cores    3
spark.driver.maxResultSize      30g
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 1g
spark.rdd.compress      true
spark.rpc.message.maxSize       2046
spark.yarn.executor.memoryOverhead      4096

The exception stack trace is:

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:991)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:765)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:764)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.DAGScheduler.submitWaitingChildStages(DAGScheduler.scala:764)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1228)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Any insights?

  • Had a similar issue. Increasing Java heap size solved it. See https://stackoverflow.com/q/1565388/5039312 – Marco Sep 14 '17 at 11:06

1 Answer


We finally found the issue. We were running k-fold logistic regression in Scala on the 5000-row DataFrame with k = 4. After the classification was done, we had 4 test-output DataFrames of about 1250 rows each, each partitioned into at least 200 partitions, so in total more than 800 partitions over 5000 rows of data. The code then repartitioned this combined data into 48 partitions, and the system could not handle that repartition, most likely because of the shuffle it triggers. To fix it, we repartitioned each fold's output DataFrame to a smaller number of partitions (instead of repartitioning the combined DataFrame), and this resolved the issue. A rough sketch of the change is shown below.
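
The sketch uses hypothetical names (foldOutputs stands in for the four per-fold test-result DataFrames; the partition counts are illustrative, not our actual values):

    // foldOutputs: Seq[DataFrame] -- the four per-fold test-result DataFrames
    // (~1250 rows each, 200+ partitions apiece)

    // Before: union the fold outputs (800+ partitions in total), then
    // repartition the combined DataFrame once. This is the step the
    // system could not handle.
    val before = foldOutputs.reduce(_ union _).repartition(48)

    // After: shrink each fold's partition count first, then union.
    val after = foldOutputs
      .map(_.repartition(12))   // illustrative per-fold target
      .reduce(_ union _)        // 4 folds x 12 partitions = 48 overall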
