As is quite obvious from the title, my aim is to use the Mapper's counter in the reduce phase, before the particular job finishes.
I have come across a few questions that were highly related to this one, but none of them solved all my problems. (Accessing a mapper's counter from a reducer; Hadoop, MapReduce Custom Java Counters Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING; etc.)
/**
 * Fetches the mapper-side counter value before the reduce work starts.
 * Mapper counters are complete by reduce time because the map phase has
 * finished, but they must be read through the job-client API.
 *
 * @param context reducer context supplying the configuration and job id
 * @throws IOException if the job cannot be looked up on the cluster
 */
@Override
public void setup(Context context) throws IOException, InterruptedException{
    Configuration conf = context.getConfiguration();
    Cluster cluster = new Cluster(conf);
    try {
        Job currentJob = cluster.getJob(context.getJobID());
        // In local mode (LocalJobRunner) the cluster keeps no job history,
        // so getJob() returns null -- guard against the resulting NPE.
        if (currentJob == null) {
            throw new IOException("Job " + context.getJobID()
                    + " not found on cluster; mapper counters are unavailable");
        }
        mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
    } finally {
        cluster.close(); // release the client connection
    }
}
My problem is that the cluster does not contain any job history.
This is how I call the MapReduce job:
/**
 * Configures and runs the first-frequents MapReduce job.
 *
 * @param outpath HDFS path for the job's output directory
 * @throws IOException if the job fails or cannot be submitted
 */
private void firstFrequents(String outpath) throws IOException,
        InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    // Must be set BEFORE Job.getInstance(): the Job copies the configuration.
    conf.setInt("minFreq", MIN_FREQUENCY);
    // Single job name here; the original set "APR" and then immediately
    // overwrote it via setJobName("TotalTransactions").
    Job job = Job.getInstance(conf, "TotalTransactions");
    job.setJarByClass(AssociationRules.class);
    job.setMapperClass(FirstFrequentsMapper.class);
    job.setReducerClass(CandidateReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path("input"));
    FileOutputFormat.setOutputPath(job, new Path(outpath));
    // Surface job failure instead of silently ignoring the boolean result.
    if (!job.waitForCompletion(true)) {
        throw new IOException("Job 'TotalTransactions' failed");
    }
}
Mapper:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FirstFrequentsMapper extends
Mapper<Object, Text, Text, IntWritable> {
public enum Counters {
TotalTransactions
}
private IntWritable one = new IntWritable(1);
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split("\\\t+|,+");
int iter = 0;
for (String string : line) {
context.write(new Text(line[iter]), one);
iter++;
}
context.getCounter(Counters.TotalTransactions).increment(1);
}
}
Reducer:
/**
 * Sums per-item occurrence counts and emits only the items whose frequency
 * meets the configured minimum support ("minFreq").
 */
public class CandidateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Minimum support threshold, read from the job configuration.
    private int minFrequency;
    // Mapper-side transaction count, read via the job-client API.
    private long totalTransactions;

    /**
     * Reads the threshold and the mapper's TotalTransactions counter.
     * The mapper counters are final by reduce time, but they are only
     * reachable through the cluster's job client.
     *
     * @throws IOException if the job cannot be found on the cluster
     */
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        minFrequency = conf.getInt("minFreq", 1);
        Cluster cluster = new Cluster(conf);
        try {
            Job currentJob = cluster.getJob(context.getJobID());
            // LocalJobRunner keeps no job history, so getJob() returns null
            // in local mode -- this is the "no job history" symptom; fail
            // with a clear message instead of an NPE.
            if (currentJob == null) {
                throw new IOException("Job " + context.getJobID()
                        + " not found on cluster; mapper counters are unavailable");
            }
            totalTransactions = currentJob.getCounters()
                    .findCounter(FirstFrequentsMapper.Counters.TotalTransactions)
                    .getValue();
        } finally {
            cluster.close(); // release the client connection
        }
        System.out.print(totalTransactions);
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the occurrences of this item across all transactions.
        int counter = 0;
        for (IntWritable val : values) {
            counter += val.get();
        }
        // Emit the item only if it is frequent enough.
        if (counter >= minFrequency) {
            context.write(key, new IntWritable(counter));
        }
    }
}