
I've got a 4-node Spark Standalone cluster with a Spark Streaming job running on it.

When I submit the job with 7 cores per executor everything runs smoothly:

spark-submit --class com.test.StreamingJob --supervise --master spark://{SPARK_MASTER_IP}:7077 --executor-memory 30G --executor-cores 7 --total-executor-cores 28 /path/to/jar/spark-job.jar

When I increase to 24 cores per executor, none of the batches get processed and I see java.lang.OutOfMemoryError: unable to create new native thread in the executor logs. The executors then keep failing:

spark-submit --class com.test.StreamingJob --supervise --master spark://{SPARK_MASTER_IP}:7077 --executor-memory 30G --executor-cores 24 --total-executor-cores 96 /path/to/jar/spark-job.jar

Error:

17/01/12 16:01:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Shutdown-checker,5,main]
java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:714)
        at io.netty.util.concurrent.SingleThreadEventExecutor.shutdownGracefully(SingleThreadEventExecutor.java:534)
        at io.netty.util.concurrent.MultithreadEventExecutorGroup.shutdownGracefully(MultithreadEventExecutorGroup.java:146)
        at io.netty.util.concurrent.AbstractEventExecutorGroup.shutdownGracefully(AbstractEventExecutorGroup.java:69)
        at com.datastax.driver.core.NettyOptions.onClusterClose(NettyOptions.java:190)
        at com.datastax.driver.core.Connection$Factory.shutdown(Connection.java:844)
        at com.datastax.driver.core.Cluster$Manager$ClusterCloseFuture$1.run(Cluster.java:2488)

I found this question and tried upping the ulimits substantially but it had no effect.
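
For reference, these are the kinds of limits involved; the user name and values below are placeholders, not the exact ones on these boxes:

# check the limits for the user that runs the Spark worker/executors
ulimit -u    # max user processes (each JVM thread counts against this on Linux)
ulimit -n    # max open files

# /etc/security/limits.conf (example user, example values)
sparkuser  soft  nproc   65536
sparkuser  hard  nproc   65536
sparkuser  soft  nofile  65536
sparkuser  hard  nofile  65536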

Each box has 32 cores and 61.8 GB of memory. The streaming job is written in Java and runs on Spark 2.0.0, connecting to Cassandra 3.7.0 with spark-cassandra-connector-java_2.10 1.5.0-M2.

The data is a very small trickle of less than 100 events per second, each of which is less than 200 B.

Kevin
  • What is the size of the data? Could you check the total RAM per node and the number of cores per node in your cluster? – Sandeep Singh Jan 13 '17 at 02:21
  • So the problem is you are assigning 24 executor cores per machine and 30 GB of memory per executor, so in total you are assigning 720 GB. Your cluster has only 247 GB of memory, therefore you are getting the out of memory exception :) – Sandeep Singh Jan 13 '17 at 10:22
  • @sandeep-singh There is only one executor per worker. Those are per executor limits. – Kevin Jan 13 '17 at 10:27
  • Yeah.. your data is small, so you should not assign more executors, but here you are running a streaming job, so you can increase it; assigning more cores would be more beneficial. – Sandeep Singh Jan 13 '17 at 10:30
  • You can also measure the time while increasing or decreasing executors and judge the best performance. – Sandeep Singh Jan 13 '17 at 10:32
  • With that config everything is backed up and nothing is getting processed. Only 2 executors started so 2 nodes were doing nothing. – Kevin Jan 13 '17 at 10:38
  • Accidentally deleted command, writing again: try your job with `spark-submit --num-executors 1 --executor-memory 1G --total-executor-cores 2` and remove `--total-executor-cores 96` from your command. Let me know if it works for you. I will answer in detail accordingly. – Sandeep Singh Jan 13 '17 at 10:38
  • Are you getting any error? Can you try removing all the config and running with just spark-submit? – Sandeep Singh Jan 13 '17 at 10:43

1 Answer


Sounds like you are running Out of Memory ;).

For a little more detail, the number of cores in use by Spark is directly tied to the amount of data being worked on in parallel. You can basically think of each Core as working on a full Spark Partition's data, which can potentially require the whole partition to reside in memory.

7 Cores per executor means 7 Spark Partitions are being worked on simultaneously. Bumping this number up to 24 means roughly 3.5 times as much RAM will be in use. This could easily cause an OOM in various places.
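
Put differently, the per-task memory budget shrinks as the core count grows. As a rough back-of-the-envelope sketch (assuming Spark 2.0's default spark.memory.fraction of 0.6 and the 30G executor heap from the question; real numbers will vary):

usable unified memory  ~ 0.6 * (30 GB - 300 MB reserved)  ~ 17.8 GB
at  7 concurrent tasks ~ 2.5 GB of execution/storage memory per task
at 24 concurrent tasks ~ 0.75 GB per task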

There are a few ways to deal with this.

  1. Allocate more memory to the Executor JVMs (see the illustrative commands after this list)
  2. Shrink the size of the Spark Partitions; smaller partitions mean less data in memory at any given time (also sketched below)
  3. Make sure you aren't caching any RDDs in memory (and thus exhausting the system resources)
  4. Reduce the amount of data you are working with: take subsets or try to filter at the server before it hits Spark.
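
As an illustrative sketch of options 1 and 2 (the flag and config values are examples only, and the best partitioning knob depends on where your data comes from):

# Option 1: give each Executor JVM more heap (45G is only an example for a 61.8 GB box)
spark-submit --class com.test.StreamingJob --supervise \
  --master spark://{SPARK_MASTER_IP}:7077 \
  --executor-memory 45G --executor-cores 24 --total-executor-cores 96 \
  /path/to/jar/spark-job.jar

# Option 2: more, and therefore smaller, partitions (192 is only an example)
spark-submit --conf spark.default.parallelism=192 ... /path/to/jar/spark-job.jar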
RussS
  • Thanks Russ, will try shrinking the partitions. This is happening even when there is very little data to process so I don't think it can be that. – Kevin Jan 12 '17 at 21:06
  • Hi Russ, in Standalone Cluster mode, how can I allocate more memory to the executors? – fattah.safa May 21 '17 at 18:07
  • The docs are always helpful: https://spark.apache.org/docs/latest/configuration.html#application-properties. spark.executor.memory (default 1g): "Amount of memory to use per executor process (e.g. 2g, 8g)." – RussS May 22 '17 at 01:05
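
For instance, in standalone mode that property can be set per submission or as a cluster default (the 8G/8g values are illustrative):

# per application, at submit time
spark-submit --executor-memory 8G --class com.test.StreamingJob /path/to/jar/spark-job.jar

# or as a default in conf/spark-defaults.conf on the submitting machine
spark.executor.memory  8g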