
I have a lot of PNG images that I want to classify using a trained CNN model.

To speed up the process, I would like to use multiprocessing with CPUs (I have 72 available; here I'm just using 4). I don't have a GPU available at the moment, but if necessary, I could get one.

My workflow:

  1. read an image with OpenCV

  2. adapt shape and format

  3. use mymodel.predict(img) to get the probability for each class

When it comes to the prediction step, the code never finishes the mymodel.predict(img) call. When I use the code without the multiprocessing module, it works fine. For the model, I'm using Keras with a TensorFlow backend.

# imports needed by the snippets below
import cv2
import numpy as np
from keras.models import load_model

# load model
mymodel = load_model('190704_1_fcs_plotclassifier.h5')

# use python library multiprocessing to use different CPUs
import multiprocessing as mp

pool = mp.Pool(4)

# Define callback function to collect the output in 'outcomes'
outcomes = []

def collect_result(result):
    global outcomes
    outcomes.append(result)

# Define prediction function

def prediction(img):
    img = cv2.resize(img,(49,49))
    img = img.astype('float32') / 255
    img = np.reshape(img,[1,49,49,3])       

    status = mymodel.predict(img)
    status = status[0][1]

    return(status)

# Define evaluate function

def evaluate(i,figure):

    # predict the probability of the picture to be in class 0 or 1
    img = cv2.imread(figure)
    status = prediction(img)

    outcome = [figure, status]
    return(i,outcome)

# execute multiprocessing
for i, item in enumerate(listoffigurepaths):
    pool.apply_async(evaluate, args=(i, item), callback=collect_result)
pool.close()
pool.join()

# get outcome
print(outcomes)

It would be great if somebody knows how to predict several images at a time!

I simplified my code here, but if somebody has an example of how it could be done, I would highly appreciate it.

– Julia
3 Answers


Does the processing speed
or the size of RAM
or the number of CPU cores
or the introduced add-on processing latency matter most?
ALL OF THESE DO:

The python multiprocessing module ( and joblib, which uses it as one of its backends, does the same ) is documented as follows:

The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

Yet, as with everything in our Universe, this comes at a cost:

The wish, expressed by the O/P as:

To speed up the process, I would like to use multiple-processing with CPUs (I have 72 available

will, for this kind of application of a pre-trained mymodel.predict(), if sent into a Pool( 72 ) execution, almost for sure suffocate almost any hardware RAM by swapping.

Here is an example, where a "just"-Do-Nothing worker was spawned by the n_jobs = 100 directive, to see what happens ( time-wise ~ 532+ [ms] lost, plus memory-allocation-wise, where XYZ [GB] of RAM were immediately allocated by the O/S ):

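( A minimal sketch, if one wants to reproduce such a spawn-overhead measurement on one's own platform -- assuming joblib is installed; the n_jobs = 100 value mirrors the directive above: )

from joblib import Parallel, delayed
import time

def do_nothing():                            # the "just"-Do-Nothing worker
    return None

aStart = time.perf_counter()
_ = Parallel( n_jobs = 100 )( delayed( do_nothing )() for _ in range( 100 ) )
aStop  = time.perf_counter()

print( "spawning + running 100 do-nothing jobs took ~ {0:.1f} [ms]".format( ( aStop - aStart ) * 1000 ) )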

This comes from the fact that each multiprocessing-spawned sub-process ( not a thread, as the O/P has already experienced on her own ) is first instantiated ( after an adequate add-on latency due to O/S process/RAM-allocation management ) as a ---FULL-COPY--- of the ecosystem present inside the original python process ( the complete python interpreter + all its import-ed modules + all its internal state and data structures, used or not ), so indeed huge amounts of RAM allocations take place. Have you noticed the platform starting to SWAP? Note how many sub-processes were spawned until that moment and you have a ceiling on how many of them can fit in-RAM; it has devastating performance effects if one tries ( or lets, by using the joblib n_jobs = -1 auto-scaling directive ) to populate more sub-processes than this SWAP-introducing number...
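( If one nevertheless insists on a process-based Pool, a common pattern -- shown here only as a minimal sketch, not a guaranteed fix -- is to let each worker load its own model copy via the Pool initializer, so that no half-initialised TensorFlow state is inherited from the parent; this of course multiplies the RAM footprint by the number of workers and assumes listoffigurepaths is defined as in the O/P's code: )

import multiprocessing as mp
import cv2
import numpy as np

worker_model = None                                    # process-local model handle

def init_worker( model_path ):
    # each child process loads its own model copy, instead of inheriting the parent's TF state
    global worker_model
    from keras.models import load_model
    worker_model = load_model( model_path )

def evaluate_in_worker( i, figure ):
    img = cv2.resize( cv2.imread( figure ), ( 49, 49 ) ).astype( 'float32' ) / 255
    status = worker_model.predict( np.reshape( img, [1, 49, 49, 3] ) )[0][1]
    return ( i, [figure, status] )

if __name__ == '__main__':
    pool = mp.Pool( 4,
                    initializer = init_worker,
                    initargs    = ( '190704_1_fcs_plotclassifier.h5', )
                    )
    outcomes = pool.starmap( evaluate_in_worker, enumerate( listoffigurepaths ) )
    pool.close(); pool.join()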

So far so good: we have paid some time ( for carefully designed code often a reasonably negligible amount, if compared to fully training the whole predictor again ) to spawn some number of parallel processes.

If the distributed workload next goes back to one common, performance-wise singular resource ( a disk directory-tree with files ), the performance of the parallel processes gets wrecked: each of them has to wait for that resource(!) to become free again.

Finally, even with the "right" amount of Pool()-spawned sub-processes, i.e. one that prevents the O/S from starting to SWAP RAM to disk and back, the inter-process communication is extremely expensive -- here, serialising ( Pickling/unPickling ) + enQueueing + deQueueing every DATA-object one has to pass there and back ( yes, even for the callback fun ), so the less one sends, the faster the Pool-processing will become.
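( A minimal sketch of keeping the per-task payload small, re-using the init_worker / worker_model idea from the sketch above: each task receives just a path string and returns just a small ( path, float ) pair, and the chunksize parameter of imap_unordered lets many such small tasks travel per queue transaction: )

def evaluate_path_only( figure ):
    # only a path string goes in, only a small ( path, float ) tuple comes back
    img = cv2.resize( cv2.imread( figure ), ( 49, 49 ) ).astype( 'float32' ) / 255
    status = worker_model.predict( np.reshape( img, [1, 49, 49, 3] ) )[0][1]
    return ( figure, float( status ) )

if __name__ == '__main__':
    pool = mp.Pool( 4,
                    initializer = init_worker,
                    initargs    = ( '190704_1_fcs_plotclassifier.h5', )
                    )
    outcomes = list( pool.imap_unordered( evaluate_path_only,
                                          listoffigurepaths,
                                          chunksize = 64
                                          ) )
    pool.close(); pool.join()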

Here, all Pool-associated processes might benefit from independent logging of the results, which may reduce both the volume and the latency of the inter-process communications, while still allowing the results reported by any number of workers to be consolidated into one common log afterwards.
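( A minimal sketch of such independent, per-worker logging, assuming each worker may simply append its results to its own CSV file keyed by its process id, with the per-worker files merged once at the very end: )

import os, csv

def evaluate_and_log( figure ):
    img = cv2.resize( cv2.imread( figure ), ( 49, 49 ) ).astype( 'float32' ) / 255
    status = worker_model.predict( np.reshape( img, [1, 49, 49, 3] ) )[0][1]
    # each worker appends to its own file -- nothing big travels back through the Pool queues
    with open( 'outcomes_{0:}.csv'.format( os.getpid() ), 'a', newline = '' ) as aF:
        csv.writer( aF ).writerow( [figure, float( status )] )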


How to ... ? First benchmark the costs of each step:

Without hard facts ( measured durations in [us] ), one remains with just an opinion.

def prediction( img ):
    img = cv2.resize( img, ( 49, 49 ) ) 
    img = img.astype( 'float32' ) / 255
    img = np.reshape( img, [1, 49, 49, 3] )       

    status = mymodel.predict( img )
    status = status[0][1]

    return( status )

def evaluate( i, figure ):  # predict the probability of the picture to be in class 0 or 1
    img = cv2.imread( figure )
    status = prediction( img )

    outcome = [figure, status]

    return( i, outcome )
#--------------------------------------------------
# ( mymodel, cv2 and listoffigurepaths are taken from the O/P's code above )
import numpy as np                           # needed for np.min / np.mean / np.max below
from zmq import Stopwatch
aClk = Stopwatch()
aFigureNAME = listoffigurepaths[0]           # any one sample figure path to benchmark with
#------------------------------------NOW THE COSTS OF ORIGINAL VERSION:
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aClk.start(); _ = evaluate( 1, aFigureNAME ); A = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "as-is took {0:}[us]".format( A ) );aListOfRESULTs.append( A )

#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------

Let's try something a bit different:

def eval_w_RAM_allocs_avoided( indexI, aFigureNAME ):
    return [ indexI,
             [ aFigureNAME,
               mymodel.predict( ( cv2.resize( cv2.imread( aFigureNAME ),
                                              ( 49, 49 )
                                              ).astype( 'float32' ) / 255
                                  ).reshape( [1, 49, 49, 3]
                                             )
                                )[0][1],
               ],
             ]

#------------------------------------NOW THE COSTS OF MOD-ed VERSION:
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aClk.start()
    _ = eval_w_RAM_allocs_avoided( 1, aFigureNAME )
    B = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "MOD-ed took {0:}[us] ~ {1:} x".format( B, float( B ) / A ) )
    aListOfRESULTs.append( B )
#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------

And the actual img pre-processing pipeline overhead costs:

#------------------------------------NOW THE COSTS OF THE IMG-PREPROCESSING
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aClk.start()
    aPredictorSpecificFormatIMAGE = ( cv2.resize( cv2.imread( aFigureNAME ),
                                                  ( 49, 49 )
                                                  ).astype( 'float32' ) / 255
                                      ).reshape( [1, 49, 49, 3]
                                                 )
    C = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "IMG setup took {0:}[us] ~ {1:} of A".format( C, float( C ) / A ) )
    aListOfRESULTs.append( C )

#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------

And the actual file-I/O ops:

#------------------------------------NOW THE COSTS OF THE IMG-FILE-I/O-READ
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aFileNAME = listoffigurepaths[158 + iii * 172]
    aClk.start()
    _ = cv2.imread( aFileNAME )
    F = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "aFileIO took {0:}[us] ~ {1:} of A".format( F, float( F ) / A ) )
    aListOfRESULTs.append( F )

#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------

Without these hard facts collected ( as a form of quantitative records of evidence ), one can hardly decide what the best performance-boosting step would be here for any massive-scale prediction-pipeline image processing.

Having these items tested, post the results, and the further steps ( be it going via multiprocessing.Pool or using another strategy for larger performance scaling, towards whatever higher performance ) can then be reasonably evaluated, as the hard facts will have been collected first to do so.

– user3666197

One python package I know of that may help you is joblib. I hope it can solve your problem.

from joblib import Parallel, delayed
import cv2
import numpy as np
from keras.models import load_model

# load model
mymodel = load_model('190704_1_fcs_plotclassifier.h5')

# Define callback function to collect the output in 'outcomes'
outcomes = []

def collect_result(result):
    global outcomes
    outcomes.append(result)

# Define prediction function

def prediction(img):
    img = cv2.resize(img,(49,49))
    img = img.astype('float32') / 255
    img = np.reshape(img,[1,49,49,3])       

    status = mymodel.predict(img)
    status = status[0][1]

    return(status)

# Define evaluate function

def evaluate(i,figure):

    # predict the probability of the picture to be in class 0 or 1
    img = cv2.imread(figure)
    status = prediction(img)

    outcome = [figure, status]
    return(i,outcome)

outcomes = Parallel(n_jobs=72)(delayed(evaluate)(i, figure) for i, figure in enumerate(listoffigurepaths))
– guorui
  • Thanks for your answer. It helped me to progress a bit: I had to add this to my model loading: `mymodel = load_model('190704_1_fcs_plotclassifier.h5'); mymodel._make_predict_function(); graph = tf.get_default_graph()`, and this to my prediction function: `with graph.as_default(): status = mymodel.predict(img)`. And then it only worked with the use of threads: `outcomes = Parallel(n_jobs=10, prefer="threads")(delayed(evaluate)(figure) for figure in listoffigurepaths)` – Julia Jul 06 '19 at 15:42
  • With all due respect, the **proposed `joblib`** uses the very same package - the **`multiprocessing`** module - as one of its possible backends, so no, this will not solve the O/P's indicated performance problems ( neither does threading, as it leaves any number of python threads inside the common jail of the performance-blocking **global GIL-lock-trap** ). – user3666197 Jul 06 '19 at 17:15
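For reference, the workaround described in Julia's comment above, assembled into one minimal sketch ( TF1-era Keras / TensorFlow APIs; listoffigurepaths assumed to be defined ):

import cv2
import numpy as np
import tensorflow as tf
from keras.models import load_model
from joblib import Parallel, delayed

mymodel = load_model('190704_1_fcs_plotclassifier.h5')
mymodel._make_predict_function()          # build the predict function up-front (old Keras API)
graph = tf.get_default_graph()            # capture the default graph (TF1 API)

def prediction(img):
    img = cv2.resize(img, (49, 49)).astype('float32') / 255
    img = np.reshape(img, [1, 49, 49, 3])
    with graph.as_default():              # every thread re-enters the captured graph
        status = mymodel.predict(img)
    return status[0][1]

def evaluate(figure):
    img = cv2.imread(figure)
    return [figure, prediction(img)]

# threads share the single loaded model; see the GIL caveat in the comment above
outcomes = Parallel(n_jobs=10, prefer="threads")(
    delayed(evaluate)(figure) for figure in listoffigurepaths)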

The idea: load several images into one batch array, call model.predict() once for the whole batch, then decode and plot the predicted boxes.

# imports assumed by the snippet below; imread may also come from scipy/skimage
import os
import numpy as np
import matplotlib.pyplot as plt
from imageio import imread
from keras.preprocessing import image

img_height = 512    # Height of the input images
img_width = 512     # Width of the input images
img_channels = 3    # Number of color channels of the input images

orig_images = []    # Store the images here.
batch_holder = np.zeros((20, img_height, img_width, 3))
img_dir = "path/to/image/"

for i, img in enumerate(os.listdir(img_dir)):
    img = os.path.join(img_dir, img)
    orig_images.append(imread(img))
    img = image.load_img(img, target_size=(img_height, img_width))
    batch_holder[i, :] = img

# 'model', 'decode_y' and 'parameter' come from the answerer's own detector setup
y_pred = model.predict(batch_holder)
y_pred_decoded = decode_y(y_pred, parameter)

np.set_printoptions(precision=2, suppress=True, linewidth=90)
print("Predicted boxes:\n")
print('   class   conf xmin   ymin   xmax   ymax')
print(y_pred_decoded[i])

# Display the image and draw the predicted boxes onto it.
# Set the colors for the bounding boxes.
colors = plt.cm.hsv(np.linspace(0, 1, 21)).tolist()
classes = ['background', 'class']
current_axis = plt.gca()

for box in y_pred_decoded[i]:
    xmin = box[-4] * orig_images[0].shape[1] / img_width
    ymin = box[-3] * orig_images[0].shape[0] / img_height
    xmax = box[-2] * orig_images[0].shape[1] / img_width
    ymax = box[-1] * orig_images[0].shape[0] / img_height
    color = colors[int(box[0])]
    label = '{}: {:.2f}'.format(classes[int(box[0])], box[1])
    current_axis.add_patch(plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                                         color=color, fill=False, linewidth=2))
    current_axis.text(xmin, ymin, label, size='x-large', color='white',
                      bbox={'facecolor': color, 'alpha': 1.0})

plt.imshow(orig_images[i])
plt.show()

– rupa