
I am trying a simple example, in which the output of one MapReduce job should be the input of another MapReduce job.

The flow should be: Mapper1 --> Reducer1 --> Mapper2 --> Reducer2. The output of Mapper1 must be the input of Reducer1, the output of Reducer1 must be the input of Mapper2, the output of Mapper2 must be the input of Reducer2, and the output of Reducer2 must be stored in the output file.

How can I add multiple Mappers and Reducers to my program such that the flow is maintained like above?

Do I need to use ChainMapper or ChainReducer? If so, how can I use them?

MChirukuri

2 Answers


You need to implement two separate MapReduce jobs for that. The output of the first job must be written to persistent storage (such as HDFS), from where it is read as the input of the second job. SequenceFileOutputFormat/SequenceFileInputFormat are often used for this. Both MapReduce jobs can be executed from the same driver program.
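For example, a minimal driver along these lines could work; note that `TwoJobDriver`, `Mapper1`/`Reducer1`/`Mapper2`/`Reducer2`, the argument layout, and the `Text`/`IntWritable` key-value types are placeholders for your own classes and are not prescribed by the question:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class TwoJobDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path input = new Path(args[0]);
            Path intermediate = new Path(args[1]); // output of job 1, input of job 2
            Path output = new Path(args[2]);

            // Job 1: Mapper1 -> Reducer1, result stored as a SequenceFile on HDFS
            Job job1 = Job.getInstance(conf, "job-1");
            job1.setJarByClass(TwoJobDriver.class);
            job1.setMapperClass(Mapper1.class);
            job1.setReducerClass(Reducer1.class);
            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(IntWritable.class);
            job1.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileInputFormat.addInputPath(job1, input);
            FileOutputFormat.setOutputPath(job1, intermediate);
            if (!job1.waitForCompletion(true)) {
                System.exit(1); // stop if the first job fails
            }

            // Job 2: Mapper2 -> Reducer2, reads job 1's SequenceFile output
            Job job2 = Job.getInstance(conf, "job-2");
            job2.setJarByClass(TwoJobDriver.class);
            job2.setMapperClass(Mapper2.class);
            job2.setReducerClass(Reducer2.class);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(IntWritable.class);
            job2.setInputFormatClass(SequenceFileInputFormat.class);
            FileInputFormat.addInputPath(job2, intermediate);
            FileOutputFormat.setOutputPath(job2, output);
            System.exit(job2.waitForCompletion(true) ? 0 : 1);
        }
    }

Because the second job only starts after `waitForCompletion(true)` on the first returns successfully, the dependency between the two jobs is preserved.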

Fabian Hueske

I guess what you are looking for is ControlledJob and JobControl. They fit your purpose aptly. In a single driver class you can build multiple jobs that depend on each other. The following code might help you understand.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    Job jobOne = Job.getInstance(jobOneConf, "Job-1");
    FileInputFormat.addInputPath(jobOne, jobOneInput);
    FileOutputFormat.setOutputPath(jobOne, jobOneOutput);
    ControlledJob jobOneControl = new ControlledJob(jobOneConf);
    jobOneControl.setJob(jobOne);

    Job jobTwo = Job.getInstance(jobTwoConf, "Job-2");
    FileInputFormat.addInputPath(jobTwo, jobOneOutput); // here we set job-1's output as job-2's input
    FileOutputFormat.setOutputPath(jobTwo, jobTwoOutput); // final output
    ControlledJob jobTwoControl = new ControlledJob(jobTwoConf);
    jobTwoControl.setJob(jobTwo);

    JobControl jobControl = new JobControl("job-control");
    jobControl.addJob(jobOneControl);
    jobControl.addJob(jobTwoControl);
    jobTwoControl.addDependingJob(jobOneControl); // this dependency makes job-2 wait until job-1 is done

    // JobControl implements Runnable, but its run() loop only exits after stop()
    // is called, so run it in a background thread and poll allFinished().
    Thread jobControlThread = new Thread(jobControl);
    jobControlThread.setDaemon(true);
    jobControlThread.start();
    while (!jobControl.allFinished()) {
        Thread.sleep(500);
    }
    jobControl.stop();
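If your driver implements Hadoop's `Tool` interface (see the comments below), the same `JobControl` logic can live inside `run()`, whose return value you control yourself. A rough sketch, with an illustrative class name (`ChainDriver`) and the job setup elided, could look like this:

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class ChainDriver extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            JobControl jobControl = new JobControl("job-control");
            // ... build jobOne/jobTwo and their ControlledJobs as above,
            // add them to jobControl and declare the dependency ...

            Thread jobControlThread = new Thread(jobControl);
            jobControlThread.setDaemon(true);
            jobControlThread.start();
            while (!jobControl.allFinished()) {
                Thread.sleep(500);
            }
            jobControl.stop();

            // Exit code 0 only if no controlled job failed.
            return jobControl.getFailedJobList().isEmpty() ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new ChainDriver(), args));
        }
    }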
ViKiG
  • how to get the return value of the `run` method defined in `Tools` interface if using the above code? – Jing He Sep 26 '17 at 06:47
  • 1
    Usually, you use a `ToolRunner` which will run the implemented `run` method. An example code is mentioned in the Hadoop documentation: http://hadoop.apache.org/docs/current/api/org/apache/hadoop/util/Tool.html. Have a look in there. – ViKiG Sep 26 '17 at 07:27
  • Thanks. I know this point. But how to specify the mapper and reducer classes for the two jobs respectively? I tried the following code but it didn't work: `jobOne.setMapperClass(Top10kMapper.class); jobTwo.setMapperClass(Top10kSortMapper.class)` – Jing He Sep 26 '17 at 08:57
  • 1
    @JingHe The `setMapperClass` should be called on `jobOneConf` not `jobOne`. – ViKiG Sep 26 '17 at 09:52
  • @JingHe `jobOneConf` is made of `JobConf` class. Have a look at this: https://hadoop.apache.org/docs/r2.7.3/api/index.html?overview-summary.html – ViKiG Sep 26 '17 at 19:43