
I am trying to find the median in Hadoop. The job is failing with the below error:

16/03/02 02:46:13 INFO mapreduce.Job: Task Id : attempt_1456904182817_0001_r_000412_0, Status : FAILED
Error: Java heap space

I went through a lot of posts addressing similar problems, but nothing worked. I also took help from this post:

Error: Java heap space

I tried the following possible solutions:

  1. Increased the Java heap size as suggested in the above post (a sketch of this change follows the list).
  2. Increased the size of the containers by changing the below property:

    yarn.scheduler.minimum-allocation-mb to 1024 in yarn-site.xml

  3. Increased the number of reducers to a bigger value like this:

    job.setNumReduceTasks(1000);
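
For reference, the heap change in step 1 looked roughly like this in the driver (the numbers are placeholders, not the exact values I tried):

    // needs org.apache.hadoop.conf.Configuration, as imported in the code below
    Configuration conf = new Configuration();
    // Placeholder sizes: ask YARN for a bigger reduce container and give the
    // reducer JVM most of it as heap.
    conf.set("mapreduce.reduce.memory.mb", "4096");
    conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");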

But none of the above worked for me, hence this post. I know median is not a well-suited job for Hadoop, but can anyone suggest anything that might help?

    java version "1.8.0_60"
    Hadoop version is 2.x

I have a 10-node cluster with 8 GB RAM and an 80 GB hard disk on each node.

Here's the entire code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.rank.Median;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;



public class median_all_keys {


    //Mapper
    public static class map1 extends Mapper<LongWritable,Text,Text,DoubleWritable>{
        public void map(LongWritable key, Text value, Context context)
                throws IOException,InterruptedException{
        String[] line= value.toString().split(",");
        double col1=Double.parseDouble(line[6]);
        double col2=Double.parseDouble(line[7]);
        context.write(new Text("Key0"+"_"+line[0]+"_"+"Index:6"), new DoubleWritable(col1));
        context.write(new Text("Key0"+"_"+line[0]+"_"+"Index:7"), new DoubleWritable(col2));
        context.write(new Text("Key1"+"_"+line[1]+"_"+"Index:6"), new DoubleWritable(col1));
        context.write(new Text("Key1"+"_"+line[1]+"_"+"Index:7"), new DoubleWritable(col2));
        context.write(new Text("Key2"+"_"+line[2]+"_"+"Index:6"), new DoubleWritable(col1));
        context.write(new Text("Key2"+"_"+line[2]+"_"+"Index:7"), new DoubleWritable(col2));
        context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+"_"+"Index:6"), new DoubleWritable(col1));
        context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+"_"+"Index:7"), new DoubleWritable(col2));
        context.write(new Text("Key1"+"_"+line[1] +","+"key2"+"_"+ line[2]+"_"+"Index:6"), new DoubleWritable(col1));
        context.write(new Text("Key1"+"_"+line[1] +","+"key2"+"_"+ line[2]+"_"+"Index:7"), new DoubleWritable(col2));
        context.write(new Text("Key0"+"_"+line[0] +","+"key2"+"_"+ line[2]+"_"+"Index:6"), new DoubleWritable(col1));
        context.write(new Text("Key0"+"_"+line[0] +","+"key2"+"_"+ line[2]+"_"+"Index:7"), new DoubleWritable(col2));
        context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+","+"key2"+"_"+line[2]+"_"+"Index:6"),new DoubleWritable(col1));
        context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+","+"key2"+"_"+line[2]+"_"+"Index:7"),new DoubleWritable(col2));         
    }
}

//Reducer
    public static class sum_reduce extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
    //  HashMap<String,List<Float>> median_map = new HashMap<String,List<Float>>(); 
        @SuppressWarnings({ "unchecked", "rawtypes" })
        public void reduce(Text key,Iterable<DoubleWritable> value, Context context)
        throws IOException,InterruptedException{
            List<Double> values = new ArrayList<>();
            for (DoubleWritable val: value){
                values.add(val.get());
                }
            double res = calculate(values);
            context.write(key, new DoubleWritable(res));


        }

        public static double calculate(List<Double> values) {
              DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics();
              for (Double value : values) {
               descriptiveStatistics.addValue(value);
              }
              return descriptiveStatistics.getPercentile(50);
             }
    }



    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        Job job = new Job(conf,"Sum for all keys");
        //Driver
        job.setJarByClass(median_all_keys.class);
        //Mapper
        job.setMapperClass(map1.class);
        //Reducer
        job.setReducerClass(sum_reduce.class);
        //job.setCombinerClass(TestCombiner.class);
        //Output key class for Mapper
        job.setMapOutputKeyClass(Text.class);
        //Output value class for Mapper
        job.setMapOutputValueClass(DoubleWritable.class);
        //Output key class for Reducer
        job.setOutputKeyClass(Text.class);
        job.setNumReduceTasks(1000);
        //Output value class from Reducer
        job.setOutputValueClass(DoubleWritable.class);
        //Input Format class
        job.setInputFormatClass(TextInputFormat.class);
        //Final Output Format class
        job.setOutputFormatClass(TextOutputFormat.class);
        //Path variable
        Path path = new Path(args[1]);
        //input/output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job, new Path(args[1]));

         path.getFileSystem(conf).delete(path);
         //exiting the job
         System.exit(job.waitForCompletion(true) ? 0 : 1);


    }

}
Mayank Porwal
  • I found no problem with the SPECS of Hardware. If you could post your code, someone can help. Make sure you are not using any Java Collection Objects to store some data in mappers/reducers, may cause Java Heap. – srikanth Mar 02 '16 at 10:12
  • @srikanth I've added the code. – Mayank Porwal Mar 02 '16 at 10:22

2 Answers


Try reusing Writables: create one DoubleWritable class variable and use .set() to set its value, instead of creating a new object every time.

Collecting the values into a list in the reducer is also unnecessary; feed them straight into your DescriptiveStatistics object.
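
A minimal sketch of the reducer with those two changes, adapted from the code in the question (untested); the same reuse idea also applies to the Text and DoubleWritable objects created in the mapper:

    // Uses the same imports as the question's code (Reducer, Text, DoubleWritable,
    // DescriptiveStatistics).
    public static class sum_reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        // One reusable output object instead of a new DoubleWritable per key.
        private final DoubleWritable result = new DoubleWritable();

        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Feed values straight into DescriptiveStatistics; no intermediate List<Double>.
            DescriptiveStatistics stats = new DescriptiveStatistics();
            for (DoubleWritable val : values) {
                stats.addValue(val.get());
            }
            result.set(stats.getPercentile(50));
            context.write(key, result);
        }
    }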

fi11er

Check memory settings for YARN, Map and Reduce tasks as per this article.


Set the memory parameters according to your input data set size; an illustrative driver-side example follows the parameter lists below.

Key parameters:

YARN

yarn.scheduler.minimum-allocation-mb
yarn.scheduler.maximum-allocation-mb
yarn.nodemanager.vmem-pmem-ratio
yarn.nodemanager.resource.memory-mb

Map Memory

mapreduce.map.java.opts
mapreduce.map.memory.mb

Reduce Memory

mapreduce.reduce.java.opts
mapreduce.reduce.memory.mb

Application Master

yarn.app.mapreduce.am.command-opts
yarn.app.mapreduce.am.resource.mb
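
As a rough illustration only (placeholder numbers for 8 GB nodes, not recommendations), the per-job parameters can be set from the driver; the yarn.scheduler.* and yarn.nodemanager.* properties are cluster-side and belong in yarn-site.xml:

    // needs org.apache.hadoop.conf.Configuration
    Configuration conf = new Configuration();
    // Placeholder sizes; a common rule of thumb keeps each -Xmx around
    // 75-80% of its container's memory.mb.
    conf.set("mapreduce.map.memory.mb", "2048");
    conf.set("mapreduce.map.java.opts", "-Xmx1638m");
    conf.set("mapreduce.reduce.memory.mb", "4096");
    conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");
    conf.set("yarn.app.mapreduce.am.resource.mb", "2048");
    conf.set("yarn.app.mapreduce.am.command-opts", "-Xmx1638m");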

Have a look at related SE question:

What is the relation between 'mapreduce.map.memory.mb' and 'mapred.map.child.java.opts' in Apache Hadoop YARN?

Ravindra babu