4

I'm trying to perform model predictions in parallel using the model.predict command provided by keras in python2. I use tensorflow 1.14.0 for python2. I have 5 model (.h5) files and would like the predict command to run in parallel.This is being run in python 2.7. I'm using multiprocessing pool for mapping the model filenames with the prediction function on multiple processes as shown below,

import matplotlib as plt
import numpy as np
import cv2
from multiprocessing import Pool
pool=Pool()
def prediction(model_name):
    global input
    from tensorflow.keras.models import load_model
    model=load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
start_time=time.time()
res=pool.map(prediction,models)
print('Total time taken: {}'.format(time.time() - start_time))
print(res)

The input is an image numpy array obtained from another part of the code. But on executing this I get the following,

Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
    return recv()
AttributeError: 'module' object has no attribute 'prediction'
AttributeError: 'module' object has no attribute 'prediction'

I'm not able to interpret this error message and how do I go about solving this? Any advice is much appreciated!

UPDATE 2: Thanks for all the pointers and for a full example @sokato. I executed the exact code posted by @sokato, however i got the following error(i made the changes in my code too and get the same error shown below),

Traceback (most recent call last):
  File "stackoverflow.py", line 47, in <module>
    with multiprocessing.Pool() as p:
AttributeError: __exit__

UPDATE3: Thanks for all the support.I think the issue in UPDATE2 was due to usage of python2 instead of python3. I was able to solve the error given in UPDATE2 for python2 by using with closing(multiprocessing.Pool()) as p: instead of just with multiprocessing.Pool() as p: in @sokato's code. Import the closing function as follows: from contextlib import closing

NEW ISSUE USING A DIFFERENT APPROACH SHOWN BELOW,

I actually have multiple inputs coming in. Instead of loading model each time for each input I want to load all the models before hand and keep it in a list. I have done this as shown below,

import matplotlib as plt
import numpy as np
import cv2
import multiprocessing
import tensorflow as tf
from contextlib import closing
import time

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
loaded_models=[]
for model in models:
    loaded_models.append(tf.keras.models.load_model(model))

def prediction(input_tuple):
    inputs,loaded_models=input_tuple
    predops=[]
    for model in loaded_models:
        predops.append(model.predict(inputs).tolist()[0])
    actops=[]
    for predop in predops:
        actops.append(predop.index(max(predop)))
    max_freqq = max(set(actops), key = actops.count) 
    return max_freqq

#....some pre-processing....#

    '''new_all_t is a list which contains tuples and each tuple has inputs from all_t 
    and the list containing loaded models which will be extracted
 in the prediction function.'''

new_all_t=[]
for elem in all_t:
    new_all_t.append((elem,loaded_models))
start_time=time.time()
with closing(multiprocessing.Pool()) as p:
    predops=p.map(prediction,new_all_t)
print('Total time taken: {}'.format(time.time() - start_time))

new_all_t is a list which contains tuples and each tuple has inputs from all_t and the list containing loaded models which will be extracted in the prediction function.However, I get the following error now,

Traceback (most recent call last):
  File "trial_mult-ips.py", line 240, in <module>
    predops=p.map(prediction,new_all_t)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
NotImplementedError: numpy() is only available when eager execution is enabled.

What exactly does this indicate? How do I go about solving this?

UPDATE4: I included the lines tf.compat.v1.enable_eager_execution() and tf.compat.v1.enable_v2_behavior() at the very beginning. Now i get the following error,

WARNING:tensorflow:From /home/nick/.local/lib/python2.7/site-packages/tensorflow/python/ops/math_grad.py:1250: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where

Traceback (most recent call last):
  File "the_other_end-mp.py", line 216, in <module>
    predops=p.map(prediction,modelon)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
ValueError: Resource handles are not convertible to numpy.

I'm not able to interpret this error message and how do I go about solving this? Any advice is much appreciated!

Nick Rogers
  • 328
  • 1
  • 5
  • 16
  • Please include a repo of your model files (or in some other manner) so I can recreate your issue. You also don't specifiy your input value anywhere. So the full code would be nice. – sokato Mar 28 '20 at 22:17
  • Please provide a [mcve]. Is there any particular reason you're using Python 2? – AMC Mar 28 '20 at 22:36
  • 1
    Did you find a solution for this question? – shivam13juna Dec 20 '20 at 08:27

1 Answers1

3

