You need a second MapReduce
job whose input is the output of your first job.
I have tweaked the code so that it produces the output you asked for.
For Input
#USA 2
#Holy 5
#SOS 3
#Love 66
The output should be
66 #Love
5 #Holy
3 #SOS
2 #USA
I have assumed that the hashtag and the count are separated by a tab. If the delimiter is something else, please change it in the mapper. The code is not tested; please let me know if it works.
package com.my.cert.example;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class ValueSortExp {
public static void main(String[] args) throws Exception {
Path inputPath = new Path("C:\\hadoop\\test\\test.txt");
Path outputDir = new Path("C:\\hadoop\\test\\test1");
// Path inputPath = new Path(args[0]);
// Path outputDir = new Path(args[1]);
// Create configuration
Configuration conf = new Configuration(true);
// Create job
Job job = new Job(conf, "Test HIVE commond");
job.setJarByClass(ValueSortExp.class);
// Setup MapReduce
job.setMapperClass(ValueSortExp.MapTask.class);
job.setReducerClass(ValueSortExp.ReduceTask.class);
job.setNumReduceTasks(1);
// Specify key / value
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(IntComparator.class);
// Input
FileInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
// Output
FileOutputFormat.setOutputPath(job, outputDir);
job.setOutputFormatClass(TextOutputFormat.class);
/*
* // Delete output if exists FileSystem hdfs = FileSystem.get(conf); if
* (hdfs.exists(outputDir)) hdfs.delete(outputDir, true);
*
* // Execute job int code = job.waitForCompletion(true) ? 0 : 1;
* System.exit(code);
*/
// Execute job
int code = job.waitForCompletion(true) ? 0 : 1;
System.exit(code);
}
public static class IntComparator extends WritableComparator {
public IntComparator() {
super(IntWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
return v1.compareTo(v2) * (-1);
}
}
public static class MapTask extends
Mapper<LongWritable, Text, IntWritable, IntWritable> {
public void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
String line = value.toString();
String[] tokens = line.split("\t"); // This is the delimiter between Key and Value
int valuePart = Integer.parseInt(tokens[1]);
context.write(new IntWritable(valuePart), new Text(tokens[0]));
}
}
public static class ReduceTask extends
Reducer<IntWritable, Text, Text, IntWritable> {
public void reduce(IntWritable key, Iterable<Text> list, Context context)
throws java.io.IOException, InterruptedException {
for (Text value : list) {
context.write(value,key);
}
}
}
}