I am trying to find the median in Hadoop. The job is failing with the error below:
16/03/02 02:46:13 INFO mapreduce.Job: Task Id : attempt_1456904182817_0001_r_000412_0, Status : FAILED
Error: Java heap space
I went through a lot of posts addressing similar problems and took help from the answers there, but nothing worked. I tried the possible solutions below:
- Increased the Java heap size, as suggested in those posts (a driver-side sketch of the settings I tried is shown right after this list).
- Increased the container size by changing yarn.scheduler.minimum-allocation-mb to 1024 in yarn-site.xml.
- Increased the number of reducers to a larger value: job.setNumReduceTasks(1000);
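For reference, here is roughly how I tried to give the reducers more memory from the driver (just a sketch of what I experimented with; the exact values are only examples, and the same properties can also be set in mapred-site.xml):

Configuration conf = new Configuration();
// Request bigger reduce containers from YARN and raise the reducer JVM heap.
// mapreduce.reduce.memory.mb is the container size asked of YARN;
// mapreduce.reduce.java.opts is the heap actually given to the reduce task's JVM
// (kept somewhat below the container size).
conf.set("mapreduce.reduce.memory.mb", "2048");
conf.set("mapreduce.reduce.java.opts", "-Xmx1638m");
// Map-side equivalents, in case the map tasks are also tight on memory.
conf.set("mapreduce.map.memory.mb", "1024");
conf.set("mapreduce.map.java.opts", "-Xmx819m");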
But none of the above worked for me, hence I am posting this. I know computing a median is not a job that suits Hadoop well, but can anyone provide any solutions that might help?
java version "1.8.0_60"
Hadoop version is 2.x
I have a 10-node cluster with 8 GB of RAM and an 80 GB hard disk on each node.
Here's the entire code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class median_all_keys {
// Mapper: for every input record, emit the values in columns 6 and 7 under composite keys built from columns 0, 1 and 2 (individually and in combination)
public static class map1 extends Mapper<LongWritable, Text, Text, DoubleWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
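// Each input line is CSV; parse the two numeric columns of interest (indexes 6 and 7)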
String[] line= value.toString().split(",");
double col1=Double.parseDouble(line[6]);
double col2=Double.parseDouble(line[7]);
context.write(new Text("Key0"+"_"+line[0]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key1"+"_"+line[1]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key1"+"_"+line[1]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key2"+"_"+line[2]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key2"+"_"+line[2]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key1"+"_"+line[1] +","+"key2"+"_"+ line[2]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key1"+"_"+line[1] +","+"key2"+"_"+ line[2]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key0"+"_"+line[0] +","+"key2"+"_"+ line[2]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0] +","+"key2"+"_"+ line[2]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+","+"key2"+"_"+line[2]+"_"+"Index:6"),new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+","+"key2"+"_"+line[2]+"_"+"Index:7"),new DoubleWritable(col2));
}
}
// Reducer: buffers every value received for a key and emits the median of that list
public static class sum_reduce extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
// HashMap<String,List<Float>> median_map = new HashMap<String,List<Float>>();
@Override
public void reduce(Text key, Iterable<DoubleWritable> value, Context context)
throws IOException,InterruptedException{
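// Collect every value for this key into an in-memory list so the median can be computed over all of them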
List<Double> values = new ArrayList<>();
for (DoubleWritable val: value){
values.add(val.get());
}
double res = calculate(values);
context.write(key, new DoubleWritable(res));
}
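// Median = 50th percentile of the buffered values, computed with Commons Math DescriptiveStatistics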
public static double calculate(List<Double> values) {
DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics();
for (Double value : values) {
descriptiveStatistics.addValue(value);
}
return descriptiveStatistics.getPercentile(50);
}
}
public static void main(String[] args) throws Exception {
Configuration conf= new Configuration();
Job job = Job.getInstance(conf, "Median for all keys");
//Driver
job.setJarByClass(median_all_keys.class);
//Mapper
job.setMapperClass(map1.class);
//Reducer
job.setReducerClass(sum_reduce.class);
//job.setCombinerClass(TestCombiner.class);
//Output key class for Mapper
job.setMapOutputKeyClass(Text.class);
//Output value class for Mapper
job.setMapOutputValueClass(DoubleWritable.class);
//Output key class for Reducer
job.setOutputKeyClass(Text.class);
job.setNumReduceTasks(1000);
//Output value class from Reducer
job.setOutputValueClass(DoubleWritable.class);
//Input Format class
job.setInputFormatClass(TextInputFormat.class);
//Final Output Format class
job.setOutputFormatClass(TextOutputFormat.class);
//Path variable
Path path = new Path(args[1]);
//input/output path
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
path.getFileSystem(conf).delete(path, true);
//exiting the job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}