So, I am unsure of some of your design choices but I gave it the best attempt with the given information. Specifically, I think there are maybe some issues with the global variable and the import statement within your parallel function.

  1. You should use shared variables and not global variables to share an input between processes. You can read more about shared memory if you want in the multiprocessing documentation.

  2. I generated models from a tutorial since your models are not included.

  3. You are not joining or closing your pool but with the following code I was able to get the code to execute in parallel successfully. You can close the pool by calling pool.close() or with the "with" syntax shown in below. Note, the with syntax doesn't apply to python 2.7.

import numpy as np
import multiprocessing, time, ctypes, os
import tensorflow as tf

mis = (28, 28) #model input shape
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

def createModels(models):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=mis),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])

    model.compile(optimizer='adam',
               loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
               metrics=['accuracy'])

    model.fit(x_train, y_train, epochs=5)

    for mod in models:
        model.save(mod)

def prediction(model_name):

    model=tf.keras.models.load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val

if __name__ == "__main__":
    models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
    dir = os.listdir(".")
    if models[0] not in dir:
        createModels(models)
    # Shared array input
    ub = 100
    testShape = x_train[:ub].shape
    input_base = multiprocessing.Array(ctypes.c_double, 
    int(np.prod(testShape)),lock=False)
    input = np.ctypeslib.as_array(input_base)
    input = input.reshape(testShape)
    input[:ub] = x_train[:ub]

    # with multiprocessing.Pool() as p:  #Use me for python 3
    p = multiprocessing.Pool() #Use me for python 2.7
    start_time=time.time()
    res=p.map(prediction,models)
    p.close() #Use me for python 2.7
    print('Total time taken: {}'.format(time.time() - start_time))
    print(res)

I hope this helps.

sokato
  • 354
  • 4
  • 14
  • Thanks for the points, i executed your code but got the following error, Traceback (most recent call last): File "stackoverflow.py", line 47, in with multiprocessing.Pool() as p: AttributeError: __exit__ – Nick Rogers Mar 29 '20 at 04:55
  • Sorry, that is because the context manager for pool is not included for python 2.7. See my updated response which I ran with 2.7 to make sure it worked. – sokato Mar 29 '20 at 05:53
  • Have a look at UPDAT2 & UPDATE3 in the original question for a fix for python2. Thanks a lot @sokato. However the sequential version of this is 3 times faster than the parallel version. Is this not a good idea? I know it depends on the model. Is the model.predict already using multiprocessing (for a single instance)? But I would expect to get a speedup when running multiple model predictions in parallel compared to sequential. Is the Inter Process Communication overhead too much that it over weighs the processing? – Nick Rogers Mar 29 '20 at 05:58
  • Sorry @sokato I hadn't refreshed the page to see your comment. You have commented earlier! Thanks a lot. But can you have a look at my comment, i have another doubt. – Nick Rogers Mar 29 '20 at 06:01
  • 1
    @NickRogers there is expense associated with allocating the pool and process management. Not all applications are well suited for parallel processing. I believe that tensor flow has the option of multiprocessing under the hood which may clash with multiprocessing pool itself. In my experience extensive calculations provide an obvious use for parallel programming. In this case the model evaluations tend to be simple so the process management overhead dominates. On the other hand if you had a large number of models to test or each model calculation was expensive itself it could be more useful. – sokato Mar 29 '20 at 06:07
  • Thanks @sokato, is there a built-in way to do predictions in parallel in tensorflow itself? If there is, it might be better than doing it like this... – Nick Rogers Mar 29 '20 at 06:14
  • @NickRogers I think https://stackoverflow.com/q/36610290/10500953 is along the lines of what you're looking for maybe? – sokato Mar 29 '20 at 06:22
  • Thankyou @sokato , but I have change the approach a bit now as shown under different approach in the original question post and I have encountered another error. You have understood my previous issue well and have given a good solution. Is this problem along those lines? How do I go about this issue? Thanks again – Nick Rogers Mar 29 '20 at 16:51
  • @NickRogers Have you seen https://stackoverflow.com/questions/58816662/notimplementederror-numpy-is-only-available-when-eager-execution-is-enabled – sokato Mar 30 '20 at 16:09
  • Thanks @sokato I updated the code with two lines seen under UPDATE4 in question. I got another error. I use tensorflow 1.14.0 for python2. – Nick Rogers Mar 31 '20 at 13:25
  • Here's another question i have, https://stackoverflow.com/questions/60951638/how-to-simultaneously-read-audio-samples-while-recording-in-python-for-real-time – Nick Rogers Mar 31 '20 at 13:27