9

I need to chain two MapReduce jobs. I used JobControl to make job2 dependent on job1. It works and the output files are created, but the program never terminates. The shell stays in this state:

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1

How can I stop it? This is my main method:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Configuration conf2 = new Configuration();

    Job job1 = new Job(conf, "canzoni");
    job1.setJarByClass(CanzoniOrdinate.class);
    job1.setMapperClass(CanzoniMapper.class);
    job1.setReducerClass(CanzoniReducer.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    ControlledJob cJob1 = new ControlledJob(conf);
    cJob1.setJob(job1);
    FileInputFormat.addInputPath(job1, new Path(args[0]));
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp"));


    Job job2 = new Job(conf2, "songsort");
    job2.setJarByClass(CanzoniOrdinate.class);
    job2.setMapperClass(CanzoniSorterMapper.class);
    job2.setSortComparatorClass(ReverseOrder.class);
    job2.setInputFormatClass(KeyValueTextInputFormat.class);
    job2.setReducerClass(CanzoniSorterReducer.class);
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    ControlledJob cJob2 = new ControlledJob(conf2);
    cJob2.setJob(job2);
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*"));
    FileOutputFormat.setOutputPath(job2, new Path(args[1]));

    JobControl jobctrl = new JobControl("jobctrl");
    jobctrl.addJob(cJob1);
    jobctrl.addJob(cJob2);
    cJob2.addDependingJob(cJob1);
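    // Original call, now removed: JobControl.run() keeps looping on the calling
    // thread until stop() is called, so main() never returned.
    // jobctrl.run();

    // NEW CODE: run JobControl on its own thread and poll until all jobs finish.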
    Thread t = new Thread(jobctrl);
    t.start();
    String oldStatusJ1 = null;
    String oldStatusJ2 = null;
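    while (!jobctrl.allFinished()) {
      // Print each job's status only when it changes.
      String status = cJob1.toString();
      String status2 = cJob2.toString();
      if (!status.equals(oldStatusJ1)) {
        System.out.println(status);
        oldStatusJ1 = status;
      }
      if (!status2.equals(oldStatusJ2)) {
        System.out.println(status2);
        oldStatusJ2 = status2;
      }
      Thread.sleep(1000); // avoid busy-waiting between status checks
    }
    jobctrl.stop(); // ask the JobControl thread to end
    System.exit(0);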

} }

Pietro Luciani
  • I solved it by using a Thread to start JobControl. I check whether the jobs are done with a while loop, while (!jobctrl.allFinished()), and call System.exit() after the loop. Now I would like the jobs to report progress information; so far all I can get is which job is running, via ControlledJob.toString(). I don't know how to get details such as the number of map tasks, the number of reduce tasks, or the input and output record counts. Any idea how to get these? – Pietro Luciani Sep 12 '12 at 07:41
  • Is "job.getCounters().toString()" enough? – zsxwing May 27 '13 at 09:09
  • Is this a bug in the JobControl class? – Rags Aug 28 '13 at 13:25
  • This is actually really useful just to see a working example of how to chain MR jobs together. Couldn't find a clear explanation of this anywhere! – Austin A Mar 03 '15 at 23:18
  • @PietroLuciani Also, for the less experienced, seeing the code you used to stop your job would be really helpful. – Austin A Mar 08 '15 at 01:50
  • Hi, I wrote this code two years ago. I'll try to dig it up this evening or tomorrow. – Pietro Luciani Mar 18 '15 at 09:54
  • Hi @Austin A, I have edited my question to include the code you requested. – Pietro Luciani Mar 19 '15 at 10:32
  • Thanks @PietroLuciani, I haven't done a whole lot with threading so this is really helpful to me! – Austin A Mar 19 '15 at 14:53

4 Answers

6

I essentially did what Pietro alluded to above.

public class JobRunner implements Runnable {
  private JobControl control;

  public JobRunner(JobControl _control) {
    this.control = _control;
  }

  public void run() {
    this.control.run();
  }
}

and in my map/reduce class I have:

public void handleRun(JobControl control) throws InterruptedException {
    JobRunner runner = new JobRunner(control);
    Thread t = new Thread(runner);
    t.start();

    while (!control.allFinished()) {
        System.out.println("Still running...");
        Thread.sleep(5000);
    }
    control.stop(); // JobControl.run() only returns after stop(), so end its thread here
}

in which I just pass the jobControl object.
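
The pattern above can be tried without a cluster. What follows is a minimal sketch, not Hadoop code: `FakeJobControl` is a hypothetical stand-in that only mimics the two properties of JobControl that matter here, namely that run() keeps looping until stop() is called and that allFinished() turns true once the work is done. The helper name `runToCompletion` and its parameters are also assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Hadoop's JobControl (not the real class):
// run() keeps looping until stop() is called.
class FakeJobControl implements Runnable {
    private final AtomicInteger remainingTicks; // pretend units of work left
    private volatile boolean stopRequested = false;

    FakeJobControl(int ticks) {
        this.remainingTicks = new AtomicInteger(ticks);
    }

    @Override
    public void run() {
        while (!stopRequested) {
            if (remainingTicks.get() > 0) {
                remainingTicks.decrementAndGet(); // one unit of pretend work
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    boolean allFinished() {
        return remainingTicks.get() == 0;
    }

    void stop() {
        stopRequested = true;
    }
}

class JobControlPollingSketch {
    // Runs the controller on its own thread, polls until it reports completion,
    // then stops it and waits up to joinTimeoutMillis for the runner thread.
    // Returns true if the runner thread has ended by then.
    static boolean runToCompletion(FakeJobControl control, long pollMillis, long joinTimeoutMillis) {
        Thread runner = new Thread(control, "job-control-runner");
        runner.start();
        try {
            while (!control.allFinished()) {
                Thread.sleep(pollMillis);
            }
            control.stop();
            runner.join(joinTimeoutMillis);
        } catch (InterruptedException e) {
            control.stop();
            Thread.currentThread().interrupt();
        }
        return !runner.isAlive();
    }

    public static void main(String[] args) {
        boolean ended = runToCompletion(new FakeJobControl(5), 20, 1000);
        System.out.println("runner thread ended: " + ended);
    }
}
```

Without the stop() call the runner thread would keep looping after the polling loop exits, and because it is a non-daemon thread the JVM would stay alive, which matches the hang described in the question.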

sinemetu1
3

The JobControl object itself is Runnable, so you can just use it like this:

new Thread(myJobControlInstance).start();
tombrown52
0

Just a tweak to the code snippet that sinemetu1 shared.

You can drop the JobRunner wrapper, because JobControl itself implements Runnable:

        Thread thread = new Thread(jobControl);
        thread.start();

        while (!jobControl.allFinished()) {
            System.out.println("Still running...");
            Thread.sleep(5000);
        }
        jobControl.stop(); // let the JobControl thread exit

I also stumbled upon this mailing-list post, where a user confirms that JobControl has to be run on a separate thread: https://www.mail-archive.com/common-user@hadoop.apache.org/msg00556.html

shiva
0

Try this:

    Thread jcThread = new Thread(jobControl);
    jcThread.start();
    System.out.println("Polling jobControl status in a loop >>>>>>>>>>>>>>>>");
    while (true) {
        if (jobControl.allFinished()) {
            System.out.println("====>> jobControl.allFinished=" + jobControl.getSuccessfulJobList());
            jobControl.stop();
            // Without a break or return, the program would loop forever
            break;
        }

        if (jobControl.getFailedJobList().size() > 0) {
            succ = 0; // succ is not declared in this snippet; presumably a status flag defined elsewhere
            System.out.println("====>> jobControl.getFailedJobList=" + jobControl.getFailedJobList());
            jobControl.stop();
            // Without a break or return, the program would loop forever
            break;
        }
    }
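
The two checks in this loop could also be folded into a process exit code. The following is a hedged sketch under stated assumptions: `JobGroup` is a hypothetical interface standing in for the three JobControl calls used above (allFinished(), getFailedJobList(), stop()), so the snippet compiles without Hadoop on the classpath. `JobGroupWaiter`, `waitForExitCode` and `failedJobNames` are illustrative names, not Hadoop API.

```java
import java.util.List;

// Hypothetical view of the JobControl methods used above. An adapter around a
// real JobControl would forward each call (assumption, not Hadoop API).
interface JobGroup {
    boolean allFinished();
    List<String> failedJobNames();
    void stop();
}

class JobGroupWaiter {
    // Polls until every job has finished or at least one has failed, and
    // always stops the group before returning.
    // Returns 0 when nothing failed, 1 on failure or interruption.
    static int waitForExitCode(JobGroup group, long pollMillis) {
        try {
            while (!group.allFinished() && group.failedJobNames().isEmpty()) {
                Thread.sleep(pollMillis); // sleep instead of spinning
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 1;
        } finally {
            group.stop();
        }
        return group.failedJobNames().isEmpty() ? 0 : 1;
    }
}
```

A main method could then end with something like System.exit(JobGroupWaiter.waitForExitCode(adapter, 5000)), where adapter is whatever object wraps the real JobControl, so that shell scripts can tell a failed chain from a successful one.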
dolphinZhang