I am trying to save the contents of a DataFrame to HDFS in CSV format. It works with a small number of input files, but with a larger number of files (90+) I get a NullPointerException and the job fails. Below is my code:
import org.apache.spark.sql.functions.{col, udf}
val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "false").option("delimiter", "|").load("hdfs path for loading multiple files/*");
val mydateFunc = udf {(x: String) => x.split("/") match {case Array(month,date,year) => year+"-"+month+"-"+date case Array(y)=> y}}
val df2 = df1.withColumn("orderdate", mydateFunc(df1("Date on which the record was created"))).drop("Date on which the record was created")
val df3 = df2.withColumn("deliverydate", mydateFunc(df2("Requested delivery date"))).drop("Requested delivery date")
val exp = "(.*)(44000\\d{5}|69499\\d{6})(.*)".r
val upc_extractor: (String => String) = (arg: String) => arg match { case exp(pref,required,suffx) => required case x:String => x }
val sqlfunc = udf(upc_extractor)
val df4 = df3.withColumn("formatted_UPC", sqlfunc(col("European Article Numbers/Universal Produ")))
df4.write.format("com.databricks.spark.csv").option("header", "false").save("destination path in hdfs to save the resultant files");
Below is the exception I am getting:
16/02/03 01:59:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/02/03 01:59:33 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
java.lang.NullPointerException
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:71)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:165)
at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:158)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/02/03 01:59:33 INFO TaskSetManager: Starting task 32.0 in stage 1.0 (TID 33, localhost, ANY, 1692 bytes)
16/02/03 01:59:33 INFO Executor: Running task 32.0 in stage 1.0 (TID 33)
16/02/03 01:59:33 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 3, localhost): java.lang.NullPointerException
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:71)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:165)
at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:158)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/02/03 01:59:33 ERROR TaskSetManager: Task 2 in stage 1.0 failed 1 times; aborting job
16/02/03 01:59:33 INFO TaskSchedulerImpl: Cancelling stage 1
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 29.0 in stage 1.0 (TID 30)
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 8.0 in stage 1.0 (TID 9)
16/02/03 01:59:33 INFO TaskSchedulerImpl: Stage 1 was cancelled
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 0.0 in stage 1.0 (TID 1)
The Spark version is 1.4.1. Any help is much appreciated.
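A note on the trace, offered as a guess rather than a confirmed diagnosis: the NullPointerException is raised inside a UDF (ScalaUdf.eval calls $anonfun$1.apply), which is what x.split("/") would do if a cell in one of the date columns were null in some of the extra files. The sketch below is plain Scala with no Spark dependency, so it can be pasted into a REPL to check that assumption in isolation. The names NullSafeUdfSketch, reformatDate and extractUpc are hypothetical and not part of the code above. One deliberate difference from the original: any value that is not in month/date/year form is returned unchanged, instead of only the single-element case.

```scala
// Null-tolerant versions of the two UDF bodies, assuming null cells are the cause.
object NullSafeUdfSketch {
  // Same pattern as `exp` in the question.
  private val upcPattern = "(.*)(44000\\d{5}|69499\\d{6})(.*)".r

  // month/date/year -> year-month-date; null stays null,
  // any other shape is returned unchanged.
  def reformatDate(x: String): String =
    Option(x).map { s =>
      s.split("/") match {
        case Array(month, date, year) => year + "-" + month + "-" + date
        case _                        => s
      }
    }.orNull

  // Returns the embedded UPC when the pattern matches; null stays null,
  // a non-matching value is returned unchanged.
  def extractUpc(arg: String): String = arg match {
    case null                       => null
    case upcPattern(_, required, _) => required
    case other                      => other
  }
}
```

If the null assumption holds, these functions could presumably be wrapped the same way as in the code above, for example udf(NullSafeUdfSketch.reformatDate _), but I have not verified that against the failing files.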