
Objective: I am trying to run a query on a Parquet file (read from S3) and then write the result out as a single tab-delimited text file in another S3 bucket. All of this is done in a Spark app running on an EMR cluster on Amazon.

I have read the other similar questions on StackOverflow but to no avail.

//Imports used (for context)
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

//Read the Parquet file
final Dataset<Row> df = spark.read().parquet(fileName);
//Register a temp table for convenience
df.registerTempTable(tableName);

//Valid SQL query string - verified using Databricks on the same Parquet file
final String queryString = ...; //SQL string omitted

//Result of running the SQL query on the Parquet file
final Dataset<Row> result = spark.sql(queryString);

//Check if result is null - result is NOT NULL

final JavaRDD<Row> rowJavaRDD = result.toJavaRDD();

//Check if rowJavaRDD is null - rowJavaRDD is NOT NULL

//Coalesce and write to a text file
rowJavaRDD.map(r -> r.mkString("\t")).coalesce(1).saveAsTextFile(savePath);

But I am getting an NPE on the line rowJavaRDD.map(r -> r.mkString("\t")).coalesce(1).saveAsTextFile(savePath).
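As an aside, I assume the same tab-delimited output could also be produced without dropping to the RDD API at all, via the DataFrame CSV writer with a tab separator. A sketch (I haven't verified this avoids the error, and I'm assuming the "sep" option is honored by the 2.0 CSV writer):

//Write the query result directly as a single tab-separated text file,
//skipping the toJavaRDD()/map step entirely.
result.coalesce(1).write().option("sep", "\t").csv(savePath);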

I have tried using broadcast as well, but that didn't help. I also tried without the coalesce, and I still get the same error.

I have checked that savePath is valid and that there are no S3 permission issues.

I tried doing the same locally in spark-shell using Scala on the same Parquet file, and it worked fine :/

I am running Spark 2.0.2 on the EMR cluster. (Version 2.1 gives a ClassCastException on something else, so upgrading is an even bigger hassle.)

Any help would be appreciated.

StackTrace is as follows:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 38.0 failed 4 times, most recent failure: Lost task
7.3 in stage 38.0 (TID 13586, ip-10-30-1-150.ec2.internal): java.lang.NullPointerException
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:231)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1203)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1219)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1064)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1030)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:956)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:955)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1459)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1438)
        at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:549)
        at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)

Related links

1) NPE saveAsTextFile

2) RDD as textfile

Asad Awadia

2 Answers


As mentioned in the first related link you provided yourself (NPE saveAsTextFile), the fact that the exception surfaces when you call saveAsTextFile() doesn't mean the problem has anything to do with that line: all of the transformations you did up to that point are lazily executed, and the save is simply the first action that forces them to run, so the failure can originate anywhere upstream. Your stack trace in fact points at the Parquet scan (OnHeapColumnVector.getInt), not at the save.
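You can confirm that by forcing an action right after the query; a minimal sketch, reusing spark and queryString from your question:

//Force materialization immediately after the query; if this throws the
//same NPE, then toJavaRDD()/coalesce/saveAsTextFile are not the culprit.
final Dataset<Row> result = spark.sql(queryString);
result.count();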

A quick Google search of "OnHeapColumnVector NullPointerException" turned up SPARK-16518 for me. It looks like this has been an issue ever since 2.0.0 and is still unresolved.
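Since the NPE originates in the vectorized reader's OnHeapColumnVector, one thing worth trying is falling back to the row-based Parquet reader. A sketch (the config key exists in 2.0.x, but I can't promise it dodges your particular NPE):

//Disable the vectorized Parquet reader before reading the file; Spark then
//falls back to the slower row-based reader that bypasses OnHeapColumnVector.
//The same setting can be passed on spark-submit via
//--conf spark.sql.parquet.enableVectorizedReader=false
spark.conf().set("spark.sql.parquet.enableVectorizedReader", "false");
final Dataset<Row> df = spark.read().parquet(fileName);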

Jonathan Kelly
  • I use that same command on other Parquet files as well and it works fine. It's just on this file that I get the NPE. I downloaded the troublesome Parquet file locally and tried the same command, and it worked fine :/ – Asad Awadia Feb 07 '17 at 22:49

It looks like org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt hit a null column value (due to a bug). Check whether there is an int32 field in the schema of your Parquet file. See another version of this issue here: https://issues.apache.org/jira/browse/HIVE-14294

Also, please share the schema of the Parquet file.
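You can get the schema straight from Spark; a minimal sketch using the same fileName as in your question:

//Print the Parquet schema as Spark reads it, to check for int32 columns.
spark.read().parquet(fileName).printSchema();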

xor007