
As the title suggests, my aim is to use a Mapper's counter in the reduce phase, before the job finishes.

I have come across a few questions that are highly related to this one, but none of them solved all my problems (Accessing a mapper's counter from a reducer; Hadoop, MapReduce Custom Java Counters Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING; etc.). Here is what I tried in my reducer's setup():

@Override
public void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    Cluster cluster = new Cluster(conf);
    Job currentJob = cluster.getJob(context.getJobID());
    mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
}

My problem is that the Cluster instance does not contain any job history, so the running job cannot be retrieved this way.

This is how I invoke the MapReduce job:

private void firstFrequents(String outpath) throws IOException,
        InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    Cluster cluster = new Cluster(conf);
    conf.setInt("minFreq", MIN_FREQUENCY);
    Job job = Job.getInstance(conf, "APR");
    // Counters counters = job.getCounters();
    job.setJobName("TotalTransactions");
    job.setJarByClass(AssociationRules.class);
    job.setMapperClass(FirstFrequentsMapper.class);
    job.setReducerClass(CandidateReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path("input"));
    FileOutputFormat.setOutputPath(job, new Path(outpath));

    job.waitForCompletion(true);
}

Mapper:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FirstFrequentsMapper extends
        Mapper<Object, Text, Text, IntWritable> {

    public enum Counters {
        TotalTransactions
    }

    private IntWritable one = new IntWritable(1);

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is one transaction; emit every item with a count of 1.
        String[] items = value.toString().split("\\t+|,+");
        for (String item : items) {
            context.write(new Text(item), one);
        }
        // Count the transaction itself.
        context.getCounter(Counters.TotalTransactions).increment(1);
    }
}

Reducer:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class CandidateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private int minFrequency;
    private long totalTransactions;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        minFrequency = conf.getInt("minFreq", 1);
        // This is where it fails: the Cluster has no history for the running job.
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        totalTransactions = currentJob.getCounters()
                .findCounter(FirstFrequentsMapper.Counters.TotalTransactions).getValue();
        System.out.print(totalTransactions);
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int counter = 0;
        for (IntWritable val : values) {
            counter += val.get();
        }

        /* Item frequency calculated */
        /* Write it to output if it is frequent */
        if (counter >= minFrequency) {
            context.write(key, new IntWritable(counter));
        }
    }
}
  • how is this code similar to the code suggested in http://stackoverflow.com/questions/5450290/accessing-a-mappers-counter-from-a-reducer ? I don't see any Cluster instance there... And why would you need the job history, when you are using the code within a single job? – vefthym May 20 '16 at 06:53
  • @vefthym I have also mentioned this Stack Overflow post. To see the Cluster part, scroll down to the "Implemented Jeff G's solution on the new API:" post. My mapper and counter look similar, with the TotalTransactions enum counter. I thought that the current ongoing job is also stored in the job history, and that I could access it from there. Anyway, I am open to any other kind of solution; please let me know if you have any idea. – ponthu May 20 '16 at 08:15
  • I know that you mention it in your post, that's why I am asking why it didn't work. I have used this code successfully in the past. Did you get any error? – vefthym May 20 '16 at 08:20
  • The way I invoke MapReduce: https://www.dropbox.com/s/gi4dhs6anfrpyyk/invokingMapReduce.PNG?dl=0 If I uncomment the getCounters().findCounter() part, the program simply stops after reaching that line, without any error message: https://www.dropbox.com/s/n6r21chqw4hzijq/uncommented.PNG?dl=0 – ponthu May 20 '16 at 08:31
  • I guess this happens because you do nothing with the value of this counter, e.g., store it in a variable, or print it... Please post this code in your question, instead of inside the comments. – vefthym May 20 '16 at 08:35
  • I would like to store it in a long variable, as you can see in the last row. Also, I now tried to add a print line, but nothing changed – ponthu May 20 '16 at 08:36
  • why do you need a Cluster variable? perhaps you confuse it with `context`, which is suggested in the related link? – vefthym May 20 '16 at 08:38

1 Answer


The correct setup() (or reduce()) implementation for getting the value of a counter is exactly the one shown in the post that you mention:

Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
long counterValue = counter.getValue();

where TEST is the name of the counter, which is declared in an enum TestCounters.
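Applied to the classes in your question, a minimal sketch of that setup() could look like the following (assuming totalTransactions is the long field of your CandidateReducer):

@Override
public void setup(Context context) throws IOException, InterruptedException {
    // Read the counter straight from the reducer's own context; no Cluster needed.
    Counter counter = context.getCounter(FirstFrequentsMapper.Counters.TotalTransactions);
    totalTransactions = counter.getValue();
}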

I don't see why you declare a Cluster variable...

Also, in the code you mention in your comments, you should store the result returned by getValue() in a variable, like the counterValue variable above.

Perhaps you will find this post useful as well.

UPDATE: Based on your edit, I believe that all you are looking for is MAP_INPUT_RECORDS, which is a default counter, so you don't need to re-implement it.
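For reference, in the new API the built-in counters are exposed through the TaskCounter enum; a sketch of reading it inside a mapper or reducer method where context is available (assuming Hadoop 2.x's org.apache.hadoop.mapreduce.TaskCounter):

import org.apache.hadoop.mapreduce.TaskCounter;

// Read the framework's own input-record counter for this task's job.
long inputRecords = context.getCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();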

To get the value of a counter from the Driver class, you can use (taken from this post):

job.getCounters().findCounter(COUNTER_NAME).getValue(); 
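For example, with the job built in your firstFrequents() method, a sketch of this (the counter is only final once waitForCompletion(true) has returned):

job.waitForCompletion(true);
// Safe to read now: the job has finished, so the counter holds its final value.
long totalTransactions = job.getCounters()
        .findCounter(FirstFrequentsMapper.Counters.TotalTransactions)
        .getValue();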
  • The problem with this solution is that I face the same problem as the author of the linked post: it returns 0 every single time. After the job has completed, I can retrieve the real value of the counter (which is not 0, so the mapper does increment it), but I would like it during the job. – ponthu May 20 '16 at 08:56
  • @vefthym My interest is not in the number of mappers; it is more about the number of input rows (number of transactions), as I would like to use this value to eliminate a few key/value pairs during the reduce process. OFF: having a counter variable in the driver, and calling a setMethod() in the mapper to increment the value, would work with one node locally, but in real life that's not the way to go. Can you confirm that? – ponthu May 20 '16 at 09:51
  • @ponthu no, a counter has the aggregated value from all mappers across all nodes, not just one. Also, what you describe seems to be the number of MAP_INPUT_RECORDS, which is also a default counter – vefthym May 20 '16 at 18:48
  • @vefthym: Yes, for me MAP_INPUT_RECORDS would be as good as the custom one, although my problem is that I do not know how to access it in the NEW API. I've gone through so many forums and articles... if you could paste a working version I would be really grateful! – ponthu May 21 '16 at 10:41
  • @ponthu see if my updated answer helps. Otherwise, my last suggestion would be to see if this post is helpful (it was for me in the reducer class): http://stackoverflow.com/questions/13609028/is-there-a-list-of-all-standard-not-custom-hadoop-mapreduce-counters – vefthym May 21 '16 at 18:57
  • The problem is, when I access the custom counter or MAP_INPUT_RECORDS, they both return 0 in the reducer class (during the job). After the job has finished (job.waitForCompletion(true);), both counters return the same real (expected) value in the driver class. Reducer: Counter counter = context.getCounter(FirstFrequentsMapper.Counters.TotalTransactions); Counter counter2 = context.getCounter("org.apache.hadoop.mapred.Task$Counter","MAP_INPUT_RECORDS"); long counterValue = counter.getValue(); long counter2Value = counter.getValue(); – ponthu May 22 '16 at 20:30