
We run a Spark cluster in yarn-client mode for several business computations, but sometimes a task runs for far too long (the screenshot originally attached here showed one such task).

We have not set a timeout, but I would expect the default timeout for a Spark task to be much shorter than what we see here (1.7 h).

Can anyone suggest a way to work around this issue?

zengr
tnk_peka
  • Maybe it's worth spending some time understanding why this is happening and seeing whether you can avoid it. Most of the time, this happens because the data is not evenly partitioned across keys, so some keys have few values and others have a gigantic number of values. – hveiga Jun 18 '16 at 00:28

2 Answers


Spark has no built-in way to kill a task just because it is taking too long.

However, I found a way to handle this using speculation.

With speculation enabled, if one or more tasks in a stage are running slowly, Spark launches a second copy of them and uses whichever attempt finishes first.

spark.speculation                  true
spark.speculation.multiplier       2
spark.speculation.quantile         0

Note: setting spark.speculation.quantile to 0 means speculation can kick in from your very first task, so use it with caution. I use it because some of my jobs slow down over time due to GC. You should know when it is appropriate to use this; it's not a silver bullet.
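
To make the effect of these two settings more concrete, here is a simplified sketch of the rule as I understand it from the configuration docs: a running task becomes a candidate for speculation once enough tasks of the stage have finished (the quantile) and it has been running longer than the multiplier times the median duration of the finished tasks. This is not Spark's actual source; the class name SpeculationSketch, the method isSpeculatable, and the "at least one finished task" floor are assumptions made for illustration.

```java
import java.util.Arrays;

/** Simplified model of the speculation rule; NOT Spark's actual implementation. */
final class SpeculationSketch {

    /**
     * @param finishedMs durations (ms) of tasks in the stage that already succeeded
     * @param totalTasks total number of tasks in the stage
     * @param runningMs  how long the candidate task has been running (ms)
     * @param quantile   value of spark.speculation.quantile
     * @param multiplier value of spark.speculation.multiplier
     */
    static boolean isSpeculatable(long[] finishedMs, int totalTasks, long runningMs,
                                  double quantile, double multiplier) {
        // Assumption: at least one finished task is needed to compute a median.
        int minFinished = Math.max((int) Math.floor(quantile * totalTasks), 1);
        if (finishedMs.length < minFinished) {
            return false; // not enough finished tasks yet
        }
        long[] sorted = finishedMs.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2]; // upper median for even lengths
        return runningMs > multiplier * median;
    }

    public static void main(String[] args) {
        long[] finished = {60_000L}; // one task finished in 1 minute
        // quantile = 0: a single finished task is enough to start speculating.
        System.out.println(isSpeculatable(finished, 100, 150_000L, 0.0, 2.0));  // true
        // quantile = 0.75: 75 of the 100 tasks must finish first.
        System.out.println(isSpeculatable(finished, 100, 150_000L, 0.75, 2.0)); // false
    }
}
```

With a quantile of 0 the median is based on very few samples, which is why a skewed but otherwise healthy task can be speculated early.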

Some relevant links: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-always-wait-for-stragglers-to-finish-running-td14298.html and http://mail-archives.us.apache.org/mod_mbox/spark-user/201506.mbox/%3CCAPmMX=rOVQf7JtDu0uwnp1xNYNyz4xPgXYayKex42AZ_9Pvjug@mail.gmail.com%3E

Update

I found a fix for my issue (it might not work for everyone). I had a bunch of simulations running per task, so I added a timeout around each run. If a simulation takes too long (due to data skew in that specific run), it times out.

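import java.util.concurrent.*;

ExecutorService executor = Executors.newCachedThreadPool();
Callable<SimResult> task = () -> simulator.run();

// Run the simulation on a separate thread so the caller can give up on it.
Future<SimResult> future = executor.submit(task);
try {
    // Wait at most one minute for the result (result is declared by the enclosing code).
    // get() can also throw InterruptedException and ExecutionException,
    // which the enclosing method has to handle or declare.
    result = future.get(1, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
    future.cancel(true); // interrupts the thread running the simulation
    SPARKLOG.info("Task timed out");
}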

Make sure you handle an interrupt inside the simulator's main loop like:

if(Thread.currentThread().isInterrupted()){
    throw new InterruptedException();
} 
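
To see the timeout and the interrupt check working together, here is a self-contained sketch that runs without Spark. The names TimeoutSketch, slowSimulation, and runWithTimeout are hypothetical stand-ins (slowSimulation plays the role of simulator.run()), and the 200 ms timeout is chosen only to keep the demo short.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutSketch {

    /** Hypothetical stand-in for simulator.run(): a CPU-bound loop that honours interrupts. */
    static long slowSimulation(long iterations) throws InterruptedException {
        long sum = 0;
        for (long i = 0; i < iterations; i++) {
            // Cooperative cancellation: without this check, cancel(true) has no effect
            // on a loop that never blocks.
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
            sum += i;
        }
        return sum;
    }

    /** Returns the (non-null) result, or Optional.empty() if the work timed out. */
    static <T> Optional<T> runWithTimeout(Callable<T> work, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(work);
        try {
            return Optional.of(future.get(timeout, unit));
        } catch (TimeoutException ex) {
            future.cancel(true); // sets the interrupt flag on the worker thread
            return Optional.empty();
        } catch (InterruptedException ex) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return Optional.empty();
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            executor.shutdownNow(); // do not leak the worker thread
        }
    }

    public static void main(String[] args) {
        // Effectively endless run: gives up after 200 ms.
        System.out.println(runWithTimeout(
                () -> slowSimulation(Long.MAX_VALUE), 200, TimeUnit.MILLISECONDS)); // Optional.empty
        // Short run: finishes well within the timeout.
        System.out.println(runWithTimeout(
                () -> slowSimulation(10), 200, TimeUnit.MILLISECONDS)); // Optional[45]
    }
}
```

A single-thread executor per call keeps the sketch simple; in a real job you would probably reuse one executor for all the simulations in a task.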
zengr
  • Setting the quantile to 0 can hurt resource consumption and latency when some data skew is expected. It is better to set it to a meaningful fraction of the stage's completed tasks. For example, the default of 0.75 means that the first 75% of the tasks must complete, so that a meaningful median time is used to compute the speculation threshold. If your stage has too few tasks for that, you can also use `spark.speculation.task.duration.threshold`, introduced in Spark 3.0.0. – Asaf Nov 24 '20 at 22:15

The trick here is to log in directly to the worker node and kill the process. Usually you can find the offending process with a combination of top, ps, and grep. Then just run kill with its PID.

rsmith54
    While this is a valid answer and works in small jobs, it doesn't scale well when you own dozens of Spark jobs running 24/7. – Rudolf Real Sep 15 '22 at 15:13