I am trying to deduplicate 2 GB of data (JSON; one record per line, each line a JSON object) using MapReduce on a cluster of 4 machines. The map phase works fine: 17 map tasks are launched (since my block size is 128 MB), but only 1 reduce task is launched. I am not setting the number of reducers anywhere in my code.
Mapper code
public static class DistinctMapper extends Mapper<Object, Text, Text, Text> {

    private Text outFieldKey = new Text();
    private String field;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Name of the JSON field to deduplicate on, passed in via the job configuration
        this.field = context.getConfiguration().get("field");
    }

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Parse the record and emit the selected field as the key,
        // so records with the same field value group together at a reducer
        JSONObject parsed = new JSONObject(value.toString());
        String selectedFieldVal = parsed.get(this.field).toString();
        outFieldKey.set(selectedFieldVal);
        context.write(outFieldKey, value);
    }
}
Combiner Code
public static class DistinctCombiner extends Reducer<Text, Text, Text, Text> {

    // Note: this originally declared the third parameter as the raw type
    // Reducer.Context, so the method did not actually override reduce()
    // and the combiner logic was never invoked. Using Context (and @Override
    // to let the compiler check this) fixes that.
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Forward a single value per key to cut down shuffle traffic
        context.write(key, values.iterator().next());
    }
}
Reducer Code
public static class DistinctReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Emit one representative record per distinct key; the output key is empty
        // because only the record itself is wanted in the output
        context.write(new Text(), values.iterator().next());
    }
}
Main
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: DistinctDataBySomeField <field> <in> <out>");
        System.exit(2);
    }
    conf.set("field", otherArgs[0]);

    // Job.getInstance replaces the deprecated new Job(conf, ...) constructor
    Job job = Job.getInstance(conf, "Distinct data by field x");
    job.setJarByClass(DistinctPatternDriver.class);
    job.setMapperClass(DistinctMapper.class);
    job.setCombinerClass(DistinctCombiner.class);
    job.setReducerClass(DistinctReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
I need multiple reducers so the job runs faster. Why does this happen?
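From what I have read, the reducer count is not derived from the input size the way the map task count is, and can be requested explicitly; something like the sketch below (assuming `job` is the `Job` from my `main` method, and 4 is just a placeholder count), though I have not confirmed this is the right approach:

```java
// Sketch, not yet tried: request several reduce tasks explicitly
// instead of relying on the default. The count here is a guess.
job.setNumReduceTasks(4);

// Equivalently, I believe the same property can be set on the command line:
//   hadoop jar distinct.jar DistinctPatternDriver -D mapreduce.job.reduces=4 <field> <in> <out>
```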