17

I am trying to submit a JAR with Spark job into the YARN cluster from Java code. I am using SparkLauncher to submit SparkPi example:

Process spark = new SparkLauncher()
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
    .setMainClass("org.apache.spark.examples.SparkPi")
    .setMaster("yarn-cluster")
    .launch();
System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);

There are two problems:

  1. While submitting in "yarn-cluster" mode, the application is sucessfully submitted to YARN and executes successfully (it is visible in the YARN UI, reported as SUCCESS and pi is printed in the output). However, the submitting application is never notified that processing is finished - it hangs infinitely after printing "Waiting to finish..." The log of the container can be found here
  2. While submitting in "yarn-client" mode, the application does not appear in YARN UI and the submitting application hangs at "Waiting to finish..." When hanging code is killed, the application shows up in YARN UI and it is reported as SUCCESS, but the output is empty (pi is not printed out). The log of the container can be found here

I tried to execute the submitting application both with Oracle Java 7 and 8.

Reddy
  • 8,737
  • 11
  • 55
  • 73
TomaszGuzialek
  • 861
  • 1
  • 8
  • 15

3 Answers3

17

I got help in the Spark mailing list. The key is to read / clear getInputStream and getErrorStream() on the Process. The child process might fill up the buffer and cause a deadlock - see Oracle docs regarding Process. The streams should be read in separate threads:

Process spark = new SparkLauncher()
    .setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6")
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
    .setMainClass("org.apache.spark.examples.SparkPi").setMaster("yarn-cluster").launch();

InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();

InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();

System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);

where InputStreamReaderRunnable class is:

public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;

    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
TomaszGuzialek
  • 861
  • 1
  • 8
  • 15
  • In my case, I had a classpath problem, so the spark exited immediately. So, if it seems to someone else that it is simply not calling to your spark app, this answer works as well. – jmmut Feb 04 '16 at 17:53
9

Since this is an old post, i would like to add an update that might help whom ever read this post after. In spark 1.6.0 there are some added functions in SparkLauncher class. Which is:

def startApplication(listeners: <repeated...>[Listener]): SparkAppHandle

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.launcher.SparkLauncher

You can run the application with out the need for additional threads for the stdout and stderr handling plush there is a nice status reporting of the application running. Use this code:

  val env = Map(
      "HADOOP_CONF_DIR" -> hadoopConfDir,
      "YARN_CONF_DIR" -> yarnConfDir
    )
  val handler = new SparkLauncher(env.asJava)
      .setSparkHome(sparkHome)
      .setAppResource("Jar/location/.jar")
      .setMainClass("path.to.the.main.class")
      .setMaster("yarn-client")
      .setConf("spark.app.id", "AppID if you have one")
      .setConf("spark.driver.memory", "8g")
      .setConf("spark.akka.frameSize", "200")
      .setConf("spark.executor.memory", "2g")
      .setConf("spark.executor.instances", "32")
      .setConf("spark.executor.cores", "32")
      .setConf("spark.default.parallelism", "100")
      .setConf("spark.driver.allowMultipleContexts","true")
      .setVerbose(true)
      .startApplication()
println(handle.getAppId)
println(handle.getState)

You can keep enquering the state if the spark application until it give success. For information about how the Spark Launcher server works in 1.6.0. see this link: https://github.com/apache/spark/blob/v1.6.0/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java

Abdulrahman
  • 433
  • 4
  • 11
  • 3
    I would like to make emphasis this only works in client mode. – msemelman Sep 28 '16 at 19:15
  • @msemelman Thank you greatly for this clarification, was getting stuck on this. How did you learn this fact? – spicecat Nov 10 '16 at 00:11
  • Works in cluster mode as well. I'm using Spark-1.6.1 – Tariq Nov 10 '16 at 20:27
  • I was mistaken, But according to the hidden docs inside SparkLauncher: https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java (as answer shows) ***"The launcher server listens on the localhost only..."*** So you can only launch if the driver happens to be in the same host as the one you are. – msemelman Nov 25 '16 at 16:55
  • Sometime you may also need Thread.sleep(10000L) to see the status of your job. As this code doesn't wait for spark-submit to complete. I have below code to make it work : while(!spark.getState.isFinal){ println(spark.getAppId) Thread.sleep(10000L) } – Manish May 06 '19 at 09:30
4

I implemented using CountDownLatch, and it works as expected. This is for SparkLauncher version 2.0.1 and it works in Yarn-cluster mode too.

    ...
final CountDownLatch countDownLatch = new CountDownLatch(1);
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
Thread sparkAppListenerThread = new Thread(sparkAppListener);
sparkAppListenerThread.start();
long timeout = 120;
countDownLatch.await(timeout, TimeUnit.SECONDS);    
    ...

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {
    private static final Log log = LogFactory.getLog(SparkAppListener.class);
    private final CountDownLatch countDownLatch;
    public SparkAppListener(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void stateChanged(SparkAppHandle handle) {
        String sparkAppId = handle.getAppId();
        State appState = handle.getState();
        if (sparkAppId != null) {
            log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - "
                    + SPARK_STATE_MSG.get(appState));
        } else {
            log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
        }
        if (appState != null && appState.isFinal()) {
            countDownLatch.countDown();
        }
    }
    @Override
    public void infoChanged(SparkAppHandle handle) {}
    @Override
    public void run() {}
}
Elkhan Dadashov
  • 2,417
  • 1
  • 13
  • 5
  • This is really a comment, not an answer. Once you reach 50 [reputation](//stackoverflow.com/help/whats-reputation), you will be able to [comment on all posts](//stackoverflow.com/privileges/comment). – The SE I loved is dead Oct 20 '16 at 20:57