
The first stage of my Spark job is quite simple; a rough code sketch follows the list below.

  1. It reads from a large number of files (around 30,000 files and 100 GB in total) -> RDD[String]
  2. does a map (to parse each line) -> RDD[Map[String,Any]]
  3. filters -> RDD[Map[String,Any]]
  4. coalesces (.coalesce(100, true))
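
A rough sketch of that stage, where parseLine and keepLine stand in for my real parsing and filtering functions and the input path is only illustrative:

import org.apache.spark.rdd.RDD

// Sketch only: parseLine and keepLine are placeholders for my actual code, sc is the SparkContext
val raw: RDD[String] = sc.textFile("hdfs://.../input/*")              // ~30,000 files, ~100 GB in total
val parsed: RDD[Map[String, Any]] = raw.map(line => parseLine(line))  // step 2: parse each line
val kept: RDD[Map[String, Any]] = parsed.filter(m => keepLine(m))     // step 3: filter
val result = kept.coalesce(100, true)                                 // step 4: coalesce with shuffle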

When running it, I observe quite peculiar behavior. The number of executors grows until it reaches the limit I specified in spark.dynamicAllocation.maxExecutors (typically 100 or 200 in my application). Then it starts decreasing quickly (at approx. 14000/33428 tasks) and only a few executors remain; they are killed by the driver. Once this part of the stage is done, the number of executors increases back to its maximum value.
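
For reference, the dynamic-allocation settings I use look roughly like this (only a sketch, the exact values vary from run to run):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")        // external shuffle service, needed for dynamic allocation
  .set("spark.dynamicAllocation.maxExecutors", "100")  // 100 or 200 depending on the run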

Below is a screenshot of the number of executors at its lowest.


And here is a screenshot of the task summary.


I guess that these executors are killed because they are idle. But, in that case, I do not understand why they would become idle. There remain a lot of tasks to do in the stage...

Do you have any idea why this happens?

EDIT

More details from the driver logs when an executor is killed:

16/09/30 12:23:33 INFO cluster.YarnClusterSchedulerBackend: Disabling executor 91.
16/09/30 12:23:33 INFO scheduler.DAGScheduler: Executor lost: 91 (epoch 0)
16/09/30 12:23:33 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 91 from BlockManagerMaster.
16/09/30 12:23:33 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(91, server.com, 40923)
16/09/30 12:23:33 INFO storage.BlockManagerMaster: Removed 91 successfully in removeExecutor
16/09/30 12:23:33 INFO cluster.YarnClusterScheduler: Executor 91 on server.com killed by driver.
16/09/30 12:23:33 INFO spark.ExecutorAllocationManager: Existing executor 91 has been removed (new total is 94)

Logs on the executor:

16/09/30 12:26:28 INFO rdd.HadoopRDD: Input split: hdfs://...
16/09/30 12:26:32 INFO executor.Executor: Finished task 38219.0 in stage 0.0 (TID 26519). 2312 bytes result sent to driver
16/09/30 12:27:33 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/09/30 12:27:33 INFO storage.DiskBlockManager: Shutdown hook called
16/09/30 12:27:33 INFO util.ShutdownHookManager: Shutdown hook called
Pop
  • Do you have more logs? On the machines itself? Browse around in the web UI some. – Reactormonk Sep 30 '16 at 12:22
  • I have added driver and executor logs – Pop Sep 30 '16 at 12:32
  • "16/09/30 12:27:33 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM" - find out why. OOM killer? Something else? Check `dmesg` and similar. – Reactormonk Sep 30 '16 at 13:59
  • No message talking about memory... And in the end, the job finishes successfully, so I guess there is no OOM, otherwise the job would fail, wouldn't it? The logs I have put above are really the only ones that are out of the ordinary – Pop Sep 30 '16 at 14:06
  • Without reproduction, I can't tell you much more than that. :-( – Reactormonk Sep 30 '16 at 15:20
  • Hey @Pop were you able to resolve this? I'm getting the same thing using AWS EMR. Basically the job ends up running on just one executor despite have many allocated for the job. – brandomr Feb 09 '17 at 05:50
  • @brandomr, no I haven't... My guess is still that it is linked to the reading part of the job (but I do not know in what way), because once the RDD is persisted, the issue disappears – Pop Feb 09 '17 at 08:07
  • Weird. @Pop did you just call `.persist()`? If so, at what stage? – brandomr Feb 09 '17 at 21:54

1 Answer


I'm seeing this problem on executors that are killed as a result of an idle timeout. I have an exceedingly demanding computational load, but it's mostly computed in a UDF, invisible to Spark. I believe there's some Spark parameter that can be adjusted.

Try looking through the spark.executor parameters in https://spark.apache.org/docs/latest/configuration.html#spark-properties and see if anything jumps out.
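
If it really is the idle timeout, one property I would experiment with (just a guess based on my own case, not verified against your setup) is spark.dynamicAllocation.executorIdleTimeout, e.g.:

import org.apache.spark.SparkConf

// Sketch: give executors more slack before the allocation manager reclaims them as idle
val conf = new SparkConf()
  .set("spark.dynamicAllocation.executorIdleTimeout", "300s")  // default is 60s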

vy32