
This question is a continuation of this one, but using TensorFlow datasets.

So, if we use:

import tensorflow as tf
import numpy as np
from multiprocessing import Pool
from keras.datasets import fashion_mnist
from tensorflow.keras.models import Sequential
 
# importing various types of hidden layers
from tensorflow.keras.layers import Conv2D, MaxPooling2D,\
Dense, Flatten
 
# Adam optimizer for better LR and less loss
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

# gpu setup
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
    
    
def model_arch():
    models = Sequential()
 
    # We are learning 64
    # filters with a kernel size of 5x5
    models.add(Conv2D(64, (5, 5),
                      padding="same",
                      activation="relu",
                      input_shape=(28, 28, 1)))
 
    # Max pooling will reduce the
    # size with a pool size of 2x2
    models.add(MaxPooling2D(pool_size=(2, 2)))
    models.add(Conv2D(128, (5, 5), padding="same",
                      activation="relu"))
 
    models.add(MaxPooling2D(pool_size=(2, 2)))
    models.add(Conv2D(256, (5, 5), padding="same",
                      activation="relu"))
 
    models.add(MaxPooling2D(pool_size=(2, 2)))
 
    # Once the convolutional and pooling
    # operations are done the layer
    # is flattened and fully connected layers
    # are added
    models.add(Flatten())
    models.add(Dense(256, activation="relu"))
 
    # Finally, since there are 10 classes,
    # a fully connected layer of 10 units
    # is added with a softmax activation
    # function
    models.add(Dense(10, activation="softmax"))
    return models

def _apply_df(data):
    model = model_arch()
    model.load_weights("/home/ggous/model_mnist.h5")
    return model.predict(data)

def apply_by_multiprocessing(data, workers):

    pool = Pool(processes=workers)
    result = pool.map(_apply_df, np.array_split(data, workers))
    pool.close()
    return list(result)

def resize_and_rescale(data):
    data = tf.cast(data, tf.float32)
    data /= 255.0
    return data
    
def prepare(ds):
    ds = ds.map(resize_and_rescale)
    return ds.batch(1)

def after_prepare(data):
    tens_data = tf.data.Dataset.from_tensor_slices(data)
    tens_data = prepare(tens_data)
    return tens_data

def main():
    fashion_mnist = tf.keras.datasets.fashion_mnist
    _, (test_images, test_labels) = fashion_mnist.load_data()

    test_images = after_prepare(test_images)
    results = apply_by_multiprocessing(test_images, workers=3)
    print(test_images.shape)           # (10000, 28, 28)
    print(len(results))                # 3
    print([x.shape for x in results])  # [(3334, 10), (3333, 10), (3333, 10)]


if __name__ == "__main__":
    main()

we get an error:

axis1: axis 0 is out of bounds for array of dimension 0

I have just added:

def resize_and_rescale(data):
    data = tf.cast(data, tf.float32)
    data /= 255.0
    return data

def prepare(ds):
    ds = ds.map(resize_and_rescale)
    return ds.batch(1)

def after_prepare(data):
    tens_data = tf.data.Dataset.from_tensor_slices(data)
    tens_data = prepare(tens_data)
    return tens_data

So, I created TensorFlow datasets in after_prepare.

The saved model can be found here.

-- UPDATE --

Now, it gives me these messages:

F tensorflow/stream_executor/cuda/cuda_driver.cc:146] Failed setting context: CUDA_ERROR_NOT_INITIALIZED: initialization error

I saw this, so I tried:

multiprocessing.set_start_method('spawn', force=True)

at the beginning of the code, and now it gives me many messages:

 Start cannot spawn child process: No such file or directory
2022-11-08 09:12:35.984897: I tensorflow/core/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory
2022-11-08 09:12:35.984909: W tensorflow/stream_executor/gpu/asm_compiler.cc:80] Couldn't get ptxas version string: INTERNAL: Couldn't invoke ptxas --version
2022-11-08 09:12:35.985087: I tensorflow/core/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory
2022-11-08 09:12:35.985118: W tensorflow/stream_executor/gpu/redzone_allocator.cc:314] INTERNAL: Failed to launch ptxas

...
failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.618099: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.618274: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 230.40M (241592064 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.618437: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 207.36M (217433088 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.618447: W tensorflow/core/common_runtime/bfc_allocator.cc:360] Garbage collection: deallocate free memory regions (i.e., allocations) so that we can re-allocate a larger region to avoid OOM due to memory fragmentation. If you see this message frequently, you are running near the threshold of the available device memory and re-allocation may incur great performance overhead. You may try smaller batch sizes to observe the performance impact. Set TF_ENABLE_GPU_GARBAGE_COLLECTION=false if you'd like to disable this feature.
2022-11-08 09:12:36.629520: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.629542: W tensorflow/core/common_runtime/bfc_allocator.cc:290] Allocator (GPU_0_bfc) ran out of memory trying to allocate 203.00MiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2022-11-08 09:12:36.629618: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.629987: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.630001: W tensorflow/core/common_runtime/bfc_allocator.cc:290] Allocator (GPU_0_bfc) ran out of memory trying to allocate 203.00MiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2022-11-08 09:12:36.630110: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 230.40M (241592064 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
....
failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.256468: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.256640: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.256810: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.256988: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.257166: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.257224: W tensorflow/core/framework/op_kernel.cc:1780] OP_REQUIRES failed at conv_ops_fused_impl.h:601 : NOT_FOUND: No algorithm worked!  Error messages:
  Profiling failure on CUDNN engine 1#TC: RESOURCE_EXHAUSTED: Out of memory while trying to allocate 16777216 bytes.
  Profiling failure on CUDNN engine 1: RESOURCE_EXHAUSTED: Out of memory while trying to allocate 16777216 bytes.

1 Answer

The problem comes from the data preparation step. The initial code takes data of shape (10000, 28, 28) and uses np.array_split to break it into as many NumPy sub-arrays as there are workers (here a list of 3 arrays, since workers=3), one to be processed by each worker.

After the after_prepare function returns, however, your input is a dataset of 10000 single-element batches, because you are using batch(1), and this object produces the error when it reaches the np.array_split call.
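
A minimal sketch of the difference (the shapes are the ones from the example above, and the commented line reproduces exactly the error you are seeing):

import numpy as np
import tensorflow as tf

images = np.zeros((10000, 28, 28), dtype=np.float32)

# Splitting the raw array works: axis 0 has length 10000.
chunks = np.array_split(images, 3)
print([c.shape for c in chunks])  # [(3334, 28, 28), (3333, 28, 28), (3333, 28, 28)]

# A tf.data.Dataset is not an array: NumPy wraps it in a 0-d object array,
# so there is no axis 0 to split along.
ds = tf.data.Dataset.from_tensor_slices(images).batch(1)
# np.array_split(ds, 3)  # raises: axis1: axis 0 is out of bounds for array of dimension 0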

You have two options to solve this problem:

Option 1. Don't batch your data in the prepare function and only return ds. Then in the apply_by_multiprocessing function change

result = pool.map(_apply_df, np.array_split(data, workers))

to

result = pool.map(_apply_df, np.array_split(list(data.as_numpy_iterator()), workers))
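
Here data.as_numpy_iterator() materializes the dataset back into per-example NumPy arrays, which np.array_split can stack and split along axis 0. A minimal sketch of what that call produces, assuming the unbatched dataset from above:

import numpy as np
import tensorflow as tf

ds = tf.data.Dataset.from_tensor_slices(np.zeros((10000, 28, 28), dtype=np.float32))
examples = list(ds.as_numpy_iterator())  # 10000 arrays of shape (28, 28)
chunks = np.array_split(examples, 3)     # stacked back into 3 arrays
print([c.shape for c in chunks])         # [(3334, 28, 28), (3333, 28, 28), (3333, 28, 28)]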

Option 2. Again don't batch your data in the prepare function and only return ds. Then in the apply_by_multiprocessing function change

result = pool.map(_apply_df, np.array_split(data, workers))

to

result = pool.map(_apply_df, data.batch(int(np.ceil(len(data) / workers))))

Note that this produces a slightly different output shape due to how the batch size is calculated.
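
For the 10000 test images and workers=3 in this example, the arithmetic works out as follows:

import numpy as np

batch_size = int(np.ceil(10000 / 3))  # 3334
# ds.batch(3334) yields chunks of 3334, 3334, and 3332 examples,
# whereas np.array_split gives 3334, 3333, and 3333.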

A working code example using Option 2 is below:

import os
import tensorflow as tf
import numpy as np
import multiprocessing
from multiprocessing import Pool
from itertools import chain
from keras.datasets import fashion_mnist
from tensorflow.keras.models import Sequential

# importing various types of hidden layers
from tensorflow.keras.layers import Conv2D, MaxPooling2D,\
Dense, Flatten

# Adam optimizer for better LR and less loss
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

# gpu setup
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)


def model_arch():
    models = Sequential()

    # We are learning 64
    # filters with a kernel size of 5x5
    models.add(Conv2D(64, (5, 5),
                      padding="same",
                      activation="relu",
                      input_shape=(28, 28, 1)))

    # Max pooling will reduce the
    # size with a pool size of 2x2
    models.add(MaxPooling2D(pool_size=(2, 2)))
    models.add(Conv2D(128, (5, 5), padding="same",
                      activation="relu"))

    models.add(MaxPooling2D(pool_size=(2, 2)))
    models.add(Conv2D(256, (5, 5), padding="same",
                      activation="relu"))

    models.add(MaxPooling2D(pool_size=(2, 2)))

    # Once the convolutional and pooling
    # operations are done the layer
    # is flattened and fully connected layers
    # are added
    models.add(Flatten())
    models.add(Dense(256, activation="relu"))

    # Finally, since there are 10 classes,
    # a fully connected layer of 10 units
    # is added with a softmax activation
    # function
    models.add(Dense(10, activation="softmax"))
    return models

def _apply_df(data):
    model = model_arch()
    model.load_weights("model_mnist.h5")
    return model.predict(data)

def apply_by_multiprocessing(data, workers):

    pool = Pool(processes=workers)
    # result = pool.map(_apply_df, np.array_split(list(data.as_numpy_iterator()), workers))
    result = pool.map(_apply_df, data.batch(int(np.ceil(len(data) / workers))))
    pool.close()
    return list(result)

def resize_and_rescale(data):
    data = tf.cast(data, tf.float32)
    data /= 255.0
    return data

def prepare(ds):
    ds = ds.map(resize_and_rescale)
    return ds

def after_prepare(data):
    tens_data = tf.data.Dataset.from_tensor_slices(data)
    tens_data = prepare(tens_data)
    return tens_data

def main():
        
    # Use 'spawn' so each worker starts in a fresh interpreter instead of
    # forking the parent's already-initialized CUDA context.
    multiprocessing.set_start_method('spawn')

    # Hide the GPU so TensorFlow runs on the CPU in every process.
    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
    
    fashion_mnist = tf.keras.datasets.fashion_mnist
    _, (test_images, test_labels) = fashion_mnist.load_data()

    test_images = after_prepare(test_images)
    results = apply_by_multiprocessing(test_images, workers=3)
    print(test_images)                 # <MapDataset with shape=(28, 28)>
    print(len(results))                # 3
    print([x.shape for x in results])  # [(3334, 10), (3334, 10), (3332, 10)]
    
    results_flatten = list(chain.from_iterable(results))
    print(len(results_flatten), results_flatten[0].shape)  # 10000 (10,)


if __name__ == "__main__":
    main()
  • Hi, thanks for the help! I tried it, and it shows me GPU memory-related errors now; I have updated my post. What version of tensorflow do you use? I am using `2.10.0`, `cudatoolkit 11.2.2`, `cudnn 8.1.0.77` – George Nov 08 '22 at 07:19
  • I also saw [this](https://pythonspeed.com/articles/python-multiprocessing/) and I used `with get_context("spawn").Pool() as pool:` inside `apply_by_multiprocessing`, but still the same issues. – George Nov 08 '22 at 08:22
  • Yes, using `spawn` as `start_method` [helps](https://stackoverflow.com/questions/70496446/parallelizing-keras-model-predict-using-multiprocessing#comment124646047_70500865). I'm using the same latest version of tensorflow. This code is meant to run on CPU; you don't need to install `cudatoolkit` or `cudnn`. For GPU, you don't need any of this multiprocessing stuff. – H4iku Nov 08 '22 at 12:30
  • Yes, right. Just my environment already has these packages. I use GPU for training. But, what is the problem with the errors? Do you happen to know? I used either `spawn` or `forkserver` but with no success. – George Nov 08 '22 at 12:38
  • The problem is that TensorFlow, by default, tries to use GPU when available. You have to set it to use only the CPU here manually. That's why you are getting the `CUDA_ERROR_OUT_OF_MEMORY` error. If you only use the CPU, it shouldn't put anything on the VRAM. Try [this post's](https://stackoverflow.com/questions/37660312/how-to-run-tensorflow-on-cpu) suggested methods. – H4iku Nov 08 '22 at 12:47
  • Nice! I added `os.environ['CUDA_VISIBLE_DEVICES'] = '-1'` and `multiprocessing.set_start_method('spawn', force=True)` and it runs fine! Thanks! – George Nov 08 '22 at 12:51
  • Hmm, I am trying this in my code. Basically, I want to run `multiple` predictions, not one `model.predict`! I have, for example, 62500 arrays and I want to make predictions on each one. Right now, in `_apply_df`, the data breaks into 4 batches of 15625 each, for 4 workers. So `results` has `len 4` instead of `62500`!! Is there a solution for this? – George Nov 08 '22 at 14:34
  • If the only problem is the result's shape, you can easily extract your 62500 samples from the length 4 list. First import `from itertools import chain`, then use `results_flatten = list(chain.from_iterable(results))`. Now `results_flatten` has length 62500. – H4iku Nov 08 '22 at 18:43
  • It gives me length `128`. And if I try `results_flatten[0].shape`, it gives me `(32,)` instead of `(32, 32)` (height, width). – George Nov 08 '22 at 19:58
  • The result items shouldn't be `(32, 32)`; they are not image pixels, they are class probabilities. I updated the code in the answer, and it works based on the mnist dataset. The result shape for mnist is `(10,)` since there are 10 classes (digits to identify). It's maybe better to open a new question with your new data. – H4iku Nov 08 '22 at 23:09
  • Ok, so I opened a new question [here](https://stackoverflow.com/questions/74371371/make-multiple-parallel-predictions-on-tensorflow-model). Thanks – George Nov 09 '22 at 07:41
  • Hmm.. I think my solution is right! It works! I am referring to the post I [opened](https://stackoverflow.com/questions/74371371/make-multiple-parallel-predictions-on-tensorflow-model). If I use the `for loop`, it works! It gives right results and faster! I was confused by doing the loop, but I think it is the right thing since the loop is in the function that is fed to `pool.map`. What do you think? – George Nov 10 '22 at 09:03
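
For completeness, here is the CPU-only multiprocessing setup that the comments above converge on; it matches the two lines in the answer's main(), with force=True as in George's final version:

import os
import multiprocessing

# Hide all GPUs so TensorFlow falls back to the CPU in every process.
# This must be in the environment before TensorFlow initializes CUDA.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

# 'spawn' starts each worker in a fresh interpreter instead of forking
# the parent's already-initialized CUDA context.
multiprocessing.set_start_method('spawn', force=True)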