
I was trying to get fastText sentence embeddings for 80 million English tweets using dask's parallelization mechanism, as described in this answer: How do you parallelize apply() on Pandas Dataframes making use of all cores on one machine?

Here is my full code:

import dask.dataframe as dd
from dask.multiprocessing import get
import fasttext
import fasttext.util
import pandas as pd
import numpy as np

print('starting language: ' + 'en')
lang_output = pd.DataFrame()
lang_input = full_input.loc[full_input.name == 'en'] # 80 Million English tweets
ddata = dd.from_pandas(lang_input, npartitions = 96)
print('number of lines to compute: ' + str(len(lang_input)))
fasttext.util.download_model('en', if_exists='ignore')  # English
ft = fasttext.load_model('cc.'+'en'+'.300.bin')
fasttext.util.reduce_model(ft, 20)
lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')
print('finished en')

This is the get_fasttext_sentence_embedding function:

def get_fasttext_sentence_embedding(row, ft):
    if pd.isna(row):
        return np.zeros(20)
    return ft.get_sentence_vector(row)

But, I get a pickling error on this line:

lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')

This is the error I get:

TypeError: can't pickle fasttext_pybind.fasttext objects

Is there a way to parallelize the fastText model's get_sentence_vector with dask (or anything else)? I need to parallelize because getting sentence embeddings for 80 million tweets takes too much time, and each row of my data frame is completely independent of the others.

learner

2 Answers


The problem here is that fasttext objects apparently can't be pickled, and Dask doesn't know how to serialize and deserialize this data structure without pickling.
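
You can reproduce the failure outside Dask with nothing but the pickle module (a minimal sketch; the exact wording of the error varies with the Python version):

import pickle

import fasttext

ft = fasttext.load_model("cc.en.300.bin")  # any fastText model file will do
pickle.dumps(ft)  # raises TypeError: can't pickle fasttext_pybind.fasttext objects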

The simplest way to use Dask here (but likely not the most efficient), would be to have each process define the ft model itself, which would avoid the need to transfer it (and thus avoid the attempted pickling). Something like the following would work. Notice that ft is defined inside the function being mapped across partitions.

First, some example data.

import dask.dataframe as dd
import fasttext
import pandas as pd
import dask
import numpy as np

df = pd.DataFrame({"text":['this is a test sentence', None, 'this is another one.', 'one more']})
ddf = dd.from_pandas(df, npartitions=2)
ddf

Dask DataFrame Structure:
                 text
npartitions=2        
0              object
2                 ...
3                 ...
Dask Name: from_pandas, 2 tasks

Next, we can tweak your functions to define ft within each process. This duplicates effort, but avoids the need to transfer the model. With that, we can smoothly run it via map_partitions.

def get_embeddings(sent, model):
    return model.get_sentence_vector(sent) if not pd.isna(sent) else np.zeros(10)

def func(df):
    ft = fasttext.load_model("amazon_review_polarity.bin") # arbitrary model
    res = df['text'].apply(lambda x: get_embeddings(x, model=ft))
    return res

ddf['sentence_vector'] = ddf.map_partitions(func)
ddf.compute(scheduler='processes')

                       text                                    sentence_vector
0   this is a test sentence  [-0.01934033, 0.03729743, -0.04679677, -0.0603...
1                      None  [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
2      this is another one.  [-0.0025579212, 0.0353713, -0.027139299, -0.05...
3                  one more  [-0.014522496, 0.10396308, -0.13107553, -0.198...

Note that this nested data structure (a list in a column) is probably not the optimal way to handle these vectors, but that will depend on your use case. Also, there may be a way to do this computation in batches using fastText rather than one row at a time (in Python), but I'm not well versed in the nuances of fastText.
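
One hedged refinement, not from the original answer: since map_partitions may call func once per partition, and a worker process can handle several partitions, you can cache the model in a module-level variable so each process pays the load cost only once (get_model is a hypothetical helper added for illustration):

import fasttext

_ft = None  # per-process model cache

def get_model():
    # load the model lazily, once per worker process
    global _ft
    if _ft is None:
        _ft = fasttext.load_model("amazon_review_polarity.bin")  # same arbitrary model as above
    return _ft

def func(df):
    ft = get_model()
    return df['text'].apply(lambda x: get_embeddings(x, model=ft))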

Nick Becker

I had the same problem, but I found a solution using multiprocessing from Python's standard library.

First step: wrap the model call in a plain function.

import fasttext

model = fasttext.load_model(file_name_model)

def get_vec(txt):
    '''
    First I tried to pass model.get_sentence_vector directly into map (see below),
    but that resulted in a pickle error. This wrapper works, lol.
    '''
    return model.get_sentence_vector(txt)

Then, I'm doing this:

from multiprocessing import Pool

text = ["How to sell drugs (fast)", "House of Cards", "The Crown"]

with Pool(40) as p: # I have 40 cores
    result = p.map(get_vec, text)

With 40 cores, processing 10M short texts took ~80s.
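
A note on why this avoids the pickle error: on Linux, Pool forks the parent process, so each worker inherits the already-loaded model without pickling it; only the texts and the function reference cross process boundaries. On platforms that default to the spawn start method (Windows, and macOS since Python 3.8), the global model would not be inherited, and a common workaround is to load it in a Pool initializer (a sketch, with init and the model path as illustrative names):

from multiprocessing import Pool

import fasttext

model = None  # set in each worker by init()

def init(model_path):
    # runs once per worker process: load the model there instead of pickling it
    global model
    model = fasttext.load_model(model_path)

def get_vec(txt):
    return model.get_sentence_vector(txt)

if __name__ == '__main__':
    texts = ["How to sell drugs (fast)", "House of Cards", "The Crown"]
    with Pool(4, initializer=init, initargs=('cc.en.300.bin',)) as p:
        result = p.map(get_vec, texts)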