
I'm writing a Spark job that downloads files from S3 that contain JSON lines, turns those into a Dataset, and writes them to a Parquet file.

javaSparkContext.parallelize(files)
            .foreach((VoidFunction<String>) file -> {
                try {
                    List<String> jsons = Lists.newArrayList();
                    ... 
                    // the NPE below is thrown on the next line
                    Dataset<String> eventsDS = spark.createDataset(jsons, Encoders.STRING());
                    Dataset<Row> eventsDF = spark.read().json(eventsDS);
                    eventsDF.write().parquet("parquet/");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            });

The error at runtime when running on my laptop:

java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
    at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:457)
    at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:494)
    at com.amazon.mobiletargeting.ParquetExporter.lambda$export$faf5744a$1(ParquetExporter.java:166)

My question was downvoted under the pretext that it's a duplicate of "Why does this Spark code make NullPointerException?"

I added the comment below saying that my Spark Datasets aren't being created in the context of UDFs but inside a foreach on an RDD.
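For comparison, the same flow with the Dataset created on the driver never touches the SparkSession inside an executor-side lambda. A minimal sketch (the s3a://my-bucket/events/ path is hypothetical, and it assumes Spark can read the JSON-lines files from S3 directly):

SparkSession spark = SparkSession.builder()
        .appName("ParquetExporter")
        .getOrCreate();

// runs on the driver; the executors only process partitions of the data
Dataset<Row> eventsDF = spark.read().json("s3a://my-bucket/events/*.json");
eventsDF.write().parquet("parquet/");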

Any advice is highly appreciated! Ranjit

  • Not seeing how they are related. The post you linked talks about not using DataFrames inside a UDF, and I don't define a UDF - just a basic foreach on an RDD. Unless I missed a key concept that relates UDFs and foreach, I think my question is unrelated. – Ranjit Iyer Jan 28 '18 at 16:28
  • Yeah, not sure why this was marked as a duplicate. Did you ever figure out what the issue was? – Michael Feb 19 '18 at 23:52
  • For anyone else landing here looking for an answer: this is how I finally got it to work, after pulling various things from the example code distributed with the Spark download ("examples/src/main/java/org/apache/spark/examples/sql/"): messages.foreachRDD(rdd -> { SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); List jsonData = rdd.map(row -> row.value()).collect(); /* do your thing */ }) – Michael Feb 21 '18 at 04:49
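Spelled out, the pattern from that last comment looks roughly like this (a sketch, not a verified answer: messages is assumed to be a Kafka-style JavaDStream whose records expose a value() accessor, JavaSparkSessionSingleton is the lazy-singleton helper from Spark's streaming examples, and Encoders/SaveMode come from org.apache.spark.sql):

// foreachRDD runs its body on the driver, so a SparkSession is safe to use here,
// unlike foreach on an RDD, whose body runs on the executors
messages.foreachRDD(rdd -> {
    SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    // collect the raw JSON strings back to the driver
    List<String> jsonData = rdd.map(record -> record.value()).collect();
    Dataset<Row> eventsDF = spark.read().json(spark.createDataset(jsonData, Encoders.STRING()));
    eventsDF.write().mode(SaveMode.Append).parquet("parquet/");
});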
