I want to sort the output of my reducer. A sample of my reducer output is shown below:

0,0    2.5
0,1    3.0
1,0    4.0
1,1    1.5

The reducer output is sorted by the first element of the key, but I want it sorted by the second element of the key, so that the output looks like this:

0,0    2.5
1,0    4.0
0,1    3.0
1,1    1.5

Any way I can do this?

Please help!

This is my reducer:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class RecReduce extends
Reducer<Text, Text, Text, Text> {
    public static int n=0;
    @Override
    public void setup(Context context) throws IOException, InterruptedException{
        FileSystem hdfs= FileSystem.get(context.getConfiguration());
        BufferedReader br = new BufferedReader(new InputStreamReader(hdfs.open(new Path(context.getConfiguration().get("outFile")))));
        String line=null;
        while((line=br.readLine())!=null){
            n=Integer.parseInt(line);
            break;
        }
        br.close();
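        // Do not close hdfs here: FileSystem.get() returns a shared, cached instance
        // that the framework may still need when it writes the job output.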
    }
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String[] value;
        HashMap<Integer, Float> hashA = new HashMap<Integer, Float>();
        HashMap<Integer, Float> hashB = new HashMap<Integer, Float>();
        for (Text val : values) {
            value = val.toString().split(",");
            if (value[0].equals("A")) {
                for(int z=1;z<=n;z++){
                    hashA.put(z, Float.parseFloat(value[z]));}
            } else{
                for(int a=1;a<=n;a++){
                    hashB.put(a, Float.parseFloat(value[a]));}
            }
        }
        float result = 0.0f;
        float a_ij;
        float b_jk;
        for (int j=1;j<=n;j++) {
            a_ij = hashA.containsKey(j) ? hashA.get(j) : 0.0f;
            b_jk = hashB.containsKey(j) ? hashB.get(j) : 0.0f;
            result +=a_ij*b_jk;
        }
        context.write(null, new Text(key.toString() + "," + Float.toString(result)));
    }
}
Punit Naik

1 Answer

You can use a composite key together with a composite-key comparator.

Create a class, e.g.:

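// To be usable as a Hadoop key, Pair must also implement WritableComparable<Pair>
// (write, readFields, compareTo) and provide a no-argument constructor.
class Pair {
    String key;
    String value;
    Pair(String key, String value) { this.key = key; this.value = value; }
    String getValue() { return value; }
}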

Then use it as the output key in your reducer, like this:

context.write(new Pair(key.toString(), Float.toString(result)), null);

Then create a comparator:

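import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class PairComparator extends WritableComparator {
    protected PairComparator() {
        // true: let WritableComparator create Pair instances for deserialization
        super(Pair.class, true);
    }

    // Orders keys by their value field.
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        Pair k1 = (Pair) w1;
        Pair k2 = (Pair) w2;
        return k1.getValue().compareTo(k2.getValue());
    }
}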

Then register the comparator in your job definition:

job.setSortComparatorClass(PairComparator.class);

I haven't tested the Hadoop code above; it's just the idea.
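
To make the ordering itself concrete, here is a minimal sketch in plain Java (no Hadoop classes) that compares "i,j" keys by the second element first and then by the first, applied to the sample lines from the question. The class name `KeyOrderSketch` and its members are made up for illustration, and the sketch assumes every line has the form key, whitespace, value, as in the sample output. In a real job the same comparison would presumably go inside the comparator's `compare` method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper, not part of Hadoop: shows only the ordering logic.
public class KeyOrderSketch {

    // Orders "i,j" keys by j first, then by i (both parsed as integers).
    public static final Comparator<String> BY_SECOND_THEN_FIRST = (k1, k2) -> {
        String[] a = k1.split(",");
        String[] b = k2.split(",");
        int bySecond = Integer.compare(
                Integer.parseInt(a[1].trim()), Integer.parseInt(b[1].trim()));
        if (bySecond != 0) {
            return bySecond;
        }
        return Integer.compare(
                Integer.parseInt(a[0].trim()), Integer.parseInt(b[0].trim()));
    };

    // Returns a sorted copy of lines shaped like "i,j<whitespace>value".
    // Assumption: the key is the first whitespace-separated token of each line.
    public static List<String> sortLines(List<String> lines) {
        List<String> sorted = new ArrayList<>(lines);
        sorted.sort(Comparator.comparing(
                (String line) -> line.trim().split("\\s+")[0],
                BY_SECOND_THEN_FIRST));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "0,0    2.5", "0,1    3.0", "1,0    4.0", "1,1    1.5");
        for (String line : sortLines(lines)) {
            System.out.println(line);
        }
    }
}
```

Parsing the elements as integers rather than comparing them as strings keeps a key such as "0,10" after "0,9".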

I hope it helps.

Dmitry Zaytsev
    The comparator set with `job.setSortComparatorClass(PairComparator.class)` is applied after the map phase and before the reduce phase, right? I don't want to do this, as I have only one value per reducer and the program therefore gets stuck after mapping. What I want is to sort after the reduction is complete. – Punit Naik Jun 26 '15 at 05:52