I have two Hadoop jobs, where the second job requires the output of the first to be added to the distributed cache. Currently I run them manually: after the first job finishes, I pass its output file as an argument to the second job, and the second job's driver adds it to the cache.
The first job is just a simple map-only job, and I was hoping I could run one command that performs both jobs in sequence.
Can anyone help me out with the code to get the output of the first job put into the distributed cache so that it can be passed into the second job?
Thanks
Edit: This is the current driver for job 1:
public class PlaceDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: PlaceDriver <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Place Mapper");
        job.setJarByClass(PlaceDriver.class);
        job.setMapperClass(PlaceMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        TextInputFormat.addInputPath(job, new Path(otherArgs[0]));
        TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
This is the driver for job 2. The output of job 1 is passed to job 2 as the first argument and loaded into the cache:
public class LocalityDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: LocalityDriver <cache> <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Job Name Here");
        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        job.setNumReduceTasks(1); //TODO: Will change
        job.setJarByClass(LocalityDriver.class);
        job.setMapperClass(LocalityMapper.class);
        job.setCombinerClass(TopReducer.class);
        job.setReducerClass(TopReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        TextInputFormat.addInputPath(job, new Path(otherArgs[1]));
        TextOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
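To make the goal concrete, here is a rough sketch of the control flow I have in mind for a single combined driver. It deliberately leaves out the Hadoop API so that it runs on its own: `JobStep` is a hypothetical stand-in for "configure a `Job` and return `job.waitForCompletion(true)`", and `ChainedDriverSketch`, `runChain` and the paths in `main` are names I made up for illustration. The only point is that job 1's output path gets handed to job 2 as its `<cache>` argument, and that job 2 runs only if job 1 succeeded.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainedDriverSketch {

    /** Hypothetical stand-in for building a Job and calling job.waitForCompletion(true). */
    interface JobStep {
        boolean run(String[] args);
    }

    /**
     * Runs job 1 and, only if it succeeds, runs job 2 with job 1's output
     * path as the <cache> argument. Returns a process exit code (0 = success).
     */
    static int runChain(JobStep placeJob, JobStep localityJob,
                        String placeIn, String placeOut,
                        String localityIn, String localityOut) {
        // Job 1: <in> <out>
        if (!placeJob.run(new String[] {placeIn, placeOut})) {
            return 1; // job 1 failed, so job 2 is never started
        }
        // Job 2: <cache> <in> <out>, where <cache> is job 1's output path
        boolean ok = localityJob.run(new String[] {placeOut, localityIn, localityOut});
        return ok ? 0 : 1;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Fake steps that only record their arguments (assumption: the real ones run Hadoop jobs)
        JobStep place = a -> { log.add("place " + String.join(" ", a)); return true; };
        JobStep locality = a -> { log.add("locality " + String.join(" ", a)); return true; };

        int exit = runChain(place, locality, "places.txt", "place-out", "input", "final-out");
        log.forEach(System.out::println);
        System.out.println("exit code " + exit);
    }
}
```

In the real driver I assume each step would be the body of the corresponding `main` above, without the `System.exit` call. One thing I am unsure about: as far as I understand, job 1's output path is a directory of part files rather than a single file, so the second step may need to add the files inside it to the cache rather than the directory itself.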