
I'm developing an Apache Spark job that I plan to deploy as one stage in an AWS Step Function. Unfortunately, the particular way I want to deploy it isn't directly supported by Step Functions at this time; however, Step Functions has an API for a generic Task that I can use. Essentially, once the task is started, it needs to periodically call sendTaskHeartbeat, and on completion it needs to call sendTaskSuccess.
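
For reference, here's roughly what those two calls look like from Scala. This is just a sketch, assuming the AWS SDK for Java v1 Step Functions client, with a hypothetical TASK_TOKEN environment variable standing in for however the task token actually reaches my job:

import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder
import com.amazonaws.services.stepfunctions.model.{SendTaskHeartbeatRequest, SendTaskSuccessRequest}

val taskToken = sys.env("TASK_TOKEN")   // hypothetical: however the token gets passed in
val sfn = AWSStepFunctionsClientBuilder.defaultClient()

// periodically, while the job is running
sfn.sendTaskHeartbeat(new SendTaskHeartbeatRequest().withTaskToken(taskToken))

// once, when the job has completed
sfn.sendTaskSuccess(new SendTaskSuccessRequest().withTaskToken(taskToken).withOutput("{}"))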

My Spark job is written in Scala, and I'm wondering what the best approach is for running something on a timer within the context of an Apache Spark job. I see from other answers that I could use java.util.concurrent or perhaps java.util.Timer, but I'm not sure how the threading would work specifically in a Spark context. Since Spark is already doing a lot to distribute my code across each node, I'm not sure if there are hidden considerations I need to be wary of (i.e. I don't really want more than one instance of my timer, I want to make sure it stops when the sparky parts of my code complete, etc.).

Is it safe to use a regular Timer in a Spark job? If I did something like this:

import java.util.{Timer, TimerTask}

val timer = new Timer()
val task = new TimerTask {
    override def run(): Unit = { /* sendTaskHeartbeat */ }
}
timer.schedule(task, 1000L, 1000L)   // fire every second, after an initial 1 second delay

val myRDD = spark.read.parquet(pathToParquetFiles)
val transformedRDD = myRDD.map( /* business logic */ )
transformedRDD.saveAsHadoopDataset(config) andThen task.cancel

Would that be sufficient? Or is there a risk that this code would lose track of the task and timer objects by the time it reaches the andThen, due to the distribution across nodes?

soapergem

2 Answers


I believe your implementation is sufficient. The timer task will only run on the driver node, as long as you don't reference it inside the RDD transformations. The only thing to be careful about is error handling: make sure the timer gets cancelled when a transformation or action throws, otherwise your job could hang because the timer thread is still alive.
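
One way to guarantee that is to wrap the driver-side work in try/finally. Just a sketch; withHeartbeat and sendHeartbeat are names I'm making up here, not anything from Spark or the AWS SDK:

import java.util.{Timer, TimerTask}

def withHeartbeat(sendHeartbeat: () => Unit)(job: => Unit): Unit = {
  val timer = new Timer("sfn-heartbeat", true)   // daemon thread, lives on the driver only
  timer.schedule(new TimerTask { override def run(): Unit = sendHeartbeat() }, 1000L, 1000L)
  try {
    job            // e.g. the read/map/saveAsHadoopDataset from the question
  } finally {
    timer.cancel() // runs whether the job succeeds or throws
  }
}

That way you don't rely on the andThen ever being reached.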

ouchxp

I ended up making use of a combination of a java.util.Timer and a SparkListener. I instantiate the Timer on the onJobStart event, and only once (i.e. guarded by if (TIMER == null) { /* instantiate */ }), because onJobStart can apparently fire multiple times. I then handle the completion activity on the onApplicationEnd event, which fires only once. The reason I didn't use onApplicationStart is that by the time I had hooked my listener into the Spark context, that event had already fired.
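
In case it helps anyone, here is a rough sketch of that listener. HeartbeatListener, heartbeat and complete are just placeholder names; the real versions wrap the sendTaskHeartbeat and sendTaskSuccess calls from the question:

import java.util.{Timer, TimerTask}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}

class HeartbeatListener(heartbeat: () => Unit, complete: () => Unit) extends SparkListener {
  @volatile private var timer: Timer = null

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
    if (timer == null) {                  // onJobStart can fire more than once, so guard it
      timer = new Timer("sfn-heartbeat", true)
      timer.schedule(new TimerTask { override def run(): Unit = heartbeat() }, 1000L, 1000L)
    }
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = synchronized {
    if (timer != null) timer.cancel()
    complete()                            // fires once, when the application shuts down
  }
}

// registered on the driver before any actions run:
// spark.sparkContext.addSparkListener(new HeartbeatListener(sendHeartbeat, sendSuccess))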

soapergem