-1

I am getting this exception Spark Version:2.0.2

    17/05/22 13:47:30 ERROR Executor: Exception in task 0.3 in stage 28.0 (TID 33)
java.lang.NullPointerException
    at com.baesystems.ai.analytics.util.RDDUtil.decideBin(RDDUtil.java:47)
    at com.baesystems.ai.analytics.util.RDDUtil.access$400(RDDUtil.java:19)
    at com.baesystems.ai.analytics.util.RDDUtil$1.call(RDDUtil.java:129)
    at com.baesystems.ai.analytics.util.RDDUtil$1.call(RDDUtil.java:102)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1028)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1765)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/05/22 13:47:30 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
17/05/22 13:47:30 INFO MemoryStore: MemoryStore cleared
ver commanded a shutdown

Please guide me as to what is going on here. This code works fine in my IDE(Intellj); This problem occurs only when i try to run this code on Spark Cluster in Standalone mode.


Here is the actual code:

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class RDDUtil implements Serializable
{

    /**
     * 
     */
    private static final long serialVersionUID = 1914105980520925932L;
    private static final Logger log = LoggerFactory.getLogger(RDDUtil.class);

    public static Map<String,  java.util.HashMap<String, Integer>> histoMap = new java.util.HashMap<String,  java.util.HashMap<String, Integer>>();
    private static HashMap<String, Integer> histogram0;
    private static java.util.HashMap<String, Integer> histogram1;
    private static java.util.HashMap<String, Integer> histogramBase;
    private static int minValue=0;
    private static int maxValue=0;

    public static int getMinValue() {
        return minValue;
    }

    public static int getMaxValue() {
        return maxValue;
    }

    private static void decideBin(Double label, Double bin) 
    {
       int vCount=0;
        log.error("this value of bin is {} and the label is {}",bin,label);
        histogramBase.put(bin.toString(), 0);
        if(label==0.0) {
            assignZero(histogram1, bin.toString());
            if(!checkIfPresent(histogram0,bin.toString())) {
                vCount++;
                histogram0.put(bin.toString(),vCount);
            }   
         }
         else {
             assignZero(histogram0, bin.toString());

            if(!checkIfPresent(histogram1,bin.toString())) {
                vCount++;
                histogram1.put(bin.toString(),vCount);
            }
         }
     }

    private static boolean checkIfPresent(java.util.HashMap<String, Integer> histogram, String bin) 
    {

        if(histogram.containsKey(bin)) {

            int value = histogram.get(bin);

            value++;
            histogram.put(bin, value);
            return true;    
         }
        else return false;
    }

    private static void assignZero(java.util.HashMap<String, Integer> histogram, String bin)  
    {
        if(!histogram.containsKey(bin)) {
            histogram.put(bin, 0);
        }
    }
    private static void calculateMin(Double bin)
    {
        int tempValue = bin.intValue();
        if(minValue>tempValue) minValue=tempValue;

    }
    private static void calculateMax(Double bin)
    {
        int tempValue = bin.intValue();
        if(tempValue>maxValue) maxValue=tempValue;

    }

    private static JavaRDD<Map<String, HashMap<String, Integer>>> getJavaRDDMap(RDD<LabeledPoint> rdd,int i)
    {
        long val = rdd.count();

        return rdd.toJavaRDD().map(new Function<LabeledPoint,Map<String, HashMap<String, Integer>>>() {


            private static final long serialVersionUID = -7619643561853615982L;

            @Override
            public Map<String, HashMap<String, Integer>> call(LabeledPoint p) {


                Double label = (Double)p.productElement(0);
                Vector v = (Vector)p.productElement(1);

                p.features();
                Double bin =0.0;
                int vSize =p.features().size();
                if(i<vSize) {
                    bin = v.apply(i);
                }
                else {
                    bin = v.apply(vSize-1);
                }

                minValue=bin.intValue();
                calculateMin(bin);
                calculateMax(bin);

                log.error("this value of bin is {} and the iteration is {}",bin , i);
                decideBin(label,bin);

                histoMap.put("0.0", histogram0);
                histoMap.put("1.0", histogram1);
                histoMap.put("@@@BASE@@@", histogramBase);

                return  histoMap;
            }
        });
    }



    public static JavaRDD<Map<String, HashMap<String, Integer>>> computeHistogram(RDD<LabeledPoint> Data, int i)
    {


        histogram0 = new java.util.HashMap<String, Integer>();
        histogram1 = new java.util.HashMap<String, Integer>();
        histogramBase = new java.util.HashMap<String, Integer>();
        maxValue=0;

        JavaRDD<Map<String, HashMap<String, Integer>>> jRdd = getJavaRDDMap(Data,i);
        return jRdd;
    }

}
Michael Myers
  • 188,989
  • 46
  • 291
  • 292
amina
  • 19
  • 1

1 Answers1

0

Quick suggestion: Try recompiling the jar before submitting to spark.

Can you provide more details as to how you're submitting the spark job?

Comparing the code you've provided vs the stack trace, it seems the line numbers don't line up. For example, RDDUtil line 19 is a comment string ("*/"), 129 is an empty string, and 102 is an "Override". Have you tried recompiling your jar before submitting the spark job? The mismatched line numbers and the fact that your code appears to work in IntelliJ could be because the code running in the Spark cluster is not the same as the code running on your own machine.

I'm not particularly familiar with SO, but I believe there's a way to edit your original post. That option is generally preferred if you're adding information to it (such as your code).

Jia Teoh
  • 1
  • 1