
I am doing NLP in Apache Spark from spark-notebook. For this particular example, I am using the Apache OpenNLP library (https://opennlp.apache.org) to build a chunker that extracts noun phrases. Because of the growing data volume, I need to move to distributed computing.

The problem is that I am not able to broadcast my chunker object. Going by the docs (which only show broadcasting simple objects such as arrays), I tried the following:

import opennlp.tools.tokenize.WhitespaceTokenizer
import opennlp.tools.cmdline.postag.POSModelLoader
import opennlp.tools.postag.POSTaggerME
import opennlp.tools.chunker.ChunkerModel
import opennlp.tools.chunker.ChunkerME
import java.io.FileInputStream
import java.io.File

//Instantiate the ChunkerME class 
val inputStream = new FileInputStream("fr-chunk.bin"); 
val chunkerModel = new ChunkerModel(inputStream);
val chunkerME = new ChunkerME(chunkerModel); 

val broadCastedChunkerME = sc.broadcast(chunkerME)

But this is throwing the following error:

java.io.NotSerializableException: opennlp.tools.chunker.ChunkerME
Serialization stack:
    - object not serializable (class: opennlp.tools.chunker.ChunkerME, value: opennlp.tools.chunker.ChunkerME@35a5c281)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
  at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:269)
  at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
  at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)
  ... 63 elided

What did work is wrapping the initialisation of the chunker in a function and then calling that function inside the map method, like the following:

def getNPChunks(sentence: String): Array[Span] = {
  import opennlp.tools.chunker.ChunkerModel
  import opennlp.tools.chunker.ChunkerME
  import opennlp.tools.util.Span
  import java.io.FileInputStream

  val inputStream = new FileInputStream("fr-chunk.bin"); 
  val chunkerModel = new ChunkerModel(inputStream);

  //Instantiate the ChunkerME class 
  val chunkerME = new ChunkerME(chunkerModel); 

  chunkerME.chunkAsSpans(sentence); 
}

// call the chunker 
line.map(getNPChunks)

But here the problem is that this code is very inefficient: the map function calls getNPChunks for every entry of the RDD, so a new chunker object is created for every single entry.

Due to this inefficient design, my Spark script runs 20 times slower than a sequential script.

What am I doing wrong?

Codious-JR
  • Have you written the "chunkerME" initialization inside a Scala class or a Scala object? If it is inside a Scala class, try writing it within a Scala object and then broadcast it. Or try to make the Scala class serializable by implementing the Serializable interface. – Amit Kumar Nov 19 '17 at 00:40
  • @AmitKumar That is interesting. Currently I am using spark-notebook, so the chunker has to be initialised outside an "object" (I think the same concepts don't apply in notebooks). But I actually found this code in an example, and there the person had initialised the chunker in an object and broadcast it; maybe that's why it worked. I will definitely try this when using spark-submit. You can post your suggestion as an answer, and once I try it with spark-submit I will accept it if it works. – Codious-JR Nov 19 '17 at 10:47

2 Answers

2

The way to solve the problem is to use mapPartitions.
That way you can create one chunker per partition instead of one per row:

def getChunker(): ChunkerME = {
  val inputStream = new FileInputStream("fr-chunk.bin")
  val chunkerModel = new ChunkerModel(inputStream)

  // Instantiate the ChunkerME class
  new ChunkerME(chunkerModel)
}

line.mapPartitions { it =>
  val chunker = getChunker()
  it.map(sentence => chunker.chunkAsSpans(sentence))
}

see this answer for more details about mapPartitions: https://stackoverflow.com/a/39203798/245024
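
If reading "fr-chunk.bin" from every executor's local filesystem is not convenient, a variation on the same idea (a sketch, not part of the original answer) is to read the model bytes once on the driver, broadcast the byte array (which, unlike ChunkerME, is serializable), and rebuild the chunker once per partition; the chunkAsSpans call is kept as in the question:

import java.io.ByteArrayInputStream
import java.nio.file.{Files, Paths}
import opennlp.tools.chunker.{ChunkerME, ChunkerModel}

// Read the model file once on the driver and broadcast only the raw bytes
val modelBytes = Files.readAllBytes(Paths.get("fr-chunk.bin"))
val broadcastModel = sc.broadcast(modelBytes)

line.mapPartitions { it =>
  // Rebuild the chunker once per partition from the broadcast bytes
  val chunker = new ChunkerME(new ChunkerModel(new ByteArrayInputStream(broadcastModel.value)))
  it.map(sentence => chunker.chunkAsSpans(sentence))
}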

lev
  • This is already a great solution, because this way I still get to parallelise the work without blowing up the memory while doing so. But a question: is mapPartitions less than or equal in performance compared to map, or does map at the end of the day use mapPartitions? – Codious-JR Nov 19 '17 at 10:48
  • map uses mapPartitions internally. You can see that in the Spark code: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L372 – lev Nov 19 '17 at 10:54
  • I am still facing the problem when using the POSTagger `val posStream1 = new FileInputStream(new java.io.File("/u01/fusion/3.1.3/data/nlp/models/en-pos-maxent.bin"))` `val posModel1 = new POSModel(posStream1); val tagger = new POSTaggerME(posModel); ` – Aman Tandon Sep 13 '18 at 12:49
0

Initialize the "chunkerME" inside a Scala object and then broadcast it. A Scala object is serialized by default; the compiler takes care of serializing it.

Or, if you are initializing it within a Scala class, the class needs to be made serializable explicitly by extending the Serializable trait.
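
A minimal sketch of that pattern, assuming the model file "fr-chunk.bin" is readable from the working directory of every executor (the ChunkerHolder name is made up here for illustration). Because the singleton is only referenced from inside the closure, the non-serializable ChunkerME itself never has to be shipped over the wire; each executor JVM builds its own chunker lazily on first use:

import java.io.FileInputStream
import opennlp.tools.chunker.ChunkerME
import opennlp.tools.chunker.ChunkerModel

object ChunkerHolder {
  // Initialised lazily, once per executor JVM, on first access
  lazy val chunker: ChunkerME = {
    val inputStream = new FileInputStream("fr-chunk.bin")
    new ChunkerME(new ChunkerModel(inputStream))
  }
}

// The closure captures nothing unserializable; chunkAsSpans is kept as in the question
line.map(sentence => ChunkerHolder.chunker.chunkAsSpans(sentence))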

Amit Kumar