We're trying to submit a spark job (spark 2.0, hadoop 2.7.2) but for some reason we're receiving a rather cryptic NPE in EMR. Everything runs just fine as a scala program so we're not really sure what's causing the issue. Here's the stack trace:
18:02:55,271 ERROR Utils:91 - Aborting task java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
As far as we can tell this is occurring in the following method:
def process(dataFrame: DataFrame, S3bucket: String) = {
dataFrame.map(row =>
"text|label"
).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
We've narrowed it down to the map function as this works when submitted as a spark job:
def process(dataFrame: DataFrame, S3bucket: String) = {
dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
Does anyone have any idea what might be causing this issue? Also, how can we resolve it? We're pretty stumped.