
I am trying to create an executable jar which uses Spark Launcher to run another jar with a data transformation task (that jar creates the Spark session).

I need to pass Java parameters (some Java arrays) to the jar which is executed by the launcher.

import org.apache.spark.launcher.SparkLauncher

object launcher {
  // How do I pass parameters to spark_job_with_spark_session.jar?
  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("spark_job_with_spark_session.jar")
      .setVerbose(true)
      .setMaster("local[*]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
      .launch()
  }
}

How can I do that?

Oleg Yarin

2 Answers


need to pass Java parameters (some Java arrays)

This is equivalent to executing spark-submit, so you cannot pass Java objects directly. Use application arguments

addAppArgs(String... args)

to pass application arguments, and parse them in your app.
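For example, a minimal sketch of the question's launcher with application arguments added (the --ids argument and its comma encoding are made up for illustration; only strings can cross the process boundary, so an array has to be encoded into a string here and decoded again in the launched jar's main):

import org.apache.spark.launcher.SparkLauncher

object launcher {
  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val ids = Array(1, 2, 3, 5, 8)                  // data to hand over
    val process = new SparkLauncher()
      .setAppResource("spark_job_with_spark_session.jar")
      .setVerbose(true)
      .setMaster("local[*]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
      .addAppArgs("--ids", ids.mkString(","))       // arrives as main(args) in the other jar
      .launch()
    process.waitFor()                               // optionally wait for the child JVM
  }
}

In the launched jar, args then contains Array("--ids", "1,2,3,5,8"), which can be split back into an array of numbers.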

Alper t. Turker
  • Thank you for your answer! Damn... I will need to pass big Java arrays from the memory of the application, so it has to be fast; I don't think casting the array to a string will work... Do you maybe have another suggestion on how to achieve that? – Oleg Yarin Feb 06 '18 at 11:57
  • Maybe serialize it, write it to a file, read it back, and pass only the path as an argument? – Alper t. Turker Feb 06 '18 at 11:59
  • Hmm, writing to a file would also be a very expensive operation :( – Oleg Yarin Feb 06 '18 at 12:10
  • I doubt you have much choice here. This is a separate JVM with no shared memory, so I think the only possible optimizations are a) fast serialization and b) a fast file system (an in-memory fs). Unless you want to do something like direct memory access by address ;) – Alper t. Turker Feb 06 '18 at 12:14
  • @hi-zir - thanks for the answer - this helped me in my own situation where I needed to pass arguments to the driver application (vote+1). In my case I used the Apache Commons CLI library to parse my command line arguments. Worked like a charm. Thanks! – ishmaelMakitla Jan 21 '19 at 11:35
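A minimal sketch of the file-based handoff suggested in the comments above, assuming plain Java serialization and a hypothetical temp file (pointing the path at an in-memory filesystem such as /dev/shm is one way to keep the write cheap):

import java.io.{FileOutputStream, ObjectOutputStream}
import java.nio.file.Files
import org.apache.spark.launcher.SparkLauncher

object launcherWithFileHandoff {
  def main(args: Array[String]): Unit = {
    val bigArray: Array[Double] = Array.fill(10000000)(scala.util.Random.nextDouble())  // stands in for the real data

    // Serialize the array once and pass only the path to the child JVM
    val path = Files.createTempFile("launcher-payload-", ".bin")
    val out = new ObjectOutputStream(new FileOutputStream(path.toFile))
    out.writeObject(bigArray)
    out.close()

    new SparkLauncher()
      .setAppResource("spark_job_with_spark_session.jar")
      .setMaster("local[*]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
      .addAppArgs("--data-path", path.toString)
      .launch()
  }
}

The launched jar reads --data-path from its args and deserializes the array with an ObjectInputStream; a faster serializer such as Kryo can be swapped in if plain Java serialization turns out to be the bottleneck.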
Here is example code that works:
package com.meow.woof.meow_spark_launcher.app;

import com.meow.woof.meow_spark_launcher.common.TaskListener;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

/**
 *
 * @author hahattpro
 */
public class ExampleSparkLauncherApp {

    public static void main(String[] args) throws Exception {
        SparkAppHandle handle = new SparkLauncher()
                .setAppResource("/home/cpu11453/workplace/experiment/SparkPlayground/target/scala-2.11/SparkPlayground-assembly-0.1.jar")
                .setMainClass("me.thaithien.playground.ConvertToCsv")
                .setMaster("spark://cpu11453:7077")
                .setConf(SparkLauncher.DRIVER_MEMORY, "3G")
                .addAppArgs("--input" , "/data/download_hdfs/data1/2019_08_13/00/", "--output", "/data/download_hdfs/data1/2019_08_13/00_csv_output/")
                .startApplication(new TaskListener());

        handle.addListener(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle handle) {
                System.out.println("state changed: " + handle.getState());
            }

            @Override
            public void infoChanged(SparkAppHandle handle) {
                System.out.println("info changed: " + handle.getState());
            }
        });

        System.out.println(handle.getState().toString());

        while (!handle.getState().isFinal()) {
            //await until job finishes
            Thread.sleep(1000L);
        }
    }
}

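The launched class me.thaithien.playground.ConvertToCsv is not shown; as a hypothetical sketch, the driver side of such a job could pick up the --input and --output arguments along these lines (a CLI library such as Apache Commons CLI, mentioned in the comments above, is a more robust alternative):

import org.apache.spark.sql.SparkSession

object ConvertToCsv {
  def main(args: Array[String]): Unit = {
    // naive "--key value" parsing of the arguments supplied via addAppArgs
    val opts = args.sliding(2, 2).collect { case Array(key, value) => key -> value }.toMap
    val input = opts("--input")
    val output = opts("--output")

    val spark = SparkSession.builder().appName("ConvertToCsv").getOrCreate()
    spark.read.parquet(input)   // the input format is assumed for illustration
      .write.csv(output)
    spark.stop()
  }
}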

Haha TTpro