5

I was testing a web crawling/scrapping program on Apache Spark locally on my computer.

the program use a few RDD transformations that takes a volatile function that sporadically fails. (The function's purpose is to transform URL links into web pages, sometimes the headless browser it invoked just blackout or got overloaded - I can't avoid that)

I heard that Apache Spark has powerful failover and retrying feature, any unsuccessful transformation or lost data can be recalculated from scratch from whatever resource it can find (sounds like magic right?) so I didn't put any failover or try-catch in my code.

This is my spark configuration:

val conf = new SparkConf().setAppName("MoreLinkedIn")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","40") //definitely enough

Unfortunately the job failed after the majority of stages and individual tasks succeeded. The latest log in console shows:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:7 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.openqa.selenium.TimeoutException: Timed out after 50 seconds waiting for...

Looks like Spark just give up cowardly after failed once. How do I configure it properly to make it more tenacious?

(my program can be downloaded from https://github.com/tribbloid/spookystuff, sorry for the scarce and disorganized code/documentation, I just start it for a few days)

ADD: if you want to try it yourself, The following code can demonstrate this problem:

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","400000")
val sc = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 8
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { i =>
  val x = java.lang.Math.random()
  if (x > 0.9) throw new IllegalStateException("the map has a chance of 10% to fail")
  x
}.reduce(_ + _)
sc.stop()
println("finished")
}

It should be noted that the same IllegalStateException got retried for 32 times in this post: Apache Spark Throws java.lang.IllegalStateException: unread block data

Community
  • 1
  • 1
tribbloid
  • 4,026
  • 14
  • 64
  • 103
  • Please give a few context lines around the Exception. I have a hunch that user thrown exceptions don't result in retries as 90% of the time such exceptions would be due to a bug in the user code - but I'm not totally sure. – samthebest Jun 09 '14 at 14:31
  • I hope you are right as sometimes the user indeed want a job to fail quickly without taking too much time. However I don't know to give the 'context' lines. Could you be more specific? – tribbloid Jun 09 '14 at 15:33
  • My recent edit has added a minimal spark application that shows the problem, if you can run it without seeing: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:7 failed 1 times, most recent failure: Exception failure in TID 7 on host localhost: java.lang.IllegalStateException: the map has a chance of 10% to fail Then your spark is running properly – tribbloid Jun 09 '14 at 15:39

3 Answers3

9

I know it's a very old question, but I had exactly same problem and came across this question while looking for a solution.

There are 3 master URL formats to submit a spark application in a local mode:

  • local - one thread (no parallelism), no retries
  • local[K] (or local[*]) - uses K (or number of cores) worker threads and sets task.maxFailures to 1 (see here)

  • local[K, F] (or local[*, F]) - sets the task.maxFailures=F, and this is what we were after.

Consult Spark documentation for details.

botchniaque
  • 4,698
  • 3
  • 35
  • 63
7

Let me forward the most authoritative answer:

If this is a useful feature for local mode, we should open a JIRA to document the setting or improve it (I’d prefer to add a spark.local.retries property instead of a special URL format). We initially disabled it for everything except unit tests because 90% of the time an exception in local mode means a problem in the application, and we’d rather let the user debug that right away rather than retrying the task several times and having them worry about why they get so many errors.

Matei

tribbloid
  • 4,026
  • 14
  • 64
  • 103
  • 1
    Please could you add a link as to where you found that answer. – samthebest Jun 10 '14 at 10:23
  • Sorry I didn't see your comment until now, I'm attaching the link right here: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-fault-tolerance-td7250.html – tribbloid Jun 15 '14 at 17:28
1

This works for me -

sparkConfig
.set("spark.task.maxFailures", "2")
.set("spark.master", "local[2, 2]")

I had to set both in order to see my task failing (while throwing an exception) and then reattempting in local test environment.

Ravindra
  • 97
  • 1
  • 9