I don't know how much this will help with the word count, but I've built my own mappers and reducers that process R scripts. Stripped of all the extra complexity, here is how I submit a simple job.
ArtisanJob is just a class that extends org.apache.hadoop.mapreduce.Job with some extra methods for my own functionality; you could replace it with plain org.apache.hadoop.mapreduce.Job and it should work just fine for you.
Likewise, my ArtisanConfiguration extends org.apache.hadoop.conf.Configuration and could also be replaced with plain org.apache.hadoop.conf.Configuration.
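In case it helps, here is a rough sketch of what those two thin subclasses might look like. The IOperation hooks are my own additions, and the method bodies here are stubs for illustration, not my real implementation.

// ArtisanJob.java (sketch)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ArtisanJob extends Job {

    public ArtisanJob(Configuration conf) throws IOException {
        super(conf);
    }

    // My extra hooks: stash the R operations in the job's Configuration
    // so the mapper/reducer can rebuild them at runtime (stubbed here).
    public void setMapperOperation(IOperation op) {
        // e.g. getConfiguration().set("com.artisan.mapper.operation", serialize(op));
    }

    public void setReducerOperation(IOperation op) {
        // same idea for the reduce side
    }
}

// ArtisanConfiguration.java (sketch) - adds nothing Hadoop needs,
// which is why plain Configuration works just as well.
public class ArtisanConfiguration extends Configuration {
}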
MetricInputFormat and MetricOutputFormat are the same idea: simple adapters that extend InputFormat and OutputFormat, respectively.
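The adapter pattern here is nothing fancy; the shape is roughly the following. The conf key name is invented for this sketch, and MetricKey/MetricWritable are my own Writable types used in createJob() below.

// MetricInputFormat.java (sketch)
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MetricInputFormat extends InputFormat<MetricKey, MetricWritable> {

    private static final String ADAPTER_TYPE_KEY = "com.artisan.input.adapter.type";

    // Mirrors the setInputAdapterType call in createJob() below: it just
    // records the chosen backend in the job's Configuration.
    public static void setInputAdapterType(Configuration conf, InputAdapterType type) {
        conf.set(ADAPTER_TYPE_KEY, type.name());
    }

    @Override
    public List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException {
        InputAdapterType type = InputAdapterType.valueOf(
                context.getConfiguration().get(ADAPTER_TYPE_KEY));
        // The real class delegates split calculation to the chosen
        // adapter (HTTP, HBase, ...); stubbed out here.
        throw new UnsupportedOperationException("delegate to " + type);
    }

    @Override
    public RecordReader<MetricKey, MetricWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // Likewise, the real class returns a RecordReader backed by the adapter.
        throw new UnsupportedOperationException();
    }
}

MetricOutputFormat does the same thing on the other end: setOutputAdapterType records the target, and its RecordWriter writes to it.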
Let me know if you have any questions, but this is working code on Hadoop 2.4.1 using MRv2.
public String execute(IHadoopJobConfiguration jobDetails)
        throws HadoopJobException {
    try {
        ArtisanJob job = createJob(jobDetails);
        job.submit();
        // Hand the id back so callers can look the job up later.
        return job.getJobID().toString();
    } catch (ClassNotFoundException | IOException | InterruptedException
            | RAnalyticsException | ConfigurationException e) {
        logger.log(Level.SEVERE, "Unable to execute job", e);
        throw new HadoopJobException("Unable to execute operation", e);
    } catch (Exception e) {
        // Catch-all so callers only ever deal with HadoopJobException;
        // log here too so nothing is swallowed silently.
        logger.log(Level.SEVERE, "Unable to execute job", e);
        throw new HadoopJobException("Unable to execute operation", e);
    }
}
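Since execute() only calls submit() and returns the id rather than blocking on waitForCompletion(), a caller can look the job back up later. A minimal sketch of that using the stock Hadoop client API (the class and method names here are my stand-ins, not part of the code above):

// JobStatusCheck.java - hypothetical caller that polls a submitted
// job by the id execute() returned.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;

public class JobStatusCheck {

    public static void printJobState(String jobId) throws Exception {
        Cluster cluster = new Cluster(new Configuration());
        Job job = cluster.getJob(JobID.forName(jobId));
        if (job != null) {
            System.out.println(job.getJobName() + " -> " + job.getStatus().getState());
        }
    }
}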
...
ArtisanJob createJob(IHadoopJobConfiguration details)
        throws IOException, ConfigurationException, RAnalyticsException {
    IOperation mapperOperation = details.getMapperOperation();
    IOperation reducerOperation = details.getReducerOperation();

    OperationConfiguration conf = new OperationConfiguration();
    conf.setDataProviders(details.getDataProviders());
    conf.setOperationInputs(details.getUserInputs());

    ArtisanJob job = new ArtisanJob(new ArtisanConfiguration());
    // Tell the job to be local for right now
    job.getConfiguration().set("mapreduce.framework.name", "local");

    job.setMapperClass(ROperationMapper.class);
    job.setReducerClass(ROperationReducer.class);
    job.setInputFormatClass(MetricInputFormat.class);
    job.setOutputFormatClass(MetricOutputFormat.class);
    job.setMapOutputKeyClass(MetricKey.class);
    job.setMapOutputValueClass(MetricWritable.class);
    job.setJarByClass(MetricInputFormat.class);

    job.getConfiguration().set("conf.column",
            props.getProperty("com.artisan.orchestrator.hbase.metric.colfamily"));
    // Set the output type to HBase so that it writes the outputs to
    // our HBase server
    MetricOutputFormat.setOutputAdapterType(job.getConfiguration(),
            OutputAdapterType.HBASE);
    // Set the input to be the HTTP service; this needs to be more modular.
    MetricInputFormat.setInputAdapterType(job.getConfiguration(),
            InputAdapterType.HTTP);
    job.setMapperOperation(mapperOperation);
    job.setReducerOperation(reducerOperation);

    logger.log(Level.INFO, "Job jar is " + job.getJar());

    return job;
}
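For completeness, the map side looks roughly like this. Operations.fromConfiguration and apply() are stand-ins for how I rebuild and run the R operation, not real API, and the input key/value types are my assumption about what MetricInputFormat produces.

// ROperationMapper.java (sketch) - output types line up with the
// setMapOutputKeyClass/setMapOutputValueClass calls above.
import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;

public class ROperationMapper
        extends Mapper<MetricKey, MetricWritable, MetricKey, MetricWritable> {

    private IOperation operation;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Rebuild the operation that ArtisanJob stashed in the
        // Configuration at submit time (hypothetical helper).
        operation = Operations.fromConfiguration(context.getConfiguration());
    }

    @Override
    protected void map(MetricKey key, MetricWritable value, Context context)
            throws IOException, InterruptedException {
        // Run the record through the R script and emit the result
        // (apply() is a stand-in for my real invocation).
        context.write(key, operation.apply(value));
    }
}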