
I am trying to process a query on Presto using JDBC and pass the result set back to Spark to create a temp table from it. My result set is in a List<String>.

I am getting the query in the form of a JSON message from a Kafka producer, so I have created a Kafka consumer in Spark to receive the message and do the further processing.

Below is my main function:

public static void main(String[] args) throws InterruptedException {

    SparkConf conf = new SparkConf();
    conf.setAppName("Wordcount Background");
    conf.setMaster("local");

    //SparkContext sc = SparkContext.getOrCreate(conf);
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
    JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5));
    SQLContext sqc = new SQLContext(sc);

    Set<String> topics = Collections.singleton("TestTopic");
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "172.20.3.189:9092");

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
            String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    directKafkaStream.foreachRDD(rdd -> {
        //System.out.println("--- New RDD with " + rdd.partitions().size()
        //        + " partitions and " + rdd.count() + " records");
        rdd.foreach(record -> {

            SparkkafkaJson sk = new SparkkafkaJson();

            Dataset<String> dfrdd = spark.createDataset(sk.process_query(record._2), Encoders.STRING());
            System.out.print(dfrdd);
            //Dataset<Row> df = spark.read().json(dfrdd);
            //df.show();

        });
    });

    ssc.start();
    ssc.awaitTermination();
}

Following is the process_query method, which returns the result set to the main function:

public List<String> process_query(String queryjson) {
    String resstr = "";
    String columnValue = "";
    List<String> jsonList = new ArrayList<>();
    //List<String> list = new ArrayList<String>();
    try {
        Class.forName(JDBC_DRIVER);
        //Open a connection
        conn = DriverManager.getConnection(DB_URL, USER, PASS);
        //Execute a query
        stmt = conn.createStatement();
        String sql = process_json(queryjson);
        ResultSet res = stmt.executeQuery(sql);
        ResultSetMetaData rsmd = res.getMetaData();
        int columnsNumber = rsmd.getColumnCount();
        //Extract data from result set
        while (res.next()) {
            //System.out.println(res.getString(""));
            Gson userGson = new GsonBuilder().create();
            JsonObject params = new JsonObject();
            for (int i = 1; i <= columnsNumber; i++) {
                String ColName = rsmd.getColumnName(i);
                String ColVal = res.getString(i);
                params.addProperty(ColName, ColVal);
            }
            resstr = userGson.toJson(params);
            jsonList.add(resstr);
        }
        //Clean-up environment
        res.close();
        stmt.close();
        conn.close();
    } catch (SQLException se) {
        //Handle errors for JDBC
        se.printStackTrace();
    } catch (Exception e) {
        //Handle errors for Class.forName
        e.printStackTrace();
    } finally {
        //finally block used to close resources
        try {
            if (stmt != null) stmt.close();
        } catch (SQLException sqlException) {
            sqlException.printStackTrace();
        }
        try {
            if (conn != null) conn.close();
        } catch (SQLException se) {
            se.printStackTrace();
        }
    }
    return jsonList;
}
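
For reference, the same JDBC-to-JSON conversion can be written with try-with-resources so the connection, statement, and result set are closed automatically. This is only a sketch, assuming the same JDBC_DRIVER, DB_URL, USER, PASS constants and the process_json helper used above:

    // Sketch only: same row-to-JSON loop, with try-with-resources handling cleanup.
    // JDBC_DRIVER, DB_URL, USER, PASS and process_json(...) are assumed from the class above.
    public List<String> processQueryAutoClose(String queryjson) throws Exception {
        List<String> jsonList = new ArrayList<>();
        Class.forName(JDBC_DRIVER);
        String sql = process_json(queryjson);
        Gson gson = new GsonBuilder().create();
        try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
             Statement stmt = conn.createStatement();
             ResultSet res = stmt.executeQuery(sql)) {
            ResultSetMetaData rsmd = res.getMetaData();
            int columnsNumber = rsmd.getColumnCount();
            while (res.next()) {
                JsonObject row = new JsonObject();
                for (int i = 1; i <= columnsNumber; i++) {
                    row.addProperty(rsmd.getColumnName(i), res.getString(i));
                }
                jsonList.add(gson.toJson(row));
            }
        }
        return jsonList;
    }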

But I still get this error output:

    2019-05-30 13:17:41 INFO  ContextCleaner:54 - Cleaned accumulator 42
    2019-05-30 13:17:41 INFO  ContextCleaner:54 - Cleaned accumulator 109
    2019-05-30 13:17:43 INFO  CodeGenerator:54 - Code generated in 216.222798 ms
    2019-05-30 13:17:43 ERROR Executor:91 - Exception in task 1.0 in stage 9.0 (TID 19)
    java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:183)
        at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:474)
        at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:511)
        at SparkkafkaJson.SparkkafkaJson.lambda$1(SparkkafkaJson.java:213)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:927)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    2019-05-30 13:17:43 WARN  TaskSetManager:66 - Lost task 1.0 in stage 9.0 (TID 19, localhost, executor driver): java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:183)
        at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:474)
        at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:511)
        at SparkkafkaJson.SparkkafkaJson.lambda$1(SparkkafkaJson.java:213)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)

Please help

  • Possible duplicate of [What is a NullPointerException, and how do I fix it?](https://stackoverflow.com/questions/218384/what-is-a-nullpointerexception-and-how-do-i-fix-it) – Samuel Philipp May 30 '19 at 13:42
  • Nope, my question is different as it is specific to the Spark Java stream and not normal Java code; it's more of a logical issue than a syntax-related one. – Niheel Thakkar May 30 '19 at 13:54

1 Answer


I fixed the above issue as follows:

    directKafkaStream.foreachRDD(rdd -> {
        //System.out.println("--- New RDD with " + rdd.partitions().size()
        //        + " partitions and " + rdd.count() + " records");
        rdd.collect().forEach(record -> {
            List<String> jsonList2 = new ArrayList<>();
            //System.out.print(sk.process_query(record._2, sk, spark));
            jsonList2 = sk.process_query(record._2, sk, spark);

            if (jsonList2.size() > 0) {
                //System.out.print("came here");
                sk.jsonList3 = jsonList2;
                String JsonStr = jsonList2.toString();
                //System.out.print(JsonStr);
                System.out.print("Sending Data to API");
                System.out.print(" ");
                try {
                    sk.Send_query_data_SCDF(JsonStr);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.print("came into else");
                sk.jsonList3 = new ArrayList<String>();
            }

        });

I was trying to create a Dataset inside rdd.foreach, which is why it was giving a NullPointerException: Dataset creation needs to be done on the driver, not on the executors. So I collected the RDD result and used it outside the foreach to keep it on the driver:

        Dataset<String> dfrdd = spark.createDataset(sk.jsonList3, Encoders.STRING());

        Dataset<Row> wordsDataFrame = spark.read().json(dfrdd);
        wordsDataFrame.createOrReplaceTempView("words");

        Dataset<Row> wordCountsDataFrame = spark.sql("select * from words limit 10");
        wordCountsDataFrame.show();

        String jsonToReturn = wordCountsDataFrame.toJSON().collectAsList().toString();
        System.out.print(jsonToReturn);

        sk.jsonList3 = new ArrayList<String>();
        rdd.unpersist();

    });
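
In short, the pattern that avoids the NullPointerException is: bring the micro-batch to the driver (for example with rdd.collect()), build the List<String> there, and only then call spark.createDataset. A minimal sketch of that shape, assuming the question's original single-argument process_query(String) returning List<String>, the SparkkafkaJson helper sk, and the same SparkSession spark:

    directKafkaStream.foreachRDD(rdd -> {
        // The body of foreachRDD runs on the driver, so the SparkSession is usable here.
        List<String> jsonRows = new ArrayList<>();
        for (Tuple2<String, String> record : rdd.collect()) {      // collect() pulls the batch to the driver
            jsonRows.addAll(sk.process_query(record._2));           // JDBC work per Kafka message, as in the question
        }
        if (!jsonRows.isEmpty()) {
            Dataset<String> jsonDs = spark.createDataset(jsonRows, Encoders.STRING());
            Dataset<Row> df = spark.read().json(jsonDs);            // parse the JSON strings into columns
            df.createOrReplaceTempView("words");                    // the temp table the question asked for
            spark.sql("select * from words limit 10").show();
        }
    });

Note that rdd.collect() materializes the whole micro-batch on the driver, which is fine for small query results but will not scale to large ones.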