12

I need to access the counters from my mapper in my reducer. Is this possible? If so, how is it done?

As an example, my mapper is:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterMapper extends Mapper<Text,Text,Text,Text> {

    static enum TestCounters { TEST }

    @Override
    protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
        context.getCounter(TestCounters.TEST).increment(1);
        context.write(key, value);
    }
}

My reducer is

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
        Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
        long counterValue = counter.getValue();
        context.write(key, new LongWritable(counterValue));
    }
}

counterValue is always 0. Am I doing something wrong or is this just not possible?

kichik
asdf

6 Answers

11

In the Reducer's configure(JobConf), you can use the JobConf object to look up the reducer's own job id. With that, your reducer can create its own JobClient -- i.e. a connection to the jobtracker -- and query the counters for this job (or any job for that matter).

// in the Reducer class...
private long mapperCounter;

@Override
public void configure(JobConf conf) {
    try {
        JobClient client = new JobClient(conf);
        RunningJob parentJob =
            client.getJob(JobID.forName(conf.get("mapred.job.id")));
        mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);
    } catch (IOException e) {
        // configure() can't rethrow the checked IOException
        throw new RuntimeException(e);
    }
}

Now you can use mapperCounter inside the reduce() method itself.
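For illustration, a minimal old-API sketch of such a reduce() (not part of the original answer; key/value types borrowed from the question):

// Sketch only: every key is written with the value cached in configure().
public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {
    output.collect(key, new LongWritable(mapperCounter));
}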

Note the try-catch: `JobClient`'s constructor, `getJob`, and `getCounters` all throw `IOException`, which the old API's `configure` doesn't declare. I'm using the old API, but it shouldn't be hard to adapt for the new API.

Note that mappers' counters should all be finalized before any reducer starts, so contrary to Justin Thomas's comment, I believe you should get accurate values (as long as the reducers aren't incrementing the same counter!)

Jeff G
  • It may seem counter-intuitive that counters from mappers are not available in reducers, but in `Hadoop` reducers can start executing before all the mappers finish. In that event, the value of a counter could read differently at different times in the reducers. To learn more about how reducers can start before the mappers finish execution, visit this post: http://stackoverflow.com/questions/11672676/when-do-reduce-tasks-start-in-hadoop – abhinavkulkarni Oct 10 '13 at 19:09
  • 2
    @abhinavkulkarni Actually, **only** the shuffle phase of the reducer can start before all mappers finish, which is irrelevant to the counters. So, when the reduce phase of the reducer starts, all mapper counters are correct. From the same post: "On the other hand, sort and reduce can only start once all the mappers are done." – vefthym May 12 '14 at 14:22
9

Implemented Jeff G's solution with the new API:

    @Override
    public void setup(Context context) throws IOException, InterruptedException{
        Configuration conf = context.getConfiguration();
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();  
    }
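
For reference, a sketch of the imports and field this snippet assumes (Hadoop 2.x new API; note that `Cluster` here is `org.apache.hadoop.mapreduce.Cluster`, not the commons-math class):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;

    // COUNTER_NAME stands in for your own counter enum,
    // e.g. CounterMapper.TestCounters.TEST from the question.
    private long mapperCounter;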
itzhaki
  • 2
    I tried this, but I'm getting a Java NullPointerException at the line `mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME)`, where I replaced COUNTER_NAME with my custom counter – Sumit Das Dec 09 '15 at 05:32
  • It seems that `cluster.getJob(context.getJobID());` does not work in Hadoop's standalone operation. When running in single-node cluster mode, this works for me. – dreua Nov 23 '16 at 17:08
  • 1
    Where did you import `Cluster` from? IntelliJ IDEA suggests I import `org.apache.commons.math.stat.clustering.Cluster` and nothing else, and that class does not accept a `Configuration` as a constructor parameter. – scarface Apr 16 '19 at 11:49
2

The whole point of map/reduce is to parallelize the jobs. There will be many unique mappers/reducers, so the value wouldn't be correct anyway except for that run of the map/reduce pair.

They have a word count example:

http://wiki.apache.org/hadoop/WordCount

You could change `context.write(word, one)` to `context.write(line, one)`, as sketched below.
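
For illustration, a hypothetical variant of that example's map() with the change applied (the surrounding WordCount fields, such as `one`, are assumed):

public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    // Emit the whole input line as the key instead of each word,
    // so the reducer ends up counting duplicate lines.
    context.write(value, one);
}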

Justin Thomas
1

I asked this question, but I haven't solved my problem. However, an alternative solution came to mind. In the mapper, the number of words is counted, and it can be written to the intermediate output with a minimum key (so that this value is at the head) in the cleanup function, which runs at the end of the mapper. In the reducer, the number of words is then calculated by adding up the values at the head. The sample code and a part of its output are available below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * WordCount variant: the mapper counts every word with a counter and, in
 * cleanup(), also writes the total to the intermediate output under "!".
 */
public class WordCount {
    static enum TestCounters { TEST }
    public static class Map extends Mapper<Object, Text, Text, LongWritable> {
        private final static LongWritable one = new LongWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
                context.getCounter(TestCounters.TEST).increment(1);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the total word count under "!", a key that sorts before every word.
            context.write(new Text("!"), new LongWritable(context.getCounter(TestCounters.TEST).getValue()));
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {

        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "WordCount");
        job.setJarByClass(WordCount.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

Text File:

Turgut Özal University is a private university located in Ankara, Turkey. It was established in 2008 by the Turgut Özal Thought and Action Foundation and is named after former Turkish president Turgut Özal.

Intermediate output (from the mapper):

! 33
2008 1
Action 1
Ankara, 1
Foundation 1
It 1
Thought 1
Turgut 1
Turgut 1
Turgut 1

Final output (from the reducer):

! 33
2008 1
Action 1
Ankara, 1
Foundation 1
It 1
Thought 1
Turgut 3
tolgabuyuktanir
1

The global counter values are never broadcast back to each mapper or reducer. If you want the # of mapper records to be available to the reducer, you'll need to rely on some external mechanism to do this.
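
One such mechanism (a sketch of a common two-pass pattern, not something from this answer; the key `myapp.mapper.records` is made up) is to read the counter in the driver after a first job finishes and pass it to a second job through its Configuration:

// Driver-side: counters only flow back to the client, so harvest them
// after the first job completes.
Job countJob = Job.getInstance(new Configuration(), "count-pass");
// ... set mapper/reducer/input/output for the first pass ...
countJob.waitForCompletion(true);

long mapRecords = countJob.getCounters()
        .findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

Configuration conf = new Configuration();
conf.setLong("myapp.mapper.records", mapRecords);   // hypothetical key
Job mainJob = Job.getInstance(conf, "main-pass");
// Tasks of mainJob can read the value with
// context.getConfiguration().getLong("myapp.mapper.records", 0L).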

bajafresh4life
0

An improvement on itzhaki's answer:

`findCounter(COUNTER_NAME)` is no longer supported (see https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html), so use the two-argument form with a group name:

@Override
public void setup(Context context) throws IOException, InterruptedException{
    Configuration conf = context.getConfiguration();
    Cluster cluster = new Cluster(conf);
    Job currentJob = cluster.getJob(context.getJobID());
    mapperCounter = currentJob.getCounters().findCounter(GROUP_NAME, COUNTER_NAME).getValue();  
}

GROUP_NAME is specified when the counter is incremented, e.g.

context.getCounter("com.example.mycode", "MY_COUNTER").increment(1);

then

mapperCounter = currentJob.getCounters().findCounter("com.example.mycode", "MY_COUNTER").getValue();  

Also, one important point: if the counter does not exist, `findCounter` will initialize one with value 0.
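
For example (illustrative only; the counter name below is made up):

// findCounter does not return null for an unknown counter; it creates
// one on the fly, so getValue() simply yields 0.
long v = currentJob.getCounters()
        .findCounter("com.example.mycode", "NEVER_INCREMENTED").getValue();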

Gyanendra Dwivedi