
I have a class like below, and when I run this through the command line I want to see the progress status, something like:

10% completed... 
30% completed... 
100% completed...Job done!

I am using Spark 1.0 on YARN with the Java API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class MyJavaWordCount {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyJavaWordCount <master> <file>");
            System.exit(1);
        }
        System.out.println("args[0]: <master>="+args[0]);
        System.out.println("args[1]: <file>="+args[1]);

        JavaSparkContext ctx = new JavaSparkContext(
                args[0],
                "MyJavaWordCount",
                System.getenv("SPARK_HOME"),
                System.getenv("SPARK_EXAMPLES_JAR"));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

//      output                                            input   output         
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            //              output       input 
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

//          K       V                                                input   K       V 
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            //            K       V             input 
            public Tuple2<String, Integer> call(String s) {
                //                K       V 
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2 tuple : output) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
        System.exit(0);
    }
}
user3705662

3 Answers


If you are using Spark with Scala, this code will help you add a Spark listener.

Create your SparkContext

val sc = new SparkContext(sparkConf)

Now you can add your Spark listener to the SparkContext:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerApplicationEnd}

sc.addSparkListener(new SparkListener() {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    println("Spark ApplicationStart: " + applicationStart.appName)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    println("Spark ApplicationEnd: " + applicationEnd.time)
  }
})

See the SparkListener interface for the full list of scheduler events you can listen to.

Gabber

You should implement SparkListener. Just override whatever events you are interested in (job/stage/task start/end events), then call sc.addSparkListener(myListener).

It does not give you a straight-up percentage-based progress tracker, but at least you can track that progress is being made and its rough rate. The difficulty comes from how unpredictable the number of Spark stages can be, and also how the running times of each stage can be vastly different. The progress within a stage should be more predictable.
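
As a minimal sketch of what that could look like in Java (the class name ProgressListener is made up here; extending org.apache.spark.scheduler.SparkListener directly from Java assumes Spark 2.x or later, where it is an abstract class with no-op defaults, while Spark 1.x provides JavaSparkListener for the same purpose):

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

// Prints a rough "% completed" by comparing finished tasks against the
// task counts announced as stages are submitted.
public class ProgressListener extends SparkListener {
    private final AtomicInteger totalTasks = new AtomicInteger(0);
    private final AtomicInteger finishedTasks = new AtomicInteger(0);

    @Override
    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
        totalTasks.addAndGet(stageSubmitted.stageInfo().numTasks());
    }

    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        int done = finishedTasks.incrementAndGet();
        int total = totalTasks.get();
        if (total > 0) {
            System.out.println((100 * done / total) + "% completed...");
        }
    }

    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        System.out.println("100% completed...Job done!");
    }
}

Register it with ctx.sc().addSparkListener(new ProgressListener()); before triggering any actions. The percentage is only rough, because totalTasks only counts stages that have been submitted so far, which is exactly the unpredictability described above.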

Daniel Darabos

First, if you want to track progress, consider spark.ui.showConsoleProgress; please check @Yijie Shen's answer (Spark output: log-style vs progress-style) for this.
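
For completeness, a minimal sketch of enabling it from the Java API (the class name ConsoleProgressExample is made up here; spark.ui.showConsoleProgress only exists in releases newer than the Spark 1.0 mentioned in the question, and the bar is typically rendered only when INFO logging is suppressed):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ConsoleProgressExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MyJavaWordCount")
                // Renders per-stage progress bars on the console (stderr).
                .set("spark.ui.showConsoleProgress", "true");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        // ... run the word count from the question here ...
        ctx.stop();
    }
}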

Usually there is no need to implement a Spark listener for something like this, unless you have very specific requirements.


Question: How to implement a custom job listener/tracker in Spark?

You can use SparkListener and intercept SparkListener events.

A classic example of this implementation within the Spark framework itself is HeartbeatReceiver.

Example: HeartbeatReceiver.scala

/**
 * Lives in the driver to receive heartbeats from executors..
 */
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
  extends SparkListener with ThreadSafeRpcEndpoint with Logging {

  def this(sc: SparkContext) {
    this(sc, new SystemClock)
  }

  sc.addSparkListener(this) ...

Below is the list of available listener events, of which the application/job events should be most useful for you (a minimal job-level sketch follows the list):

  • SparkListenerApplicationStart

  • SparkListenerJobStart

  • SparkListenerStageSubmitted

  • SparkListenerTaskStart

  • SparkListenerTaskGettingResult

  • SparkListenerTaskEnd

  • SparkListenerStageCompleted

  • SparkListenerJobEnd

  • SparkListenerApplicationEnd

  • SparkListenerEnvironmentUpdate

  • SparkListenerBlockManagerAdded

  • SparkListenerBlockManagerRemoved

  • SparkListenerBlockUpdated

  • SparkListenerUnpersistRDD

  • SparkListenerExecutorAdded

  • SparkListenerExecutorRemoved
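
A minimal sketch of the job-level events in Java (JobBoundaryListener is a made-up name; same Spark-version assumption as in the answer above):

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;

// Logs job boundaries; useful as coarse progress markers around each action.
public class JobBoundaryListener extends SparkListener {
    @Override
    public void onJobStart(SparkListenerJobStart jobStart) {
        System.out.println("Job " + jobStart.jobId()
                + " started with " + jobStart.stageIds().size() + " stage(s)");
    }

    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        System.out.println("Job " + jobEnd.jobId() + " finished: " + jobEnd.jobResult());
    }
}

As with any listener, register it via sc.addSparkListener(new JobBoundaryListener()) before running the job.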

Ram Ghadiyaram