
I'm doing some operations between two DataFrames:

val resultRdd = df1.join(df2, df1("column") === df2("column") &&
    df1("column2").contains(df2("column")), "left_outer").rdd

resultRdd.map { t => ... }

But I get this error every time:

edited*

Job aborted due to stage failure: Task 114 in stage 40.0 failed 4 times, most recent failure: Lost task 114.3 in stage 40.0 (TID 908, 10.10.10.51, executor 1): java.lang.NullPointerException
    at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The error appears when I try to print the result, e.g. with `resultRdd.collect.foreach(println(_))`.

While researching, I read in other questions that it could be that the executors cannot access the DataFrame, only the driver can: NullPointerException in Spark RDD map when submitted as a spark job

I've tried both `coalesce()` and `collect()`, but neither works for me.

I don't know how to deal with this issue. Any help?

I'm using Spark 2.1.0.

edited2*

After debugging I think I found some clues:

My application runs on a server. If I execute the job once, it works, but if I execute it again, it fails. If I restart the server so that a new SparkContext is created, it works again.
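This behavior suggests that something cached inside the long-lived SparkContext goes stale between executions. For reference, a minimal sketch of clearing cached state between runs without restarting the server (hypothetical; it assumes a SparkSession named `spark` and that a DataFrame such as `df1` was cached earlier):

    // Hypothetical cleanup between executions, so the long-lived
    // SparkContext does not reuse stale cached blocks:
    df1.unpersist()            // drop this DataFrame's cached blocks, if any
    spark.catalog.clearCache() // or drop everything cached in the session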

The error log is:

17/01/25 16:10:40 INFO TaskSetManager: Starting task 86.0 in stage 126.0 (TID 5249, localhost, executor driver, partition 86, ANY, 6678 bytes)
17/01/25 16:10:40 INFO TaskSetManager: Finished task 43.0 in stage 126.0 (TID 5248) in 6 ms on localhost (executor driver) (197/200)
17/01/25 16:10:40 INFO Executor: Running task 86.0 in stage 126.0 (TID 5249)
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 96 non-empty blocks out of 200 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 4 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 ERROR Executor: Exception in task 86.0 in stage 126.0 (TID 5249)
java.lang.NullPointerException
    at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
    at scala.collection.AbstractIterator.to(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/01/25 16:10:40 INFO TaskSetManager: Starting task 114.0 in stage 126.0 (TID 5250, localhost, executor driver, partition 114, ANY, 6678 bytes)
17/01/25 16:10:40 INFO Executor: Running task 114.0 in stage 126.0 (TID 5250)
17/01/25 16:10:40 WARN TaskSetManager: Lost task 86.0 in stage 126.0 (TID 5249, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
    at scala.collection.AbstractIterator.to(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 4 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 ERROR TaskSetManager: Task 86 in stage 126.0 failed 1 times; aborting job
17/01/25 16:10:40 INFO Executor: Finished task 114.0 in stage 126.0 (TID 5250). 3800 bytes result sent to driver
17/01/25 16:10:40 INFO TaskSetManager: Finished task 114.0 in stage 126.0 (TID 5250) in 15 ms on localhost (executor driver) (198/200)
17/01/25 16:10:40 INFO TaskSchedulerImpl: Removed TaskSet 126.0, whose tasks have all completed, from pool 
17/01/25 16:10:40 INFO TaskSchedulerImpl: Cancelling stage 126
17/01/25 16:10:40 INFO DAGScheduler: ResultStage 126 (collect at CategorizationSystem.scala:123) failed in 1.664 s due to Job aborted due to stage failure: Task 86 in stage 126.0 failed 1 times, most recent failure: Lost task 86.0 in stage 126.0 (TID 5249, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
    at scala.collection.AbstractIterator.to(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
17/01/25 16:10:40 INFO DAGScheduler: Job 19 failed: collect at CategorizationSystem.scala:123, took 8.517397 s
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116937
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116938
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116939
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116940
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116941
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116942
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116943
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116944
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116945
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 16
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117330
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117331
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117332
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117333
17/01/25 16:10:41 INFO BlockManagerInfo: Removed broadcast_36_piece0 on 192.168.80.136:35004 in memory (size: 20.6 KB, free: 613.8 MB)
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117334
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117335
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117336
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117337
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117338
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117339
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117340
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 17
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117341
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117342
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117343
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117344
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117345
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 18
17/01/25 16:10:41 INFO BlockManagerInfo: Removed broadcast_39_piece0 on 192.168.80.136:35004 in memory (size: 23.0 KB, free: 613.8 MB)
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102054
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102055
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102056
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102057
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102058
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102059
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102060
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102061
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 11
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102062
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102063
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102064
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102065
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102066
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 12
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 15

Any help?

  • Try `df1.show`, `df2.show` and `resultRdd.show` in order to get some more details about your case – FaigB Jan 20 '17 at 12:52
  • A `NullPointerException` comes when you do an operation on a null value. We need the complete stack trace and a better code snippet to pin down where exactly you are getting the NPE. – Ram Ghadiyaram Jan 20 '17 at 18:03
  • The results of `df1.show` and `df2.show` do not have any null values, as I recently checked. To print resultRdd I have to call `resultRdd.collect.foreach(println(_))`, and I get the same error that I added to the question as edited*. – Manuel Valero Jan 23 '17 at 09:16
  • It could be a data type error. Update the question with the output of `df1.printSchema()` and `df2.printSchema()`. – KiranM Jan 24 '17 at 04:25
  • I printed both schemas and found that df1's "column2" is nullable (true) while df2's is not. It is still failing. – Manuel Valero Jan 24 '17 at 15:36
  • After debugging I edited the question with some clues. The edited part is after "edited2*". – Manuel Valero Jan 25 '17 at 15:16
  • After debugging I found the solution: somewhere in the code there is a `df1.cache`. This causes an NPE after the first successful execution. I don't know why, but removing the `.cache` makes it always work. – Manuel Valero Jan 27 '17 at 12:57

1 Answer


I solved the problem by removing a `df1.cache` call that I had somewhere in the code. I don't know why this solves the problem, but it works.
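Since the comments established that df1's "column2" is nullable while the NPE comes from `UTF8String.contains`, a null guard in the join condition is a defensive variant worth trying as well. This is only a sketch using the placeholder column names from the question, not tested code:

    // Sketch (placeholder column names from the question): guard contains()
    // so the generated code never calls UTF8String.contains on a null value.
    val resultRdd = df1.join(
      df2,
      df1("column") === df2("column") &&
        df1("column2").isNotNull &&
        df1("column2").contains(df2("column")),
      "left_outer"
    ).rdd

If the `df1.cache` has to stay for performance reasons, calling `df1.unpersist()` before re-running the job may also be worth trying.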
