
I am trying to process large volumes of text. As part of the process I want to perform steps such as tokenization and stemming. However, some of these steps require loading an external model (for example the OpenNLP tokenizers). This is the approach I am currently using:

    SparkConf sparkConf = new SparkConf().setAppName("Spark Tokenizer");
    JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(sparkContext);
    DataFrame corpus = sqlContext.read().text("/home/zezke/document.nl");

    // Create pipeline components
    Tokenizer tokenizer = new Tokenizer()
            .setInputCol("value")
            .setOutputCol("tokens");
    DataFrame tokenizedCorpus = tokenizer.transform(corpus);

    // Save the output
    tokenizedCorpus.write().mode(SaveMode.Overwrite).json("/home/zezke/experimentoutput");

The approach I am currently trying uses a UnaryTransformer:

    public class Tokenizer extends UnaryTransformer<String, List<String>, Tokenizer> implements Serializable {

        private final static String uid = Tokenizer.class.getSimpleName() + "_" + UUID.randomUUID().toString();

        private static Map<String, String> stringReplaceMap;

        @Override
        public void validateInputType(DataType inputType) {
            assert (inputType.equals(DataTypes.StringType)) :
                    String.format("Input type must be %s, but got %s", DataTypes.StringType.simpleString(), inputType.simpleString());
        }

        public Function1<String, List<String>> createTransformFunc() {
            Function1<String, List<String>> f = new TokenizerFunction();
            return f;
        }

        public DataType outputDataType() {
            return DataTypes.createArrayType(DataTypes.StringType, true);
        }

        public String uid() {
            return uid;
        }

        private class TokenizerFunction extends AbstractFunction1<String, List<String>> implements Serializable {
            public List<String> apply(String sentence) {
                // ... code goes here
            }
        }

    }

Now my questions are:

  1. When is the best time to load the model? I don't want to load it multiple times.
  2. How do I distribute the model to various nodes?

Thanks in advance. Spark is a bit daunting to get into, but it looks promising.

Bram Vandewalle

1 Answer


You can load the model in the driver code and store it as an attribute of the Tokenizer object. The model will then be serialized and transported to the worker nodes automatically. This approach requires the model to fit in the driver's memory.
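
For illustration, a rough sketch of this approach, adapting the Tokenizer class from the question; `MyModel` and the constructor are placeholders, and this only works if the model type implements `Serializable`:

    // Sketch: load the model once in the driver and keep it as a field.
    // MyModel is a placeholder for a model type that implements Serializable.
    public class Tokenizer extends UnaryTransformer<String, List<String>, Tokenizer> implements Serializable {

        private final MyModel model;   // loaded once, in the driver

        public Tokenizer(MyModel model) {
            this.model = model;
        }

        public Function1<String, List<String>> createTransformFunc() {
            // the returned function captures 'model', so Spark serializes it
            // and ships it to the executors together with the task closure
            return new TokenizerFunction(model);
        }

        // validateInputType(), outputDataType() and uid() as in the question
    }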

Otherwise, you can load the model inside createTransformFunc() and store it in the TokenizerFunction object as an attribute. I believe that this way each worker node will load the model on its own, though I'm not 100% sure.
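
Something along these lines might work for the OpenNLP case (untested): keep the `TokenizerME` in a `transient` field of `TokenizerFunction` and load it lazily, so each executor builds its own copy instead of trying to serialize it. The model path is just an example:

    // Needs opennlp.tools.tokenize.TokenizerME / TokenizerModel,
    // java.io.*, java.util.Arrays and java.util.List.
    private class TokenizerFunction extends AbstractFunction1<String, List<String>> implements Serializable {

        // transient: not serialized with the closure, rebuilt lazily in each executor JVM
        private transient TokenizerME tokenizer;

        private TokenizerME getTokenizer() throws IOException {
            if (tokenizer == null) {
                // example path; the model file must be readable on every worker
                try (InputStream in = new FileInputStream("/path/to/nl-token.bin")) {
                    tokenizer = new TokenizerME(new TokenizerModel(in));
                }
            }
            return tokenizer;
        }

        public List<String> apply(String sentence) {
            try {
                return Arrays.asList(getTokenizer().tokenize(sentence));
            } catch (IOException e) {
                throw new RuntimeException("Could not load the OpenNLP tokenizer model", e);
            }
        }
    }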

Kien Truong
  • That doesn't seem to work since the OpenNLP classes aren't serializable. Serialization stack: - object not serializable (class: opennlp.tools.tokenize.TokenizerME, value: opennlp.tools.tokenize.TokenizerME@100fc5a) – Bram Vandewalle Jul 06 '16 at 15:01
  • Try the second method, and store the ONLP model as a transient attribute in `TokenizerFunction` – Kien Truong Jul 06 '16 at 15:42
  • Do you have a link to some documentation on these transient attributes? I am not familiar with them and Google is not that helpful on the matter. – Bram Vandewalle Jul 11 '16 at 06:20