
I am trying to extract names from texts using the Stanford NER package in Spark/Scala. I have added the following to build.sbt:

libraryDependencies ++= Seq(
  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
  "org.scalatest" %% "scalatest" % "3.0.0-M9"
)

Further, I have created an RDD where every element is a text (a set of strings). Then I have defined a function called "ner" which takes a text as input and returns the names found in it. Here is the relevant part of the name-extraction code:

val serializedclassifier = "/home/hadoopuser/stanfordner/stanford-ner-2016-10-31/classifiers/german.conll.hgc_175m_600.crf.ser.gz"
val classifier = CRFClassifier.getClassifierNoExceptions(serializedclassifier)

def ner(a: String): String = {
  val out = classifier.classify(a)
  ...
  ...
  ...
}

The code gives me names when I run

rdd.take(10).foreach(x=>println(ner(x)))

but when I did

val rdd2 = rdd.map(x=>ner(x))

it threw the following error:

Loading classifier from /home/hadoopuser/stanfordner/stanford-ner-2016-10-31/classifiers/german.conll.hgc_175m_600.crf.ser.gz ... done [0.8 sec].
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2039)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.map(RDD.scala:365)
    at org.inno.redistagger.redistagger$.main(correcttags.scala:220)
    at org.inno.redistagger.redistagger.main(correcttags.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: edu.stanford.nlp.ie.crf.CRFClassifier
Serialization stack:
    - object not serializable (class: edu.stanford.nlp.ie.crf.CRFClassifier, value: edu.stanford.nlp.ie.crf.CRFClassifier@56dd6efa)
    - field (class: org.inno.redistagger.redistagger$$anonfun$9, name: classifier$1, type: class edu.stanford.nlp.ie.crf.CRFClassifier)
    - object (class org.inno.redistagger.redistagger$$anonfun$9, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 20 more

To get rid of this error, I moved the classifier definition inside the "ner" function, like this:

val serializedclassifier = "/home/hadoopuser/stanfordner/stanford-ner-2016-10-31/classifiers/german.conll.hgc_175m_600.crf.ser.gz"

def ner(a: String): String = {
  val classifier = CRFClassifier.getClassifierNoExceptions(serializedclassifier)
  val out = classifier.classify(a)
  ...
  ...
  ...
}

This solves the above error, but a new problem has popped up: the "ner" function now creates a fresh classifier for every element of the RDD, which has around 5 million elements. Loading the classifier (essentially connecting to the NER library) takes about 0.5 seconds, so the job would take far too long to complete. How can I avoid re-creating the classifier for every record, or serialize the classifier without defining it inside the "ner" function?
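For reference, here is a minimal sketch of the kind of pattern I am after: holding the expensive object in a singleton so it is initialized lazily, once per JVM, rather than once per record. `FakeClassifier` is a hypothetical stand-in for `CRFClassifier` so the sketch runs without Spark or the Stanford jars; in real code the singleton would be referenced from `rdd.map` (or `rdd.mapPartitions`) and only the input strings, not the classifier, would be serialized.

```scala
// Hypothetical stand-in for CRFClassifier; loading it is the expensive step.
class FakeClassifier(path: String) {
  def classify(text: String): String = s"entities($text)"
}

// A Scala object is initialized lazily, once per JVM. On a Spark cluster each
// executor would build the classifier a single time instead of per element,
// and the closure only captures the (serializable) reference to the object.
object NerModel {
  lazy val classifier = new FakeClassifier("/path/to/model.ser.gz")
}

def ner(a: String): String = NerModel.classifier.classify(a)

// Simulating a partition of the RDD with a plain Iterator:
val partition = Iterator("text one", "text two")
val results = partition.map(ner).toList
```

The same idea also works with `rdd.mapPartitions`, constructing the classifier once at the top of each partition's iterator instead of in a singleton.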

Ravi Ranjan
