
I am trying to speed up the processing of large lists of texts by parallelising textacy. When I use Pool from multiprocessing, the resulting textacy corpus comes out empty. I am not sure whether the problem is in how I use textacy or in how I use the multiprocessing paradigm. Here is an example that illustrates my issue:

import spacy
import textacy
from multiprocessing import Pool

texts_dict = {
    "key1": "First text 1.",
    "key2": "Second text 2.",
    "key3": "Third text 3.",
    "key4": "Fourth text 4.",
}

model=spacy.load('en_core_web_lg')

# this works

corpus = textacy.corpus.Corpus(lang=model)

corpus.add((value, {'key': key}) for key, value in texts_dict.items())

print(corpus) # prints Corpus(4 docs, 8 tokens)
print([doc for doc in corpus])

# now the same thing with a worker pool returns empty corpus

corpus2 = textacy.corpus.Corpus(lang=model)

pool = Pool(processes=2) 
pool.map(corpus2.add, ((value, {'key': key}) for key, value in texts_dict.items()))

print(corpus2) # prints Corpus(0 docs, 0 tokens)
print([doc for doc in corpus2])

# to make sure we get the right data into corpus.add
pool.map(print, ((value, {'key': key}) for key, value in texts_dict.items()))

Textacy is built on spaCy. spaCy doesn't support multithreading, but it should be fine to run in multiple processes: https://github.com/explosion/spaCy/issues/2075
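
For reference, here is a minimal sketch of the multiprocessing pattern that should be safe with spaCy: each worker loads its own pipeline and returns plain Python data rather than Doc objects, since worker processes don't share memory with the parent. The worker_init and count_tokens names are only illustrative, not part of spaCy or textacy.

from multiprocessing import Pool

import spacy

nlp = None  # one pipeline per worker process

def worker_init():
    global nlp
    nlp = spacy.load('en_core_web_sm')  # smaller model, just for the sketch

def count_tokens(item):
    key, text = item
    return key, len(nlp(text))  # return lightweight data, not Doc objects

if __name__ == '__main__':
    texts = {'key1': 'First text 1.', 'key2': 'Second text 2.'}
    with Pool(processes=2, initializer=worker_init) as pool:
        print(dict(pool.map(count_tokens, texts.items())))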

As per the great suggestion of @constt (https://stackoverflow.com/a/58317741/4634344), collecting the results into the corpus works for datasets as large as n_docs=10273, n_sentences=302510, n_tokens=2053129.

For a larger dataset (16000 docs, ca. 3MM tokens) I get the following error:

result_corpus=corpus.get() 
  File "<string>", line 2, in get
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Unserializable message: Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 283, in serve_client
    send(msg)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
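
The last frame shows where the limit comes from: multiprocessing packs the length of each pickled message into a signed 32-bit header before sending it, so a single payload of 2 GiB or more (here, the whole pickled corpus coming back from the manager) cannot be sent on this Python version. A tiny illustration of just that limit:

import struct

struct.pack("!i", 2**31 - 1)   # the largest message length that fits in the "!i" header
try:
    struct.pack("!i", 2**31)   # one byte more than the signed 32-bit header can describe
except struct.error as exc:
    print(exc)                 # 'i' format requires -2147483648 <= number <= 2147483647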

I will investigate, but if you have a direct solution it would be much appreciated!

Diego

1 Answer


Because Python processes run in separate memory spaces, you have to share the corpus object between the processes in the pool. To do this, wrap the corpus object in a shareable class and register it with a BaseManager class. Here is how you can refactor your code to make it work:

#!/usr/bin/python3
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

import spacy
import textacy


texts = {
    'key1': 'First text 1.',
    'key2': 'Second text 2.',
    'key3': 'Third text 3.',
    'key4': 'Fourth text 4.',
}


class PoolCorpus(object):
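    """Wrapper that keeps a single Corpus inside the manager's server process."""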

    def __init__(self):
        model = spacy.load('en_core_web_sm')
        self.corpus = textacy.corpus.Corpus(lang=model)

    def add(self, data):
        self.corpus.add(data)

    def get(self):
        return self.corpus


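# Register the wrapper so the manager can construct one shared instance
# in its server process and hand proxies to it back to callers.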
BaseManager.register('PoolCorpus', PoolCorpus)


if __name__ == '__main__':

    with BaseManager() as manager:
        corpus = manager.PoolCorpus()

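        # Each worker calls add() on a proxy; the real Corpus lives in the manager process.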
        with Pool(processes=2) as pool:
            pool.map(corpus.add, ((v, {'key': k}) for k, v in texts.items()))

        print(corpus.get())

Output:

Corpus(4 docs, 16 tokens)
constt
  • Thanks for a very clear suggestion! I have run it on the example and on a smaller real dataset (n_docs=153, n_sentences=953, n_tokens=7600) with no issues. However, for a dataset with 16000 docs and ca. 3MM tokens an error arises. I will add the error message to the question. – Diego Oct 10 '19 at 11:23
  • n_docs=10273, n_sentences=302510, n_tokens=2053129 went through as well – Diego Oct 10 '19 at 11:36
  • @Diego have you tried to split documents into chunks of length, say 10000, and process them sequentially in a pool using the same shared corpus? – constt Oct 10 '19 at 13:47
  • 1
    Yes I have just did that and it works fine up to same limit. The number 2,147,483,647 (or hexadecimal 7FFF,FFFF16) is the maximum positive value for a 32-bit signed binary integer in computing. I run 64-bit python. It seems this is the issue that affects me https://github.com/joblib/joblib/issues/731#issuecomment-409893849 But it has to be fixed already (but in Python 3.8?) Stage: resolved Components: Versions: Python 3.8 – Diego Oct 10 '19 at 21:38
  • I tried to install Python 3.8, but there is no wheel for scipy (openblas, lapack), so I abandoned this avenue for now. Anyway, unfortunately the parallel execution above doesn't bring any speed-up compared to a single process. I wonder if I should post my findings here or accept your answer and start another question? – Diego Oct 11 '19 at 14:33
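
A possible way around the 2 GiB connection limit discussed above is to keep the full corpus out of the multiprocessing connection entirely: save it to disk from inside the manager process and load it back in the parent, so only a short file path travels over the pipe. This is an untested sketch that assumes the installed textacy version provides Corpus.save() and Corpus.load():

#!/usr/bin/python3
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

import spacy
import textacy


texts = {
    'key1': 'First text 1.',
    'key2': 'Second text 2.',
}


class PoolCorpus(object):

    def __init__(self):
        self.corpus = textacy.corpus.Corpus(lang=spacy.load('en_core_web_sm'))

    def add(self, data):
        self.corpus.add(data)

    def save(self, path):
        # Serialisation happens here, inside the manager process, so the large
        # corpus never crosses the multiprocessing connection.
        # Assumes Corpus.save() exists in this textacy version.
        self.corpus.save(path)


BaseManager.register('PoolCorpus', PoolCorpus)


if __name__ == '__main__':

    with BaseManager() as manager:
        corpus = manager.PoolCorpus()

        with Pool(processes=2) as pool:
            pool.map(corpus.add, ((v, {'key': k}) for k, v in texts.items()))

        corpus.save('corpus.bin.gz')

    # Corpus.load(lang, filepath) -- check the signature against your textacy version.
    result = textacy.corpus.Corpus.load(spacy.load('en_core_web_sm'), 'corpus.bin.gz')
    print(result)

Note that with the manager design all of the actual spaCy parsing still happens inside the single manager process (the pool workers only forward the texts to it), which may also explain why the pool brings no speed-up over a single process.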