
I have a long-running Python 3.6 application that hosts a PySpark 2.4.6 session on YARN. I would like to be notified (or have a method called) if the Spark session crashes or dies, so that I can automatically restart it.

I'd like to do this proactively, rather than wrapping every call to the session, catching errors indicating that the session is closed, and then making users wait while the session restarts.

Peter

2 Answers


YARN exposes a REST API. At regular intervals you could query all currently running applications

http://rm-http-address:port/ws/v1/cluster/apps?states=RUNNING

and then check if your PySpark session is part of the result.
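A minimal polling sketch of this idea (assuming the `requests` library is available; the resource-manager address is the placeholder from above, and `is_app_running`, `watch_session`, and `restart_callback` are illustrative names, not part of any Spark or YARN API):

import time
import requests

# Placeholder ResourceManager address, as in the URL above
RM_APPS_URL = "http://rm-http-address:port/ws/v1/cluster/apps"

def is_app_running(app_id):
    # Ask YARN for all applications currently in the RUNNING state
    response = requests.get(RM_APPS_URL, params={"states": "RUNNING"})
    response.raise_for_status()
    apps = response.json().get("apps") or {}
    running = apps.get("app") or []
    return any(app["id"] == app_id for app in running)

def watch_session(app_id, restart_callback, interval_seconds=30):
    # Query in intervals; when the application is no longer running, restart it
    while True:
        if not is_app_running(app_id):
            restart_callback()
            return
        time.sleep(interval_seconds)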

werner
  • How do I get the application id from the Spark Session/Context so that I know which application to check for? (The same account will probably have multiple open Spark sessions in the same Yarn queue) – Peter Jun 16 '20 at 15:47
  • You could either look for the name of the application in the result of the REST query (if its unique), or you could get the ID from the output of the submit command and store it (that's how I do it). – werner Jun 16 '20 at 15:49
  • I don't have a submit command. I'm launching Spark from inside of a stand-alone Python application and that app handles starting the Spark Session from code. – Peter Jun 16 '20 at 15:54
  • I found it, it's under `sparkSession._sc.applicationId`. I don't particularly like this approach, so I'll keep digging. When I manually killed my own Yarn job the application threw a nice exception instantly, so maybe there is a hook buried somewhere I can latch onto. – Peter Jun 16 '20 at 16:04
  • There is a `--name` parameter for the spark-submit command. It sets the Spark property `spark.app.name`. Maybe your Python code can set this property to a unique value. Otherwise the REST approach maybe does not work for you. Good luck with digging :-) – werner Jun 16 '20 at 16:09
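As noted in the comments, the application ID is available from the session itself, and a unique name can be set when the session is built. A small sketch (the app name is illustrative):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("my-long-running-session")  # sets spark.app.name, visible in the YARN REST result
         .getOrCreate())

# Public counterpart of sparkSession._sc.applicationId from the comments
yarn_app_id = spark.sparkContext.applicationId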

Using https://stackoverflow.com/a/44084038/328968 as a starting point, you can create a listener and add it to the session. When the application ends, the listener invokes a callback that can restart the session.

SparkListener is defined in the answer referenced above.

class SparkApplicationEndListener(SparkListener):
    def __init__(self, applicationEndCallback):
        self.applicationEndCallback = applicationEndCallback

    def onApplicationEnd(self, applicationEnd):
        # Fires when the Spark application (and its SparkContext) ends
        if self.applicationEndCallback is not None:
            self.applicationEndCallback(applicationEnd)

########

def handleSparkApplicationEnd(app_end):
    print(app_end.toString())
    # Rebuild the session; store the result wherever the application
    # keeps its SparkSession reference
    sparkSession = buildSparkSession()

def buildSparkSession():
    #......
    # The py4j callback server must be running so the JVM can call back
    # into the Python listener
    sparkSession.sparkContext._gateway.start_callback_server()
    listener = SparkApplicationEndListener(handleSparkApplicationEnd)
    sparkSession.sparkContext._jsc.sc().addSparkListener(listener)
    return sparkSession
Peter