
I am using an NLP library written in Java 8 to analyze a large number of text notes (extracting Named Entities). Because NLP libraries typically require external resources and take a long time to initialize and load, it makes sense to initialize them once per RDD partition rather than for every entry in the RDD (per the pattern described here: Apache Spark: map vs mapPartitions?) and then run the analysis on each record through the NLP pipeline.

So, here goes the high level code:

JavaRDD<String> notes = sc.textFile("adl:///tmp/testdata100.txt");
JavaRDD<String> output = notes.mapPartitions(new MyFlatMapFunction());

import java.io.IOException;
import java.io.ObjectInputStream;

import org.apache.spark.api.java.function.FlatMapFunction;

public class MyFlatMapFunction
        implements FlatMapFunction<java.util.Iterator<String>, String> {

    private void setup() throws Exception {
        InitializeNLPLibraries();
    }

    private void readObject(ObjectInputStream in) {
        try {
            in.defaultReadObject();
            this.setup();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public java.util.Iterator<String> call(java.util.Iterator<String> partition)
            throws Exception {
        // do actual analysis here
    }
}

So, the //do actual analysis here part is where I am stuck. Since I have a java.util.Iterator<String> partition, I would need to somehow convert it into a Spark-compatible collection of some kind (without creating an extra copy of the collection elements), say Collection<String> rows, on which I can run rows.map(new MyAnalysisFunction()); where MyAnalysisFunction is of type

import org.apache.spark.api.java.function.Function;

class MyAnalysisFunction implements Function<String, String> {

    @Override
    public String call(String note) throws Exception {
        // analyze individual note
    }
}

So, here is where I am puzzled: how do I go from the java.util.Iterator<String> partition to a Spark-compatible collection in Java 8 so that I can run .map(new MyAnalysisFunction()) on it?
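For what it's worth, the closest I have come is wrapping the iterator in a plain Java 8 Stream (a sketch only; this uses the JDK's Spliterators/StreamSupport rather than any Spark collection, so the rows would still be processed sequentially within the partition, which is exactly what I am unsure about):

@Override
public java.util.Iterator<String> call(java.util.Iterator<String> partition)
        throws Exception {
    // Wrap the partition's iterator in a lazily evaluated Java 8 Stream
    // (no up-front copy of the elements is made).
    java.util.stream.Stream<String> rows = java.util.stream.StreamSupport.stream(
            java.util.Spliterators.spliteratorUnknownSize(
                    partition, java.util.Spliterator.ORDERED),
            false);

    MyAnalysisFunction analyze = new MyAnalysisFunction();

    // Run the per-note analysis on each row and hand the results back to Spark.
    return rows.map(note -> {
        try {
            return analyze.call(note);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }).iterator();
}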

Your help and guidance are appreciated.

Thanks in advance!

YuGagarin
  • I don't know spark, but `without` copying elements? From an iterator? I can't imagine it being possible – Eugene Dec 10 '17 at 20:51
  • This `FlatMapFunction<java.util.Iterator<String>, String>` signature is giving me headaches. Is this actually a disguised `Function`? And how does anything in your question below the first paragraph relate to that first paragraph? What is this code supposed to do? At the moment it reads like, you want something you “can run `.map(MyAnalysisFunction())` on”, regardless of what it will do… – Holger Dec 11 '17 at 10:30
  • @Holger, thanks for your comment. JavaRDD has a mapPartitions API https://spark.apache.org/docs/2.2.0/api/java/index.html?org/apache/spark/api/java/JavaRDD.html whose signature takes a FlatMapFunction interface. It is very similar to the Function interface but runs on a per-partition basis and takes all rows of the RDD's partition as input via an Iterator. I am trying to get as much parallelism out of the code on Spark as possible. So, I have to first run mapPartitions to load all the NLP-related resources _per RDD's partition_ and then process the partition's rows by running a further .map() on them. Hopefully, it is clearer now – YuGagarin Dec 11 '17 at 18:55
  • But isn’t doing the heavy work in `setup()` already sufficient? The linked answer talks about overhead that would happen inside `map`, i.e. for every element, that can be avoided by using `mapPartitions` instead. But your initialization overhead isn’t inside the mapping function. Without knowing anything about Spark, I’m quite sure that it will deserialize the function per partition rather than for every element, so `setup()` is already called once per partition only. Everything else would be illogical. – Holger Dec 12 '17 at 08:20
  • @Holger, you are right. That is why I switched from .map to mapPartitions. But each partition can be quite large, and one has to process the rows within a partition serially. One can try to balance the number of partitions per RDD to do more work in parallel, but the number of nodes in the cluster is typically limited. I think the real question is whether I can get more parallelism by further using .map within a partition's collection of rows, or whether there is no way but to process them serially. – YuGagarin Dec 12 '17 at 10:21
  • ??? When using `map`, the data gets partitioned as well, and `setup` will be invoked once per partition in either case, so the conclusion of the previous comment was that you don’t gain anything by using `mapPartitions`. Answering that conclusion with “*that’s* the reason for the switch” doesn’t make any sense. Your idea of getting more parallelism inside of this basically means “I’m using this dc tool but think that I can make dc better by doing the dc tool’s job while using it”. If Spark does provide the optimal parallelism already, you should check the configuration instead. – Holger Dec 12 '17 at 11:41
  • @Holger Without using `mapPartitions` by just using `map` on RDD `setup()` will be invoked for every row in the RDD no matter how it is partitioned because `setup()` will be within `MyAnalysisFunction()` and that is exactly the reason for the switch. – YuGagarin Dec 12 '17 at 14:30
  • Who is instantiating `MyAnalysisFunction` for every row? That’s surely not how it is supposed to work. And now it becomes obvious that this is an [xy problem](https://meta.stackexchange.com/a/66378/242352); instead of asking about your actual problem, you’re asking about the *attempted solution*. – Holger Dec 12 '17 at 14:35
  • Initialization per row as described in [Apache Spark: map vs mapPartitions?](https://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions/39203798#39203798) – YuGagarin Dec 12 '17 at 15:04
  • As said before, this answer talks about overhead *within the function*, i.e. the equivalent of the `call` method. At no point is it claiming that instantiation of the function instance happens for each row, most notably because the Scala code has no notion of instantiation… – Holger Dec 12 '17 at 18:06
