I am getting this exception (Spark version 2.0.2):
17/05/22 13:47:30 ERROR Executor: Exception in task 0.3 in stage 28.0 (TID 33)
java.lang.NullPointerException
at com.baesystems.ai.analytics.util.RDDUtil.decideBin(RDDUtil.java:47)
at com.baesystems.ai.analytics.util.RDDUtil.access$400(RDDUtil.java:19)
at com.baesystems.ai.analytics.util.RDDUtil$1.call(RDDUtil.java:129)
at com.baesystems.ai.analytics.util.RDDUtil$1.call(RDDUtil.java:102)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1028)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1765)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/05/22 13:47:30 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
17/05/22 13:47:30 INFO MemoryStore: MemoryStore cleared
Please guide me as to what is going on here. The code works fine in my IDE (IntelliJ); the problem occurs only when I run it on a Spark cluster in standalone mode.
Here is the actual code:
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RDDUtil implements Serializable
{
    /**
     *
     */
    private static final long serialVersionUID = 1914105980520925932L;
    private static final Logger log = LoggerFactory.getLogger(RDDUtil.class);
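    // All of the histograms live in static, mutable fields and are updated from inside the map function.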
    public static Map<String, java.util.HashMap<String, Integer>> histoMap = new java.util.HashMap<String, java.util.HashMap<String, Integer>>();
    private static HashMap<String, Integer> histogram0;
    private static java.util.HashMap<String, Integer> histogram1;
    private static java.util.HashMap<String, Integer> histogramBase;
    private static int minValue = 0;
    private static int maxValue = 0;
    public static int getMinValue() {
        return minValue;
    }

    public static int getMaxValue() {
        return maxValue;
    }
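    // Registers the bin in the base histogram, zero-fills it in the opposite label's histogram,
    // and increments its count in the histogram for this record's label.
    // The NullPointerException above is reported from inside this method (RDDUtil.java:47).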
    private static void decideBin(Double label, Double bin)
    {
        int vCount = 0;
        log.error("this value of bin is {} and the label is {}", bin, label);
        histogramBase.put(bin.toString(), 0);
        if (label == 0.0) {
            assignZero(histogram1, bin.toString());
            if (!checkIfPresent(histogram0, bin.toString())) {
                vCount++;
                histogram0.put(bin.toString(), vCount);
            }
        }
        else {
            assignZero(histogram0, bin.toString());
            if (!checkIfPresent(histogram1, bin.toString())) {
                vCount++;
                histogram1.put(bin.toString(), vCount);
            }
        }
    }
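    // Returns true if the bin already exists in the given histogram, incrementing its count as a side effect.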
    private static boolean checkIfPresent(java.util.HashMap<String, Integer> histogram, String bin)
    {
        if (histogram.containsKey(bin)) {
            int value = histogram.get(bin);
            value++;
            histogram.put(bin, value);
            return true;
        }
        else return false;
    }
    private static void assignZero(java.util.HashMap<String, Integer> histogram, String bin)
    {
        if (!histogram.containsKey(bin)) {
            histogram.put(bin, 0);
        }
    }
    private static void calculateMin(Double bin)
    {
        int tempValue = bin.intValue();
        if (minValue > tempValue) minValue = tempValue;
    }

    private static void calculateMax(Double bin)
    {
        int tempValue = bin.intValue();
        if (tempValue > maxValue) maxValue = tempValue;
    }
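    // Maps every LabeledPoint to the shared histogram map; the histograms themselves are
    // updated as a side effect inside the map function, which runs on the executors.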
    private static JavaRDD<Map<String, HashMap<String, Integer>>> getJavaRDDMap(RDD<LabeledPoint> rdd, int i)
    {
        long val = rdd.count();
        return rdd.toJavaRDD().map(new Function<LabeledPoint, Map<String, HashMap<String, Integer>>>() {
            private static final long serialVersionUID = -7619643561853615982L;

            @Override
            public Map<String, HashMap<String, Integer>> call(LabeledPoint p) {
                Double label = (Double) p.productElement(0);
                Vector v = (Vector) p.productElement(1);
                p.features();
                Double bin = 0.0;
                int vSize = p.features().size();
                if (i < vSize) {
                    bin = v.apply(i);
                }
                else {
                    bin = v.apply(vSize - 1);
                }
                minValue = bin.intValue();
                calculateMin(bin);
                calculateMax(bin);
                log.error("this value of bin is {} and the iteration is {}", bin, i);
                decideBin(label, bin);
                histoMap.put("0.0", histogram0);
                histoMap.put("1.0", histogram1);
                histoMap.put("@@@BASE@@@", histogramBase);
                return histoMap;
            }
        });
    }
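    // Entry point: re-initialises the static histograms and builds the mapped RDD;
    // the map function is only evaluated when an action such as count() runs on the result.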
    public static JavaRDD<Map<String, HashMap<String, Integer>>> computeHistogram(RDD<LabeledPoint> Data, int i)
    {
        histogram0 = new java.util.HashMap<String, Integer>();
        histogram1 = new java.util.HashMap<String, Integer>();
        histogramBase = new java.util.HashMap<String, Integer>();
        maxValue = 0;
        JavaRDD<Map<String, HashMap<String, Integer>>> jRdd = getJavaRDDMap(Data, i);
        return jRdd;
    }
}
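For reference, the utility is driven from a main program roughly like the sketch below. The SparkConf setup, the LibSVM input path, and the HistogramDriver class name are simplified placeholders rather than my exact driver code, but the count() action at the end is what triggers the map function (and the exception) on the cluster:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import java.util.HashMap;
import java.util.Map;

public class HistogramDriver
{
    public static void main(String[] args)
    {
        // Placeholder configuration; the real job is submitted to the standalone cluster.
        SparkConf conf = new SparkConf().setAppName("HistogramJob");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Placeholder input; the real data is an RDD<LabeledPoint> built elsewhere.
        JavaRDD<LabeledPoint> points = MLUtils
                .loadLibSVMFile(sc.sc(), "data/sample_libsvm_data.txt")
                .toJavaRDD();

        JavaRDD<Map<String, HashMap<String, Integer>>> histos =
                RDDUtil.computeHistogram(points.rdd(), 0);

        // The NullPointerException above is thrown while this action runs on the executors.
        long n = histos.count();
        System.out.println("Processed " + n + " points; min=" + RDDUtil.getMinValue()
                + ", max=" + RDDUtil.getMaxValue());

        sc.stop();
    }
}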