I have the following Reducer class (part of a MapReduce job) that's supposed to compute a score = POS / (-1 * sum(NEGs)),

where POS is a single positive number and NEGs are two negative numbers. The input always follows this pattern.

For example, if the input from the mapper is:

<A, A>  -15.0
<A, A>  2.0
<A, A>  -15.0

The expected output would be:

<A, A>  0.06666666666666667

However, it's outputting Infinity for every output record!

<A, A>  Infinity

While debugging, if I add a statement to emit the values inside the while loop:

score.set(val);
context.write(key, score);

it prints the values fine but repeats the division, so I get the following:

<A, A>  -15.0
<A, A>  2.0
<A, A>  -15.0
<A, A>  0.06666666666666667   # correct calculation (2/30)
<A, A>  0.0022222222222222222 # Not sure why it divides by 30 twice (2/30/30)!!

This is my MyReducer class:

private static class MyReducer extends
        Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
    private DoubleWritable score = new DoubleWritable();
    int counter = 0;

    @Override
    public void reduce(Pair key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        Iterator<DoubleWritable> iter = values.iterator();
        double nor = 0.0;
        double don = 0.0;

        double val;
        while (iter.hasNext()) {
            val = iter.next().get();
            if (val < 0)
                don += val*-1;
            else
                nor = val;
            //uncomment for debugging!
            //score.set(val);
            //context.write(key, score);
        }

        score.set(nor / don);
        context.write(key, score);
    }
}

Can anyone explain why it:

  • emits Infinity when I don't emit anything inside the while loop
  • divides by the denominator twice?

Thanks!

sareem

1 Answer

Doubles acting funny in Java is far from rare, of course, but in this particular case the problem isn't the quirks of doubles themselves, nor how well they get along with Hadoop's Writable types.
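
As a side note on the Infinity symptom itself: dividing a double by 0.0 in Java doesn't throw an exception, it simply yields Infinity. So whenever the reducer's denominator ends up as 0.0 (i.e. no negative values were accumulated for that key), the score prints as Infinity. A minimal illustration:

public class InfinityDemo {
    public static void main(String[] args) {
        double pos = 2.0;
        double negSum = 0.0; // no negative values were accumulated

        // double division by zero yields Infinity instead of throwing
        System.out.println(pos / negSum); // prints Infinity
    }
}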

First and foremost, this kind of reduce computation should only run in the Reduce stage of the job, never in the Combine stage (if any). If you have also registered this reducer as a combiner, consider removing that setting. This isn't so much a rule of thumb as a recurring source of MapReduce bugs: when reducers receive strange data or a computation is executed twice in a row (exactly what you describe), a combiner reusing non-idempotent reducer logic is often to blame.
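
For instance, if your driver contains something like the following (a hypothetical excerpt; MyMapper is a placeholder name), the reduce logic runs once per map task as a combiner and then again in the reducer, which would explain both the doubled division and the Infinity (the final reducer would receive a single, already-divided positive value and never accumulate a denominator):

// hypothetical driver excerpt
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

// If this line is present, MyReducer also runs as a combiner on each
// mapper's output; remove it so the division only happens in the reducer.
job.setCombinerClass(MyReducer.class);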

However, another possible culprit is the division itself: to get a safe double-typed result, you want to make sure the division is carried out in double arithmetic, using explicit type casting where needed.
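
As a general Java illustration (not specific to Hadoop): when both operands are integers, / performs integer division and truncates the result, whereas casting one operand to double promotes the whole division:

public class DivisionDemo {
    public static void main(String[] args) {
        int pos = 2;
        int negSum = -30;

        // integer division truncates: 2 / 30 == 0
        System.out.println(pos / (-1 * negSum));          // prints 0

        // casting one operand promotes the division to double arithmetic
        System.out.println((double) pos / (-1 * negSum)); // prints 0.06666666666666667
    }
}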

To showcase this, I used example input based on your data, stored in an input directory in HDFS. Every unique key has one positive and two negative numbers as values (here the keys are plain Strings for the sake of simplicity), as shown below:

A -15.0
A 2.0
A -15.0
B -10.0
B 9.0
B -12.0
C -7.0
C 1.0
C -19.0
D -5.0
D 18.0
D -5.0
E -6.0
E 6.0
E -6.0

Explicit type casting was then used for the calculation of each score, as you can see in the code below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ScoreComp 
{
    /* input:  <Character, Number>
     * output: <Character, Number>
     */
    public static class Map extends Mapper<Object, Text, Text, DoubleWritable> 
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {
            String record = value.toString();
            String[] parts = record.split(" "); // just split the lines into key and value

            // create key-value pairs from each line
            context.write(new Text(parts[0]), new DoubleWritable(Double.parseDouble(parts[1])));
        }
    }

    /* input:  <Character, Number>
     * output: <Character, Score>
     */
    public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
    {
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException 
        {
            double pos = 0.0;
            double neg = 0.0;

            // for every value of a unique key...
            for(DoubleWritable value : values)
            {
                // retrieve the positive number and calculate the sum of the two negative numbers
                if(value.get() < 0)
                    neg += value.get();
                else
                    pos = value.get();
            }

            // calculate the score based on the values of each key (using explicit type casting)
            double result = (double) pos / (-1 * neg);

            // create key-value pairs for each key with its score
            context.write(key, new DoubleWritable(result));
        }
    }


    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("input");
        Path output_dir = new Path("scores");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job scorecomp_job = Job.getInstance(conf, "Score Computation");
        scorecomp_job.setJarByClass(ScoreComp.class);
        scorecomp_job.setMapperClass(Map.class);
        scorecomp_job.setReducerClass(Reduce.class);    
        scorecomp_job.setMapOutputKeyClass(Text.class);
        scorecomp_job.setMapOutputValueClass(DoubleWritable.class);
        scorecomp_job.setOutputKeyClass(Text.class);
        scorecomp_job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(scorecomp_job, input_dir);
        FileOutputFormat.setOutputPath(scorecomp_job, output_dir);
        scorecomp_job.waitForCompletion(true);
    }
}

And you can see that the results of the MapReduce job in the scores directory make sense math-wise (screenshot taken through the HDFS file browser).
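
For reference, working the arithmetic out by hand from the input above, the scores should come out to roughly:

A   2.0 / 30.0 ≈ 0.0667
B   9.0 / 22.0 ≈ 0.4091
C   1.0 / 26.0 ≈ 0.0385
D   18.0 / 10.0 = 1.8
E   6.0 / 12.0 = 0.5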

Coursal