I am using an NLP library written in Java 8 to analyze many text notes (extracting Named Entities). Because NLP libraries typically require external resources and take a long time to initialize and load, it makes sense to initialize them once per RDD partition rather than once per RDD entry (per the pattern described in Apache Spark: map vs mapPartitions?), and then run each record through the NLP pipeline.
So here is the high-level code:
JavaRDD<String> notes = sc.textFile("adl:///tmp/testdata100.txt");
JavaRDD<String> output = notes.mapPartitions(new MyFlatMapFunction());
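For context, sc above is just a standard JavaSparkContext; a minimal setup sketch, with a placeholder application name:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("NoteAnalysis"); // placeholder app name
JavaSparkContext sc = new JavaSparkContext(conf);

And MyFlatMapFunction looks like this: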
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;

public class MyFlatMapFunction implements FlatMapFunction<Iterator<String>, String> {

    private void setup() throws Exception {
        InitializeNLPLibraries();
    }

    // Re-initialize the NLP pipeline whenever this function is deserialized
    // on an executor, so the expensive setup does not run once per record.
    private void readObject(ObjectInputStream in) {
        try {
            in.defaultReadObject();
            this.setup();
        } catch (ClassNotFoundException | IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Iterator<String> call(Iterator<String> partition) throws Exception {
        // do actual analysis here
    }
}
That //do actual analysis here comment is where I am stuck. Since I have a java.util.Iterator<String> partition, I would need to somehow convert it to a Spark-compatible collection of some kind (without creating an extra copy of the collection's elements), say a Collection<String> rows, on which I could run rows.map(new MyAnalysisFunction()), where MyAnalysisFunction is of type:
import org.apache.spark.api.java.function.Function;

class MyAnalysisFunction implements Function<String, String> {

    @Override
    public String call(String note) throws Exception {
        // analyze an individual note and return the result
    }
}
So here is where I am puzzled: how do I go from a java.util.Iterator<String> partition to a Spark-compatible collection in Java 8, so that I can run .map(new MyAnalysisFunction()) on it?
Your help and guidance are appreciated.
Thanks in advance!