
I am trying to find the distinct records in 2 GB of data (JSON format, one record per line, each line a JSON object) using MapReduce on a cluster of 4 computers. Mapping works just fine: 17 map tasks are launched (since my block size is 128 MB), but only 1 reduce task is launched. I am not setting the number of reducers in my code.
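
(For the split count: 2 GB is roughly 2048 MB and 2048 / 128 = 16 full-sized blocks, so the 17th map task presumably comes from a small remainder or from the data being spread over more than one file.)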

Mapper code

public static class DistinctMapper extends
        Mapper<Object, Text, Text, Text> {

    private Text outFieldKey = new Text();
    private String field;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        this.field = context.getConfiguration().get("field");
    }

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Parse the JSON record and use the configured field's value as the key,
        // so records sharing that field value are grouped together at a reducer.
        JSONObject parsed = new JSONObject(value.toString());
        String selectedFieldVal = parsed.get(this.field).toString();

        outFieldKey.set(selectedFieldVal);
        context.write(outFieldKey, value);
    }
}
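
To make the key choice concrete: for a hypothetical record like {"id": 1, "name": "foo"} and field = "name", the mapper emits the key "foo" with the whole JSON line as the value, so all records with the same field value end up at the same reducer.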

Combiner Code

public static class DistinctCombiner extends
        Reducer<Text, Text, Text, Text> {

    // Map-side dedupe: emit only one value per key to shrink the shuffle.
    @Override
    public void reduce(Text key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        context.write(key, values.iterator().next());
    }
}

Reducer Code

public static class DistinctReducer extends
        Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        // Write only the first value for each key (with an empty output key),
        // so each distinct record appears exactly once in the output.
        context.write(new Text(), values.iterator().next());
    }
}

Main

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: DistinctDataBySomeField <field> <in> <out>");
        System.exit(2);
    }

    conf.set("field", otherArgs[0]);
    Job job = Job.getInstance(conf, "Distinct data by field x");
    job.setJarByClass(DistinctPatternDriver.class);
    job.setMapperClass(DistinctMapper.class);
    job.setCombinerClass(DistinctCombiner.class);
    job.setReducerClass(DistinctReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
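
For reference, I launch the job with something like this (jar name and paths are placeholders):

hadoop jar distinct.jar DistinctPatternDriver name /input/json /output/distinct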

I need many reducers so the job can run faster. Why is this happening?

  • 1 is the default... But curious why you wouldn't use Spark/Hive/Pig to do the same workload in less code – OneCricketeer Mar 16 '19 at 20:08
  • Omg lol, thanks. I didn't even know there was a default number; everyone teaches it as if Hadoop itself takes care of how many reducers should be used. So if I want many reducers, do I always have to set the number myself? Am I correct? By the way, it is just a school assignment and I'm only starting to learn big data, so I'm not familiar with Spark/Hive/Pig yet; maybe after this I will start learning them. @cricket_007 – Irvan Mar 16 '19 at 20:32
  • The mapper amount is calculated, based on the input size of splits. The reducers are almost always a guess-and-check process, from what I recall, and will need to be manually adjusted over time as data changes – OneCricketeer Mar 16 '19 at 20:44
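
Following up on the comments above: the reducer count defaults to 1 and is not derived from the input size, so it has to be requested explicitly. A minimal sketch of the change in the driver, assuming 4 reducers as an arbitrary starting point (the right value is the guess-and-check number mentioned above):

// Explicitly request more than one reduce task; tune this for the data and cluster.
job.setNumReduceTasks(4);

Since the driver already uses GenericOptionsParser, the same setting can also be passed at launch time without recompiling, e.g. -D mapreduce.job.reduces=4.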
