
I have a Spark application which reads lines from a file and tries to deserialize them using Jackson. To get this code to work, I needed to define the ObjectMapper inside the map operation (otherwise I got a NullPointerException).

I have the following code which is working:

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val alertsData = sc.textFile(rawlines).map(alertStr => {
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(alertStr, classOf[Alert])
})

However, if I define the mapper outside the map and broadcast it, it fails with a NullPointerException.

This code fails:

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
  broadcastVar.value.readValue(alertStr, classOf[Alert])
})

What am I missing here?

Thanks, Aliza

  • I think broadcast is oriented toward immutable data, and possibly your object was broadcast as a shallow copy, without some state or dependencies used inside `ObjectMapper`. – dmitry Sep 10 '15 at 07:46

2 Answers


It turns out you can broadcast the mapper. The problematic part was mapper.registerModule(DefaultScalaModule), which needs to be executed on each slave (executor) machine and not only on the driver.

So this code works:

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
  broadcastVar.value.registerModule(DefaultScalaModule)
  broadcastVar.value.readValue(alertStr, classOf[Alert])
})

I further optimised the code by running registerModule only once per partition (and not for each element in the RDD).

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

val broadcastVar = sc.broadcast(mapper)
val alertsRawData = sc.textFile(rawlines)

val alertsData = alertsRawData.mapPartitions({ iter: Iterator[String] =>
  // Register the Scala module once per partition, on the executor
  broadcastVar.value.registerModule(DefaultScalaModule)
  for (i <- iter) yield broadcastVar.value.readValue(i, classOf[Alert])
})

Aliza

  • Is there a way to do this on a per-partition basis with spark sql UDFs? – Bob Jan 10 '18 at 00:30
  • Additional tidbit: If you are trying to broadcast a `val` in main object and your main method has been inherited by the use of `... extends App`, then the broadcast fails. I found the solution here: https://stackoverflow.com/a/31323435/1628839 – Vivek Sethi Mar 05 '20 at 09:07

Indeed, ObjectMapper isn't really suitable for broadcast. It is inherently not serializable and not a value class anyway. I would suggest broadcasting DeserializationConfig instead and passing that to ObjectMapper's constructor from the broadcast variable in your map operation.

Erik Schmiegelow
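For illustration, a rough sketch of this suggestion, assuming Jackson 2.x, the same sc, rawlines and Alert as in the question, and that DeserializationConfig survives Java serialization (not verified here); note that ObjectMapper exposes setConfig(DeserializationConfig) rather than a constructor argument, and DefaultScalaModule still has to be registered on each fresh mapper:

val template = new ObjectMapper()
template.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
// Broadcast only the deserialization config, not the mapper itself
// (assumes DeserializationConfig is Java-serializable in the Jackson version used)
val configBroadcast = sc.broadcast(template.getDeserializationConfig)

val alertsData = sc.textFile(rawlines).mapPartitions({ iter: Iterator[String] =>
  // Build one mapper per partition from the broadcast config
  val mapper = new ObjectMapper()
  mapper.setConfig(configBroadcast.value)
  mapper.registerModule(DefaultScalaModule)
  for (alertStr <- iter) yield mapper.readValue(alertStr, classOf[Alert])
})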
  • FWIW, `ObjectMapper` is actually `java.io.Serializable`, and should work, but you are right in that this is rarely something you should do (there are limited cases where you may want to, which is why it is serializable, but that's worth a blog post). And specifically with Spark (or Storm/Trident etc) it belongs to a class of things one should just instantiate locally in my opinion. – StaxMan Sep 10 '15 at 17:58
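For reference, a minimal sketch of the instantiate-it-locally pattern mentioned in the comment above, assuming the same sc, rawlines and Alert as in the question (the Mappers helper object is made up for illustration):

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Made-up helper object: each executor JVM lazily builds its own fully
// configured mapper on first use, so nothing Jackson-related is shipped
// from the driver.
object Mappers {
  lazy val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    m.registerModule(DefaultScalaModule)
    m
  }
}

val alertsData = sc.textFile(rawlines).map(alertStr =>
  Mappers.mapper.readValue(alertStr, classOf[Alert]))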