
We're trying to submit a Spark job (Spark 2.0, Hadoop 2.7.2), but for some reason we're getting a rather cryptic NullPointerException on EMR. Everything runs fine as a plain Scala program, so we're not sure what's causing the issue. Here's the stack trace:

18:02:55,271 ERROR Utils:91 - Aborting task
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

As far as we can tell this is occurring in the following method:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.map(row =>
      "text|label"
  ).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}

We've narrowed it down to the map function, as this version works when submitted as a Spark job:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}

Does anyone have any idea what might be causing this issue? Also, how can we resolve it? We're pretty stumped.


1 Answer


I think the NullPointerException is thrown by a worker when it tries to access a SparkContext object that exists only on the driver, not on the workers.
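
To illustrate the kind of access I mean, here is a minimal, hypothetical sketch (not the code from the question): a SparkContext held in a @transient field deserializes to null on the executors, so touching it inside a closure blows up with an NPE on the workers.

import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame

// Hypothetical example, not the asker's code: sc is @transient, so it is
// null once the enclosing object is deserialized on an executor.
class Job(@transient val sc: SparkContext) extends Serializable {
  def broken(df: DataFrame): Unit = {
    df.rdd.map { row =>
      sc.appName + "|" + row.mkString(",")  // sc is null on the workers -> NPE
    }.count()
  }
}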

coalesce() repartitions your data. When you request only one partition, it tries to squeeze all of the data into that single partition, which can put a lot of pressure on the memory footprint of your application.

In general, it is not a good idea to shrink your data down to just one partition.

For more, read this: Spark NullPointerException with saveAsTextFile and this.
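
If a single output file is not a hard requirement, a safer sketch (assuming the DataFrame already has a single string column, as text() requires) is to keep a handful of partitions instead of one:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: keep several partitions instead of forcing everything into one.
// The partition count here is just an illustrative value.
def process(dataFrame: DataFrame, s3Bucket: String): Unit = {
  dataFrame
    .coalesce(8)                      // a handful of partitions, not 1
    .write
    .mode(SaveMode.Overwrite)
    .text(s3Bucket)                   // writes one part-* file per partition
}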


gsamaras
  • The reason why we were using coalesce(1) was to write all of the data into a single file instead of many files. Is there any other way to accomplish this? – cscan Aug 17 '16 at 01:55
  • @cscan no. Maybe increasing your memory settings would allow your application to work with one partition, but the error you posted doesn't indicate something like that. Is there a reason why you want them to be in one file? – gsamaras Aug 17 '16 at 01:56
  • This error occurred when we were testing with only five records - I don't think it was related to memory use. – cscan Aug 17 '16 at 02:04
  • Me too @cscan! I also updated my answer. BTW, if you **really want to have only one file**, then my tip would be to run the job, let the output be split into parts, and afterwards merge the parts into a single file. But in general, having a single file means not having a lot of data, so maybe Spark isn't needed after all. – gsamaras Aug 17 '16 at 02:06
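
For completeness, the "merge the parts afterwards" tip from the last comment could look roughly like the following sketch, using Hadoop's FileUtil.copyMerge (available in Hadoop 2.x); the paths here are placeholders, not the asker's actual bucket layout.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Merge the part-* files produced by the job into a single output file.
def mergeParts(conf: Configuration, partsDir: String, mergedFile: String): Unit = {
  val fs = FileSystem.get(new java.net.URI(partsDir), conf)
  FileUtil.copyMerge(
    fs, new Path(partsDir),    // directory containing the part-* files
    fs, new Path(mergedFile),  // single merged output file
    false,                     // do not delete the source parts
    conf,
    null)                      // no separator string between parts
  ()
}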