
I have a script that works fine up until this line:

df_3 = df_2.groupBy("id").pivot("key").agg(collect_list("value")).select('col1','col2')

The issue is a java.lang.NullPointerException caused by the pivot. I believe df_2 is around 600K rows with only a few columns, and the pivot works if I add this line just before it:

df_2 = df_2.limit(27000)

but any higher limit brings the NullPointerException back. Why is this happening? I didn't think 600K rows would be a particularly large DataFrame, yet anything above ~27K rows breaks it.

Here is the code leading up to it:

from pyspark.sql.functions import udf, explode, collect_list
from pyspark.sql.types import ArrayType, MapType, StringType

# parse_xml returns a list of {string: string} maps per XML string
parse_xml_udf = udf(parse_xml, ArrayType(MapType(StringType(), StringType())))
parsed_df = xml_df.withColumn('parsed_xml', parse_xml_udf(xml_df['xml_strs']))

# explode the array (one row per map), then the map (adds 'key'/'value' columns)
df_1 = parsed_df.withColumn('exploded_arr', explode('parsed_xml'))
df_2 = df_1.select(explode('exploded_arr'), *df_1.columns)

Full stacktrace:

20/07/07 10:46:08 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, _, executor 2): java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.GenerateExec$$anonfun$doExecute$1$$anonfun$apply$9.apply(GenerateExec.scala:111)
        at org.apache.spark.sql.execution.GenerateExec$$anonfun$doExecute$1$$anonfun$apply$9.apply(GenerateExec.scala:109)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

20/07/07 10:46:08 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 1.0 (TID 4, _, executor 2, partition 0, PROCESS_LOCAL, 19119331 bytes)
20/07/07 10:46:11 INFO scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3) on _, executor 1: java.lang.NullPointerException (null) [duplicate 1]
20/07/07 10:46:11 INFO scheduler.TaskSetManager: Starting task 1.1 in stage 1.0 (TID 5, _,executor 1, partition 1, PROCESS_LOCAL, 19064182 bytes)
20/07/07 10:46:12 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 1.0 (TID 4) on _, executor 2: java.lang.NullPointerException (null) [duplicate 2]
20/07/07 10:46:12 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 1.0 (TID 6, _, executor 2, partition 0, PROCESS_LOCAL, 19119331 bytes)
20/07/07 10:46:15 INFO scheduler.TaskSetManager: Lost task 1.1 in stage 1.0 (TID 5) on _, executor 1: java.lang.NullPointerException (null) [duplicate 3]
20/07/07 10:46:15 INFO scheduler.TaskSetManager: Starting task 1.2 in stage 1.0 (TID 7, _, executor 1, partition 1, PROCESS_LOCAL, 19064182 bytes)
20/07/07 10:46:15 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 1.0 (TID 6) on _, executor 2: java.lang.NullPointerException (null) [duplicate 4]
20/07/07 10:46:15 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 1.0 (TID 8, _, executor 2, partition 0, PROCESS_LOCAL, 19119331 bytes)
20/07/07 10:46:17 INFO scheduler.TaskSetManager: Lost task 1.2 in stage 1.0 (TID 7) on _, executor 1: java.lang.NullPointerException (null) [duplicate 5]
20/07/07 10:46:18 INFO scheduler.TaskSetManager: Starting task 1.3 in stage 1.0 (TID 9, _, executor 1, partition 1, PROCESS_LOCAL, 19064182 bytes)
20/07/07 10:46:18 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 1.0 (TID 8) on _, executor 2: java.lang.NullPointerException (null) [duplicate 6]
20/07/07 10:46:18 ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
20/07/07 10:46:18 INFO cluster.YarnScheduler: Cancelling stage 1
20/07/07 10:46:18 INFO cluster.YarnScheduler: Stage 1 was cancelled
20/07/07 10:46:18 INFO scheduler.DAGScheduler: ShuffleMapStage 1 (pivot at NativeMethodAccessorImpl.java:0) failed in 16.373 s due to Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, _, executor 2): java.lang.NullPointerException

Driver stacktrace:
20/07/07 10:46:18 INFO scheduler.DAGScheduler: Job 1 failed: pivot at NativeMethodAccessorImpl.java:0, took 16.448475 s
20/07/07 10:46:18 WARN spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
Traceback (most recent call last):
  File "_", line 74, in <module>
    df_3 = df_2.groupBy("id").pivot("key").agg(collect_list("value")).select('col1','col2')
  File "_/lib/spark2/python/lib/pyspark.zip/pyspark/sql/group.py", line 192, in pivot
  File "_/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "_/lib/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "_/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o94.pivot.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, _, executor 2): java.lang.NullPointerException

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
        at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:266)
        at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:128)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:218)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:84)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:112)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:235)
        at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:368)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
        at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:90)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
        at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2547)
        at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2544)
        at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:321)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException

1 Answer


This kind of NullPointerException is typically thrown when a worker tries to access a SparkContext object that exists only on the driver, not on the workers. If your UDF (or anything it closes over) references the SparkContext, that reference is not available once the closure runs on an executor.
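
Below is a minimal sketch of that anti-pattern, with hypothetical names (tag_value, the "value" column) and assuming a plain SparkSession. Note that PySpark usually catches this particular case early, on the driver, when it tries to serialize the closure; an equivalent JVM-side closure instead deserializes on the executor with a null SparkContext and dies there with the NullPointerException you see in the stack trace.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# BAD: the closure captures sc, which is a driver-only object
def tag_value(v):
    return sc.applicationId + "-" + str(v)

bad_udf = udf(tag_value, StringType())
df = spark.createDataFrame([("a",), ("b",)], ["value"])
# df.withColumn("tagged", bad_udf("value")).show()  # fails: the SparkContext
#                                                   # cannot be shipped to workers

# OK: compute the driver-side value once, outside the closure
app_id = sc.applicationId
ok_udf = udf(lambda v: app_id + "-" + str(v), StringType())
df.withColumn("tagged", ok_udf("value")).show()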

As for why the limit(27000) version succeeds, my hunch is that the smaller job was run locally on the driver and worked purely by accident. More reading: here and here
