2

I have a class in which I instance a Keras model to perform predictions. This class is organized somewhat like this:

class MyClass():
    def __init__(self):
        
        self.model = None

    def load(path):
        
        self.model = tf.keras.models.load_model(path_)

    def inference(data):

        #...
        pred = self.model.predict(data)
        #...
        
        return pred

I have been trying to run the MyClass.inference method in parallel. I tried it with joblib.Parallel:

from joblib import Parallel, delayed

n_jobs = 8

myobj = MyClass()
myobj.load(<Path_to_model>)

results = Parallel(n_jobs=n_jobs )(delayed(myobj.inference)(d) for d in mydata))

But I get the following error: TypeError: cannot pickle 'weakref' object

Apparently, this is a known issue with Keras (https://github.com/tensorflow/tensorflow/issues/34697), that should have been fixed on TF 2.6.0. But after upgrading tensorflow to 2.6.0, I still get the same error. I even tried tf-nightly, as suggested in the same issue, but it also did not work.

I also tried replacing pickle with dill, by import dill as pickle, but it did not fix it.

The only thing that actually worked is replacing the loky backend in Parallel by threading. However, in one scenario I tried using threading ends up taking pretty much the same time (or a bit slower) as performing the MyClass.inference calls sequentially.

My question is: what are my options here? Is there any way to run a preloaded keras model's predict in parallel, such as with other python libs?

Alberto A
  • 1,160
  • 4
  • 17
  • 35
  • 1
    Python multiprocessing is inherently fickle :/ What often works as a workaround against the weird pickling issues is to have the thing you want to run in parallel be a stand-alone top-level function that's "self sufficient", because trying to run things in parallel involves pickling the relevant context (the `mymodel` class in your case). – cadolphs Oct 15 '21 at 19:49
  • I don't get the question. TF/Keras uses parallelization by default. Give it some data and it will run inference on GPU or utilize CPU with multi-threading. You should not attempt to do parallelization yourself. Why doesn't `myobj.inference(data)` work for you? – Ufos Oct 16 '21 at 13:57
  • You are correct in that Keras uses parallelization by default. The issue is how my pipeline is organized in that when I make the Kera's calls, i don't have an unified view of the data (I use Keras to extract some special features from my data). So that is why I, as of now, need to perform the calls themself in parallel – Alberto A Oct 18 '21 at 15:09

1 Answers1

0

I was able to reproduce this behaviour using tensorflow==2.2.0 and keras==2.4.3.

joblib.Parallel rely on pickle to share function definition across multiple python processes, but pickle implementation has several limitations (doc).

I didn't experiment the benefits of using this solution against serial runs, given that you didn't specify if you are looking for "multiprocessing" or "multithreading" solution. But using concurrent.futures (doc) is a way to handle this problem.

The concurrent.futures module provides a high-level interface for asynchronously executing callables.

The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.

This is the code I used to get my model predicting in parallel.

from tornado import concurrent

futures = []
# max_workers: The maximum number of threads that can be used to execute the given calls.
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    for d in mydata:
        # Schedules the callable "inference()" to be executed 
        # and returns a Future instance representing the execution of the "inference()".
        future = executor.submit(myobj.inference, d)
        futures.append(future)

# Loop over futures to wait for them in the order they were submitted:
for future in futures:
    result = future.result()
    print(result)

In order to get each prediction result as soon as it's ready, even if they come out of order, you can use as_completed:

for future in concurrent.futures.as_completed(futures):
    print(future.result())

You can do some tests with your data to see if the use of multithreading could actually speed up the excecution, given that this is a CPU bound task.

These are some of the best tips and applications on python parallel processing from the fastai community: link

Bill
  • 315
  • 3
  • 18