I am trying to run multiprocessing package in metaflow, in which fasttext model is running to predict some results. Here is my code:
import pickle
import os
import boto3
import multiprocessing
from functools import partial
from multiprocessing import Manager
import time
import pickle
from metaflow import batch, conda, FlowSpec, step, conda_base, Flow, Step
from util import pip_install_module
@conda_base(libraries={'scikit-learn': '0.23.1', 'numpy': '1.22.4', 'pandas': '1.5.1', 'fasttext': '0.9.2'})
class BatchInference(FlowSpec):
pip_install_module("python-dev-tools", "2023.3.24")
@batch(cpu=10, memory=120000)
@step
def start(self):
import pandas as pd
import numpy as np
self.df_input = ['af', 'febrt' ,'fefv fd we' ,'fe hth dw hytht' ,' dfegrtg hg df reg']
self.next(self.predict)
@batch(cpu=10, memory=120000)
@step
def predict(self):
import fasttext
fasttext.FastText.eprint = lambda x: None
print('model reading started')
#download the fasttext model from aws s3.
manager = Manager()
model_abn = manager.list([fasttext.load_model('fasttext_model.bin')])
print('model reading finished')
time_start = time.time()
pool = multiprocessing.Pool()
#results = pool.map(self.predict_abn, self.df_input)
results = pool.map(partial(self.predict_abn, model_abn=model_abn), self.df_input)
pool.close()
pool.join()
time_end = time.time()
print(f"Time elapsed: {round(time_end - time_start, 2)}s")
self.next(self.end)
@step
def end(self):
print("Predictions evaluated successfully")
def predict_abn(self,text, model_abn):
model = model_abn[0]
return model.predict(text,k=1)
if __name__ == '__main__':
BatchInference()
The error message is:
TypeError: cannot pickle 'fasttext_pybind.fasttext' object
I was told this is because fasttext model cannot be serialised. And I also try other message, for example:
self.model_bytes_abn = pickle.dumps(model_abn)
to transfer the model to bytes type. But still does not work.
Plz tell me what is wrong about the code and how to fix it?