
I'm trying to submit a Spark 2.3 job to a Kubernetes cluster from Scala using the Play framework.

I have also tried it as a simple Scala program without using the Play framework.

The job gets submitted to the k8s cluster, but stateChanged and infoChanged are never invoked. I also want to be able to get the handle.getAppId.

I'm using spark-submit to submit the job, as described here:

$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

Here is the code for the job:

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

import scala.util.control.NonFatal

def index = Action {
  try {
    val spark = new SparkLauncher()
      .setMaster("k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>")
      .setVerbose(true)
      .addSparkArg("--verbose")
      .setMainClass("myClass")
      .setAppResource("hdfs://server/inputs/my.jar")
      .setConf("spark.app.name", "myapp")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "mydockerimage")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {

        def infoChanged(handle: SparkAppHandle): Unit = {
          System.out.println("Spark App Id ["
            + handle.getAppId
            + "] Info Changed. State ["
            + handle.getState + "]")
        }

        def stateChanged(handle: SparkAppHandle): Unit = {
          System.out.println("Spark App Id ["
            + handle.getAppId
            + "] State Changed. State ["
            + handle.getState + "]")
          if (handle.getState.toString == "FINISHED") System.exit(0)
        }
      })

    Ok(spark.getState().toString())
  } catch {
    case NonFatal(e) =>
      println("failed with exception: " + e)
      InternalServerError("failed with exception: " + e)
  }
}

1 Answer


Spark Launcher Architecture Overview

SparkLauncher allows you to run a spark-submit command programmatically. It spawns spark-submit as a separate child process of the JVM. You need to wait in your client's main function until the driver is launched in K8s and the listener callbacks arrive; otherwise the JVM's main thread exits, killing the client without reporting anything.

-----------------------                       -----------------------
|      User App       |     spark-submit      |      Spark App      |
|                     |  -------------------> |                     |
|         ------------|                       |-------------        |
|         |           |        hello          |            |        |
|         | L. Server |<----------------------| L. Backend |        |
|         |           |                       |            |        |
|         -------------                       -----------------------
|               |     |                              ^
|               v     |                              |
|        -------------|                              |
|        |            |      <per-app channel>       |
|        | App Handle |<------------------------------
|        |            |
-----------------------
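The App Handle in this picture can also be used directly: instead of (or in addition to) listener callbacks, you can poll handle.getState yourself. As a minimal sketch of that per-app channel (the object name is mine, and the master URL, jar path, and image are placeholders, not values from the question):

import org.apache.spark.launcher.SparkLauncher

object PollingLauncherExample {
  def main(args: Array[String]): Unit = {
    // startApplication() also works without any listeners; we poll the handle instead
    val handle = new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001")
      .setAppResource("local:/{PATH}/spark-examples_2.11-2.3.0.jar")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.kubernetes.container.image", "spark:spark-docker")
      .setDeployMode("cluster")
      .startApplication()

    // Keep the main thread alive until the per-app channel reports a final state
    while (!handle.getState.isFinal) {
      println(s"App Id [${handle.getAppId}] state [${handle.getState}]")
      Thread.sleep(1000)
    }
    println(s"Final state [${handle.getState}]")
  }
}

Polling has the same caveat as the listener approach: if main returns before the launcher's child process connects back, you get no state at all.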

Solution

To fix this, I have added a j.u.c.CountDownLatch that keeps the main thread from exiting until appState.isFinal is reached.

import java.util.concurrent.CountDownLatch

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object SparkLauncher {
  def main(args: Array[String]): Unit = {

    // Released once the application reaches a final state
    val countDownLatch = new CountDownLatch(1)

    val launcher = new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001")
      .setAppResource("local:/{PATH}/spark-examples_2.11-2.3.0.jar")
      .setConf("spark.app.name", "spark-pi")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "spark:spark-docker")
      .setConf("spark.kubernetes.driver.pod.name", "spark-pi-driver")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        def infoChanged(handle: SparkAppHandle): Unit = {
          // no-op
        }

        def stateChanged(handle: SparkAppHandle): Unit = {
          val appState = handle.getState()
          println(s"Spark App Id [${handle.getAppId}] State Changed. State [$appState]")

          if (appState != null && appState.isFinal) {
            countDownLatch.countDown() // release the main thread once the driver exits
          }
        }
      })

    // Block until the Spark application reaches a final state; otherwise the
    // JVM exits before any callbacks arrive
    countDownLatch.await()
  }
}
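A follow-up note: if you only want to confirm that the job was submitted successfully rather than wait for it to finish, the same latch pattern works with an earlier state. A sketch of a drop-in replacement for stateChanged above, assuming the stock SparkAppHandle.State values:

// Drop-in replacement for stateChanged above: release the latch as soon as
// the submission is confirmed instead of waiting for a final state
def stateChanged(handle: SparkAppHandle): Unit = {
  val appState = handle.getState()
  println(s"Spark App Id [${handle.getAppId}] State Changed. State [$appState]")

  // SUBMITTED or RUNNING means spark-submit succeeded; final states cover failures
  if (appState == SparkAppHandle.State.SUBMITTED ||
      appState == SparkAppHandle.State.RUNNING ||
      (appState != null && appState.isFinal)) {
    countDownLatch.countDown()
  }
}

Depending on timing, the listener can jump straight from CONNECTED to RUNNING, which is why RUNNING is checked as well.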
  • Were you able to get the SUBMITTED state from the SparkLauncher handle in the listener method when the job was successfully submitted to the k8s cluster? – shiv455 May 06 '18 at 17:51
  • @shiv455 Yes, I did. I also forgot to mention that your original code was missing the `setMainClass` call. – Sayat Satybald May 06 '18 at 17:54
  • Actually I have it in my code but forgot to include it here! – shiv455 May 06 '18 at 17:55
  • Lemme try and see! – shiv455 May 06 '18 at 18:08
  • I didn't run it on a real k8s cluster but on minikube. It took a couple of seconds to launch and finish the simple SparkPi example. – Sayat Satybald May 06 '18 at 18:13
  • Nope, it timed out for me on the client side. The job was still submitted successfully to the k8s cluster (visible in the k8s dashboard), which was happening even before this change. – shiv455 May 06 '18 at 18:41
  • Hmm, strange. Can you share where you run k8s? If you do `println(launcher.getState)`, what state does it print for you? – Sayat Satybald May 06 '18 at 19:42
  • I'm running on AWS. – shiv455 May 07 '18 at 13:56
  • And I don't think we should use the final state, as I don't want to wait until the job is FINISHED; I just want to know if the job was submitted successfully. – shiv455 May 07 '18 at 13:57
  • I'm running k8s on Azure, and even using `kubectl proxy` or the exposed k8s API, the stateChanged method is never called until the job finishes, and then the state is LOST with no other information about whether it was successful. – Sigrist Nov 07 '18 at 18:56