
I have a system with 60 CPUs, and I want to parallelize the predictions of a Keras model over several images. I tried the following code:

import tensorflow

img_model1 = tensorflow.keras.models.load_model('my_model.h5')
img_model2 = tensorflow.keras.models.load_model('my_model.h5')
img_model3 = tensorflow.keras.models.load_model('my_model.h5')
models = [img_model1, img_model2, img_model3]  # all three are the same model

I tried to use indices to avoid a weakref pickling error:

from multiprocessing import Pool
import numpy as np
import pandas as pd

def _apply_df(args):
    df, model_index = args
    preds = prediction_generator(df['image_path'])
    return models[model_index].predict(preds)

def apply_by_multiprocessing(df, workers):
    pool = Pool(processes=workers)
    result = pool.map(_apply_df, [(d, i) for i, d in enumerate(np.array_split(df[['image_path']], workers))])
    pool.close()
    return pd.concat(list(result))
    

apply_by_multiprocessing(df=data, workers=3) 

The code keeps running forever without yielding any results. I guess the problem could be solved with tf.Session(), but I'm not sure how.

Vahid the Great

1 Answer


Load your model inside the `_apply_df` function so that it doesn't have to be pickled and sent to the worker processes.

Here is a simple example, without pandas, that runs a model on the Fashion-MNIST data. I think you can adapt it to your use case.

import tensorflow as tf
import numpy as np
from multiprocessing import Pool


def _apply_df(data):
    # Load the model inside the worker process so it never has to be pickled.
    model = tf.keras.models.load_model("my_fashion_mnist_model.h5")
    return model.predict(data)


def apply_by_multiprocessing(data, workers):
    # Split the data into one chunk per worker; each process predicts on its own chunk.
    pool = Pool(processes=workers)
    result = pool.map(_apply_df, np.array_split(data, workers))
    pool.close()
    return list(result)


def main():
    fashion_mnist = tf.keras.datasets.fashion_mnist
    _, (test_images, test_labels) = fashion_mnist.load_data()

    test_images = test_images / 255.0
    results = apply_by_multiprocessing(test_images, workers=3)
    print(test_images.shape)           # (10000, 28, 28)
    print(len(results))                # 3
    print([x.shape for x in results])  # [(3334, 10), (3333, 10), (3333, 10)]


if __name__ == "__main__":
    main()

H4iku
  • Thanks a lot. I had also tried this at the beginning, which is something I just added to the question. This time, the problem is that the model keeps running forever! Please check the new edit. Thanks – Vahid the Great Dec 28 '21 at 12:49
  • Did you try the exact code in my answer? Did it keep running forever? It would be nice if you could provide a minimal running example since I can't reproduce this issue and my code finishes running after doing its job. – H4iku Dec 28 '21 at 14:35
  • Yeap, I copy-pasted your code. The change I made was to use this model https://github.com/umbertogriffo/Fashion-mnist-cnn-keras/blob/master/Output/fm_cnn_model.h5 as I didn't have access to yours, and I reshaped the data to its input shape accordingly with .reshape(10000, 1, 28, 28). And yes, it keeps running forever. Without parallelizing, my own code works, though. – Vahid the Great Dec 28 '21 at 15:01
  • The model you linked is a TF 1 model, while I am using a TF 2 model. Use this: https://drive.google.com/file/d/1dSBlDwy2ZdUaNAyEnOtm97OCQxP42Avk/view?usp=sharing Also, make sure to put your outside code (e.g., `apply_by_multiprocessing(df=data, workers=3)`) inside the `main` function. It is needed especially for the parallel code to work. – H4iku Dec 28 '21 at 15:58
  • I just used this one too. It still doesn't work. – Vahid the Great Dec 28 '21 at 16:33
  • It seems TensorFlow doesn't work well with `fork`, which is the default method for starting child processes on Linux. Consider changing it to [`spawn` or `forkserver`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.set_start_method) by calling `multiprocessing.set_start_method('spawn')` right before initializing your `pool` to see if that fixes the issue (a minimal sketch of this appears after these comments). – H4iku Dec 28 '21 at 22:32
  • Thanks. 'spawn' wasn't necessary. I solved the problem using your code, but in addition I had to import the functions used in the pool from a .py file; it turns out that this library has problems with .ipynb. Thanks again! – Vahid the Great Jan 13 '22 at 18:45
  • @H4iku will this load the model multiple times? So if I had to use a large number of workers, the memory I would need would increase a lot as well because it will load the model each time, right? – Kurt Sep 16 '22 at 20:59
  • @Kurt You're right, each process loads its own model. – H4iku Sep 17 '22 at 22:15
  • @H4iku: Hi! I have opened a new post [here](https://stackoverflow.com/questions/74348797/axis1-axis-0-is-out-of-bounds-for-array-of-dimension-0-trying-to-predict-by-u) which uses the same solution you gave, but with TensorFlow datasets. Please take a look if you happen to know the issue. – George Nov 07 '22 at 15:13
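
For reference, here is a minimal sketch of the `spawn` suggestion from the comments, reusing the model file name assumed in the answer. `multiprocessing.get_context("spawn")` gives a pool that uses the `spawn` start method without changing the global default that `multiprocessing.set_start_method('spawn')` would set; either approach should work.

import numpy as np
import tensorflow as tf
from multiprocessing import get_context


def _apply_df(data):
    # Each worker process loads its own copy of the model, as in the answer.
    model = tf.keras.models.load_model("my_fashion_mnist_model.h5")
    return model.predict(data)


def apply_by_multiprocessing(data, workers):
    # "spawn" starts fresh interpreters instead of forking, so the child
    # processes do not inherit TensorFlow's internal state from the parent.
    with get_context("spawn").Pool(processes=workers) as pool:
        return pool.map(_apply_df, np.array_split(data, workers))

As in the answer, call apply_by_multiprocessing from inside an `if __name__ == "__main__":` block, and if you are working in a notebook, keep these functions in a separate .py file and import them, as noted in the comments above.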