
I am using Hadoop MapReduce with Java.

Suppose I have completed a whole map/reduce job. Is there any way I could repeat the map/reduce part without ending the job? I mean, I DON'T want to chain different jobs; I only want the map/reduce part to repeat.

Thank you!

SSaikia_JtheRocker
  • As you already said, it is a "MapReduce job". So there is no MapReduce without a job (in Hadoop) and no job without MapReduce. What's your problem? – Thomas Jungblut Apr 18 '11 at 13:04
  • Thanks for replying, but I mean: could I run the map() and reduce() functions over and over again from the main() function somehow? I don't want to exit the main() function... – SSaikia_JtheRocker Apr 18 '11 at 18:16
  • So do you want to call map and reduce from the code, or do you just want to wait for the job to finish and let the main method block until then? – Thomas Jungblut Apr 18 '11 at 20:18
  • Sorry for the late reply, I was sick. Yeah, I want the job to wait until the first map/reduce process completes. The job should then invoke the map/reduce process again. – SSaikia_JtheRocker Apr 24 '11 at 17:31

1 Answer


I am more familiar with the Hadoop streaming APIs, but the approach should translate to the native Java APIs.

As I understand it, what you are trying to do is run several iterations of the same map() and reduce() operations on the input data.

Let's say your initial map() input data comes from the file input.txt and the output directory is output{iteration} (where iteration is the loop count, iteration = [0, number of iterations)). On the next invocation of map()/reduce(), your input is output{iteration} and the output directory becomes output{iteration + 1}.
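The chaining scheme above can be sketched without Hadoop at all: each iteration reads the previous iteration's output directory and writes a fresh one. The class and method names below (IterationPaths, plan) are illustrative, not part of the answer's code:

public class IterationPaths {
    // Returns one "input -> output" entry per iteration, where the output
    // of iteration i becomes the input of iteration i + 1.
    public static String[] plan(String input, String outputBase, int iterations) {
        String[] steps = new String[iterations];
        String in = input;
        for (int i = 0; i < iterations; i++) {
            String out = outputBase + i; // fresh directory per iteration
            steps[i] = in + " -> " + out;
            in = out;                    // chain: next job reads this output
        }
        return steps;
    }

    public static void main(String[] args) {
        for (String step : plan("input.txt", "output", 3)) {
            System.out.println(step);
        }
    }
}

Running this prints input.txt -> output0, output0 -> output1, output1 -> output2, which is exactly the path rotation the real job loop performs.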

Let me know if this is not clear, I can conjure up a quick example and post a link here.

EDIT: For Java, I modified the Hadoop word-count example to run multiple times:

package com.rorlig;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    if (args.length != 3) {
      System.err.println("Usage: wordcount <in> <out> <iterations>");
      System.exit(2);
    }

    int iterations = Integer.parseInt(args[2]);
    Path inPath = new Path(args[0]);
    Path outPath = null;

    for (int i = 0; i < iterations; ++i) {
      // Each iteration writes to a fresh directory: <out>0, <out>1, ...
      outPath = new Path(args[1] + i);

      Job job = new Job(conf, "word count");
      job.setJarByClass(WordCountJob.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      FileInputFormat.addInputPath(job, inPath);
      FileOutputFormat.setOutputPath(job, outPath);

      // Block until this iteration finishes before starting the next one.
      job.waitForCompletion(true);

      // The output of this iteration becomes the input of the next.
      inPath = outPath;
    }
  }
}

Hope this helps.

rOrlig