
I'm running a simple Spark project on an EMR YARN cluster to:

  • read a text file on S3 into an RDD[String]
  • define a schema and convert that RDD into a DataFrame

I'm calling mapPartitions on the RDD to convert the RDD[String] into an RDD[Row]. My problem: I get a java.lang.NullPointerException and I can't figure out what's causing it.

The stack trace points at these two line numbers in my source code -

  • the rdd1.mapPartitions call (line 88)
  • within the anonymous function, the match case that applies the regular expression rx (line 107)

Here's the stack trace excerpt -

Caused by: java.lang.NullPointerException
    at packageA.Herewego$$anonfun$3.apply(Herewego.scala:107)
    at packageA.Herewego$$anonfun$3.apply(Herewego.scala:88)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

What I've tried so far -

  • The error occurs when running in YARN cluster mode, but not in local mode (in my IDE). That made me think something isn't defined on the executor, so I moved the createrow function definition into the anonymous function passed to mapPartitions (as shown in the code below) - that didn't fix it, though.

Here's the code block -

val rdd4: RDD[Row] = rdd1.mapPartitions((it: Iterator[String]) => {

    // Build a two-column Row (String, Timestamp) from the extracted fields
    def createrow(a: List[String]): Row = {

      val format = new java.text.SimpleDateFormat("dd/MMM/yyyy HH:mm:ss Z")

      val re1: Row = Row.apply(a.head)

      val d: Date = format.parse(a.tail.mkString(" "))
      val t = new Timestamp(d.getTime)
      val re2: Row = Row.apply(t)

      Row.merge(re1, re2)
    }

    var output: List[Row] = List()
    while (it.hasNext) {
      val data: String = it.next()
      val res = data match {
        // rx is defined outside this closure (see below) - this is line 107 in the stack trace
        case rx(ipadd, date, time) => createrow(List(ipadd, date, time))
        case _ => createrow(List("0.0.0.0", "00/Jan/0000", "00:00:00 0")) // fallback for lines that don't match
      }
      output = output :+ res
    }
    output.toIterator
  }).persist(MEMORY_ONLY)

// Trigger the job: collect brings the rows to the driver (and materializes the persist above)
  val tmp = rdd4.collect()
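
For context, rx is a Regex val defined on the enclosing object, outside the mapPartitions call. A simplified sketch - the actual pattern is longer, but like the real one it has three capture groups (IP address, date, time):

object Herewego {
  // Placeholder pattern - the real one is more involved, but it also
  // captures three groups: IP address, date, and time
  val rx = """(\S+) \[(\S+):(.+)\]""".r

  // ... rdd1 and the rdd4 block above live inside this object
}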

Do I need to broadcast any variables or functions used within mapPartitions? Any pointers in the right direction would be much appreciated.
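
Update - as the comments below explain, moving rx (and createrow) inside the function literal fixed the NPE. A minimal sketch of the working version, using the same placeholder pattern as above (the while-loop is compressed to it.map, but the logic is the same):

val rdd4: RDD[Row] = rdd1.mapPartitions((it: Iterator[String]) => {
    // rx is now constructed on each executor, inside the closure,
    // instead of being read from a field of the enclosing object
    // (that field was null on the executor - hence the NPE)
    val rx = """(\S+) \[(\S+):(.+)\]""".r // placeholder pattern

    def createrow(a: List[String]): Row = {
      val format = new java.text.SimpleDateFormat("dd/MMM/yyyy HH:mm:ss Z")
      val d = format.parse(a.tail.mkString(" "))
      Row.merge(Row(a.head), Row(new java.sql.Timestamp(d.getTime)))
    }

    it.map {
      case rx(ipadd, date, time) => createrow(List(ipadd, date, time))
      case _                     => createrow(List("0.0.0.0", "00/Jan/0000", "00:00:00 0"))
    }
  }).persist(MEMORY_ONLY)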

Kevin Lawrence
  • Which line is 107? – ollik1 May 27 '19 at 05:52
  • Possible duplicate of [What is a NullPointerException, and how do I fix it?](https://stackoverflow.com/questions/218384/what-is-a-nullpointerexception-and-how-do-i-fix-it) – user10938362 May 27 '19 at 06:05
  • @ollik1 Line #107 is this line - `case rx(ipadd, date, time) => createrow(List(ipadd, date, time))` – Kevin Lawrence May 27 '19 at 07:19
  • For now, I've got the mapPartitions call to work by doing the following - 1. Define `createrow` within the anonymous function. 2. Define the regex `rx` also within the anonymous function. I'm now researching whether I need to declare these as broadcast variables, or use another construct. – Kevin Lawrence May 27 '19 at 07:22
  • This SO post helped me realise the issue - https://stackoverflow.com/a/34912887/7538823. I've moved the `rx` variable within the mapPartitions function literal and it works now. – Kevin Lawrence May 27 '19 at 09:18

0 Answers