
I am trying to save DataFrame contents to HDFS in CSV format. It works fine with a small number of files, but when I try it with more files (90+), I get a NullPointerException and the job fails. Below is my code:

val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "false").option("delimiter", "|").load("hdfs path for loading multiple files/*");

val mydateFunc = udf {(x: String) => x.split("/") match {case Array(month,date,year) => year+"-"+month+"-"+date case Array(y)=> y}}

val df2 = df1.withColumn("orderdate", mydateFunc(df1("Date on which the record was created"))).drop("Date on which the record was created")

val df3 = df2.withColumn("deliverydate", mydateFunc(df2("Requested delivery date"))).drop("Requested delivery date")

val exp = "(.*)(44000\\d{5}|69499\\d{6})(.*)".r


val upc_extractor: (String => String) = (arg: String) => arg match { case exp(pref,required,suffx) => required case x:String => x }

val sqlfunc = udf(upc_extractor)

val df4 = df3.withColumn("formatted_UPC", sqlfunc(col("European Article Numbers/Universal Produ")))

df4.write.format("com.databricks.spark.csv").option("header", "false").save("destination path in hdfs to save the resultant files");

Below is the exception I am getting:

16/02/03 01:59:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/02/03 01:59:33 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
java.lang.NullPointerException
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:71)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:165)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:158)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/02/03 01:59:33 INFO TaskSetManager: Starting task 32.0 in stage 1.0 (TID 33, localhost, ANY, 1692 bytes)
16/02/03 01:59:33 INFO Executor: Running task 32.0 in stage 1.0 (TID 33)
16/02/03 01:59:33 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 3, localhost): java.lang.NullPointerException
        ... (same stack trace as above)

16/02/03 01:59:33 ERROR TaskSetManager: Task 2 in stage 1.0 failed 1 times; aborting job
16/02/03 01:59:33 INFO TaskSchedulerImpl: Cancelling stage 1
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 29.0 in stage 1.0 (TID 30)
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 8.0 in stage 1.0 (TID 9)
16/02/03 01:59:33 INFO TaskSchedulerImpl: Stage 1 was cancelled
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 0.0 in stage 1.0 (TID 1)

Spark version is 1.4.1. Any help is much appreciated.

Mahesh
  • Possible duplicate of [What is a Null Pointer Exception, and how do I fix it?](http://stackoverflow.com/questions/218384/what-is-a-null-pointer-exception-and-how-do-i-fix-it) – Gavriel Feb 03 '16 at 09:26
  • Hi Gavriel! I edited my question, as I am getting the exception in Spark SQL while saving multiple files to HDFS. I am wondering why, since the above code works fine with fewer files. – Mahesh Feb 03 '16 at 09:39
  • Try to see if it's really the number of files, or whether it's file number X that is causing the problem. In other words, if you have file001...file100 and it works with file001..file089 but not with file001..file090, then maybe it doesn't work on file090 alone; if so, the problem is in that file. – Gavriel Feb 03 '16 at 16:22
  • Yes @Gavriel, you are right. I found that among all the files, I get this exception for only 3 files, and specifically at the udf mydateFunc in the above code. – Mahesh Feb 03 '16 at 17:25

2 Answers


Probably one of your files has bad input in it. The first thing to do is to find that file. Once you have found it, try to find the line that causes the problem. When you have the line, look at it closely and you will probably see the issue. My guess is that the number of columns doesn't match the expectations, or maybe something is not escaped correctly. If you don't find it, you can still update the question by adding the content of the file.
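
For example, one way to find the file is to run the same pipeline on each file individually and catch the failure. A minimal sketch, assuming the inputs sit under a single HDFS directory (the directory path below is a placeholder, and sc/sqlContext are the spark-shell's):

import org.apache.hadoop.fs.{FileSystem, Path}

// List the input files (placeholder directory)
val fs = FileSystem.get(sc.hadoopConfiguration)
val files = fs.listStatus(new Path("/placeholder/input/dir")).map(_.getPath.toString)

files.foreach { file =>
  try {
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true").option("delimiter", "|").load(file)
    // Force full evaluation so a bad row in this file fails right here
    df.withColumn("orderdate", mydateFunc(df("Date on which the record was created")))
      .rdd.count()
  } catch {
    case e: Exception => println("Problem file: " + file + " -> " + e)
  }
}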

Gavriel
  • @Gavriel I want to handle the null values with the Option[T] type to avoid this NullPointerException, but I am unable to use the split("/") function on Option[String] values. Please suggest a way to apply String functions to Option[String] values. – Mahesh Feb 04 '16 at 11:55
  • Any String can be null or not null. Everywhere in your code you'll need to check whether it's null and take the right action accordingly (a null-safe Option sketch follows below). – Gavriel Feb 04 '16 at 12:26
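
For example, Option keeps the String functions usable while staying null-safe: Option(x) is None when x is null, so split is only applied to real values, and orNull turns the result back into a plain (possibly null) String. A minimal sketch (mydateFuncSafe is just an illustrative name):

val mydateFuncSafe = udf { (x: String) =>
  Option(x).map { s =>
    s.split("/") match {
      case Array(month, date, year) => year + "-" + month + "-" + date
      case Array(y) => y
    }
  }.orNull // None (i.e. a null input) maps back to null
}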

After adding an if condition to the udf mydateFunc to filter out the null values that were causing the NPE, the code works fine, and I am able to load all the files.

val mydateFunc = udf {(x: String) => if (x == null) x else x.split("/") match {case Array(month,date,year) => year+"-"+month+"-"+date case Array(y)=> y}}
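
An alternative, if rows with null dates can simply be skipped, is to filter them out before applying the udf. A sketch against the df2 from the question:

// Sketch: drop rows with a null delivery date instead of null-checking inside the udf
val df3 = df2.filter(df2("Requested delivery date").isNotNull)
  .withColumn("deliverydate", mydateFunc(df2("Requested delivery date")))
  .drop("Requested delivery date")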
Mahesh