I am trying to do NLP in Apache Spark from spark-notebook. For this particular example, I am using the OpenNLP library (https://opennlp.apache.org) to create a chunker that extracts noun phrases. Due to growing data volume, I need to shift to distributed computing.
The problem is that I am not able to broadcast my chunker object. From reading the docs (whose examples only broadcast simple objects like arrays), I tried the following:
import opennlp.tools.tokenize.WhitespaceTokenizer
import opennlp.tools.cmdline.postag.POSModelLoader
import opennlp.tools.postag.POSTaggerME
import opennlp.tools.chunker.ChunkerModel
import opennlp.tools.chunker.ChunkerME
import java.io.FileInputStream
import java.io.File
// Load the chunker model and instantiate the ChunkerME class
val inputStream = new FileInputStream("fr-chunk.bin")
val chunkerModel = new ChunkerModel(inputStream)
val chunkerME = new ChunkerME(chunkerModel)
val broadCastedChunkerME = sc.broadcast(chunkerME)
But this is throwing the following error:
java.io.NotSerializableException: opennlp.tools.chunker.ChunkerME
Serialization stack:
- object not serializable (class: opennlp.tools.chunker.ChunkerME, value: opennlp.tools.chunker.ChunkerME@35a5c281)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:269)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)
... 63 elided
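One workaround I am considering is to broadcast something that is serializable, such as the raw bytes of the model file, and rebuild the chunker on the executors. Here is a minimal sketch of the idea (the variable names are mine, and I am assuming the model file is readable from the driver):

import java.io.ByteArrayInputStream
import java.nio.file.{Files, Paths}
import opennlp.tools.chunker.{ChunkerME, ChunkerModel}

// Read the model file once on the driver; an Array[Byte] is
// serializable, so it can be broadcast even though ChunkerME cannot
val modelBytes: Array[Byte] = Files.readAllBytes(Paths.get("fr-chunk.bin"))
val broadcastModelBytes = sc.broadcast(modelBytes)

// On an executor the chunker could then be rebuilt from the bytes:
// val model = new ChunkerModel(new ByteArrayInputStream(broadcastModelBytes.value))
// val chunkerME = new ChunkerME(model)

The bytes themselves should be cheap to broadcast; the open question is where to rebuild the chunker so it is not reconstructed for every record.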
What did work is wrapping the initialization of the chunker in a function and then calling that function inside map, like this:
import opennlp.tools.util.Span

def getNPChunks(tokens: Array[String], tags: Array[String]): Array[Span] = {
  import opennlp.tools.chunker.{ChunkerME, ChunkerModel}
  import java.io.FileInputStream

  // Load the model and instantiate the ChunkerME class on the executor
  val inputStream = new FileInputStream("fr-chunk.bin")
  val chunkerModel = new ChunkerModel(inputStream)
  val chunkerME = new ChunkerME(chunkerModel)

  // chunkAsSpans expects a sentence's tokens plus their POS tags
  // and returns the chunks as Spans
  chunkerME.chunkAsSpans(tokens, tags)
}
// call the chunker on an RDD of (tokens, POS tags) pairs
line.map { case (tokens, tags) => getNPChunks(tokens, tags) }
But the problem here is that this code is very inefficient: map calls getNPChunks once for every entry in the RDD, so every single record pays the cost of loading the model from disk and constructing a new chunker object.
Due to this inefficient design, my Spark script runs 20 times slower than the sequential version.
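From what I have read, mapPartitions should let me build the chunker once per partition instead of once per record. Here is a sketch of what I have in mind, reusing the broadcastModelBytes from the sketch above and assuming line is an RDD of (tokens, POS tags) pairs:

import java.io.ByteArrayInputStream
import opennlp.tools.chunker.{ChunkerME, ChunkerModel}

val chunks = line.mapPartitions { records =>
  // Build the chunker once per partition and reuse it for every
  // record in that partition, instead of once per record as above
  val model = new ChunkerModel(new ByteArrayInputStream(broadcastModelBytes.value))
  val chunkerME = new ChunkerME(model)
  records.map { case (tokens, tags) => chunkerME.chunkAsSpans(tokens, tags) }
}

Alternatively, I have seen the pattern of holding the chunker in a lazily initialized singleton object so it is created once per executor JVM, but I am not sure which approach is preferred.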
What am I doing wrong?