Context
A Keras model (link here, for the sake of MWE) needs to run prediction on a lot of test data, in parallel. I define a cube as a 3D numpy.ndarray
of uint
. Each of its vertical slices is a column, which is npixels = 128 high and nbins = 128 deep.
Each prediction transforms a column into a denoised column of the same size.
I'm providing three approaches: single-threaded, multiprocessing, and multiprocessing via the pathos
package. Neither of the two pool-based approaches works, and I can't figure out why.
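For concreteness, a minimal sketch of the data layout described above (shapes only; `ncols = 10` is an arbitrary cube length chosen for the sketch, the dtype and sizes are taken from the description):

```python
import numpy as np

npixels, nbins = 128, 128
ncols = 10  # arbitrary cube length for this sketch

# A cube: ncols vertical slices, each one a (npixels, nbins) column.
cube = np.zeros((ncols, npixels, nbins), dtype=np.uint8)

col = cube[0]                               # one column, shape (128, 128)
col_nn = col.reshape(1, npixels, nbins, 1)  # network input layout, shape (1, 128, 128, 1)
```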
Code
import keras
import numpy as np
import threading
import pathos.multiprocessing
import multiprocessing


def __res_sum_squares(y_true, y_pred):
    squared_diff = (y_true - y_pred) ** 2
    return keras.backend.sum(squared_diff)


__npixels, __nbins = 128, 128
__shape_col = (__npixels, __nbins)
__shape_nn = (1, __npixels, __nbins, 1)
__model = keras.models.load_model('./model.h5', compile=True,
                                  custom_objects={'res_sum_squares': __res_sum_squares})
__max_parallel_predictions = 4
__sema = threading.BoundedSemaphore(value=__max_parallel_predictions)


def __mt_pathos_manager(col_ratio):
    return __denoise(col_ratio[0], col_ratio[1])


def __denoise_frame_mt_pathos(frame_ratios):
    results = pathos.multiprocessing.ProcessingPool().map(__mt_pathos_manager, frame_ratios)
    return results


def __denoise_frame_mt_multiprocessing(frame_ratios):
    pool = multiprocessing.Pool()
    results = pool.map(__denoise, frame_ratios)  # each item is a (col, ratio) tuple
    pool.close()
    pool.join()
    return results


def __denoise(col, ratio=None):
    """
    :param col: the source column
    :param ratio: logging purposes
    :return: the denoised column
    """
    really_predict = True
    if isinstance(col, tuple):
        col, ratio = col
    col_denoise = np.reshape(col, __shape_nn)
    print("{} acquiring".format(ratio))
    __sema.acquire()
    print("{} acquired".format(ratio))
    # ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ CRITICAL SECTION START ~ ~ ~ ~ ~ ~ ~ ~ ~ ~
    col_denoise = __model.predict(col_denoise) if really_predict else col_denoise
    # ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ CRITICAL SECTION END ~ ~ ~ ~ ~ ~ ~ ~ ~ ~
    print("{} releasing".format(ratio))
    __sema.release()
    print("{} released".format(ratio))
    return np.reshape(col_denoise, __shape_col)
def denoise_cube(cube, mp=False, mp_pathos=False):
    """
    :param cube: a numpy 3D array of ncols * npixels * nbins
    :param mp: use multiprocessing
    :param mp_pathos: use pathos multiprocessing
    :return: the denoised cube
    """
    ncol = cube.shape[0]
    ratios = [(ic * 100.0) / ncol for ic in range(ncol)]
    frame_ratios = zip(cube, ratios)
    if mp:
        if mp_pathos:
            l_cols_denoised = __denoise_frame_mt_pathos(frame_ratios)
        else:
            l_cols_denoised = __denoise_frame_mt_multiprocessing(frame_ratios)
    else:
        l_cols_denoised = [__denoise(col, ratio) for col, ratio in frame_ratios]
    return l_cols_denoised
if __name__ == "__main__":
    test_cube = np.random.rand(1000, __npixels, __nbins)
    # Single-threaded impl: works fine
    denoise_cube(test_cube, mp=False)
    # Multiprocessing Pool: blocks at the eighth "acquired" print
    denoise_cube(test_cube, mp=True, mp_pathos=False)
    # Pathos multiprocessing Pool: blocks at the eighth "acquired" print
    denoise_cube(test_cube, mp=True, mp_pathos=True)
Analysis
My first guess was that somehow the rush to __model.predict()
was blocking after 8 calls (= the number of CPU cores on the test machine).
So I placed a threading.BoundedSemaphore
allowing fewer than 8 concurrent accesses. Nothing changed.
Single-threaded works as expected:
0.0 acquiring
0.0 acquired
0.0 releasing
0.0 released
< ............ >
99.9 acquiring
99.9 acquired
99.9 releasing
99.9 released
The multiprocessing versions (both of them) don't:
0.0 acquiring
0.0 acquired
3.2 acquiring
3.2 acquired
6.4 acquiring
6.4 acquired
9.6 acquiring
9.6 acquired
12.8 acquiring
12.8 acquired
16.0 acquiring
16.0 acquired
19.2 acquiring
19.2 acquired
22.4 acquiring
22.4 acquired
< hangs >
Wait, where are the release
prints? It seems the semaphore is never touched, or it is being copied for each call and always reinitialized. Hmm.
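To check the "copied semaphore" hypothesis in isolation, here is a minimal sketch (independent of the code above) showing that a module-level threading.BoundedSemaphore is indeed process-local: each pool worker gets its own copy, so the children's acquires never touch the parent's counter:

```python
import multiprocessing
import threading

# Process-local: every pool worker ends up with its own private copy of
# this object (inherited on fork, re-created on spawn), never a shared counter.
sema = threading.BoundedSemaphore(value=4)

def acquire_once(_):
    # Decrements only this worker's private copy of the semaphore.
    return sema.acquire(blocking=False)

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(acquire_once, range(4)))   # all True: each worker has a fresh copy
    # The parent's counter is untouched by the workers' acquires:
    print(sema.acquire(blocking=False))           # still acquirable in the parent
```

If the semaphore were truly shared, the four acquires in the workers would have exhausted the parent's counter of 4 as well.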
So let's take really_predict = True
and flip its value: this way the predict()
call is never reached.
.... And this works well, great! So the problem is not attributable to multiprocessing
itself, but to some strange interaction between keras
prediction and multiprocessing
pooling. Any advice?