I have a class Processor that takes in some input data (which we are going to call examples), processes it, and outputs the results. At a high level it looks like this:
from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_examples), total=len(all_examples)):
            all_results.append(result)
        return all_results

    def process_single_example(self, example):
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result
The idea is that the processor is initialized once (loading the model takes a good amount of time) and can then take advantage of a multicore machine to process the input examples. The above doesn't work as-is, since instance methods aren't picklable for multiprocessing. After consulting the following StackOverflow posts:
call multiprocessing in class method Python
Multiprocessing: How to use Pool.map on a function defined in a class?
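(As a quick sanity check of that pickling claim, a snippet along these lines shows whether the bound method serializes at all; processor here is assumed to be a Processor instance constructed as above:)

    import pickle

    # try to serialize the bound method the pool would need to send to workers
    try:
        pickle.dumps(processor.process_single_example)
        print("method pickles fine")
    except Exception as e:
        print("pickling failed:", e)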
I came up with the following solution:
from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result
However, this didn't work either. If I try to run process_all_examples, it gets stuck at .imap_unordered. For testing purposes, I tried it with some dummy data/processing to understand what was happening, but rather than getting stuck, the multiprocessing was just super slow:
from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2):
        self.arg1 = arg1
        self.arg2 = arg2
        # dummy stand-in for a model loaded from a very large file
        self.model = [i for i in range(1000)]

    def process_all_examples_multi(self, all_examples, nproc=4):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(nproc)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    def process_all_examples_single(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        for _input in tqdm(all_inputs):
            all_results.append(self.process_single_example(_input))
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        result = self.arg1 * self.arg2 * self.model[3] * example
        return result
processor = Processor(-1, 2)
all_examples = list(range(100000))
results = processor.process_all_examples_multi(all_examples) # slower
results = processor.process_all_examples_single(all_examples) # faster
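One thing I noticed while testing: every (self, example) tuple has to be pickled before being sent to a worker, so the whole instance, model included, travels with each example. A quick way to see the per-task payload size (my own check, using the dummy Processor above):

    import pickle

    processor = Processor(-1, 2)
    # every task ships the entire instance, self.model included
    payload = pickle.dumps((processor, 5))
    print(len(payload), "bytes pickled per example")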
Adding a chunksize parameter (with a value between 100 and 10000) to .imap_unordered significantly increases performance, but it never surpasses just using a single core without multiprocessing.Pool.
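For reference, this is how I'm passing chunksize in the loop inside process_all_examples_multi:

    for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs, chunksize=1000),
                       total=len(all_inputs)):
        all_results.append(result)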
I know there are alternatives: one is to redesign the way my code is structured, the other is to use globals. But I can't shake the feeling that I'm just missing something here. I've also tried using the pathos.multiprocessing module from the pathos library, to no avail.
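To be concrete about the globals alternative, this is the kind of pattern I mean: a rough sketch, not tested against my real code, with load_model_from_path again standing in for the real loader and the dummy calculation standing in for the real one:

    import multiprocessing
    from tqdm import tqdm

    _state = {}  # per-process globals, filled in once per worker

    def _init_worker(arg1, arg2, model_path):
        # runs once in each worker process, so the model is loaded nproc
        # times but is never pickled per task
        _state['arg1'] = arg1
        _state['arg2'] = arg2
        _state['model'] = load_model_from_path(model_path)

    def _process_single_example(example):
        # same calculation as the dummy process_single_example above, but
        # reading from the per-process globals instead of an unpickled instance
        return _state['arg1'] * _state['arg2'] * _state['model'][3] * example

    def process_all_examples(all_examples, arg1, arg2, model_path, nproc=4):
        with multiprocessing.Pool(nproc, initializer=_init_worker,
                                  initargs=(arg1, arg2, model_path)) as pool:
            return list(tqdm(pool.imap_unordered(_process_single_example, all_examples),
                             total=len(all_examples)))

This avoids pickling the instance per example, but it gives up the class-based design, which is exactly the restructuring I was hoping to avoid.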