
I am trying out parallel processing in Python using multiprocessing and Pool. The code I want to run in parallel is given below.

 all_rects = [[[] for x in range(net_config["grid_width"])] for y in range(net_config["grid_height"])]
 for n in range(len(bbox_list)):
     for k in range(net_config["grid_height"] * net_config["grid_width"]):
         conf = conf_list[n][k,1].flatten()[0]
         if conf > 0.75:
             y = int(k / net_config["grid_width"])
             x = int(k % net_config["grid_width"])
             bbox = bbox_list[n][k]

             abs_cx = pix_per_w/2 + pix_per_w*x + int(bbox[0,0,0])
             abs_cy = pix_per_h/2 + pix_per_h*y+int(bbox[1,0,0])
             w = bbox[2,0,0]
             h = bbox[3,0,0]
             all_rects[y][x].append(Rect(abs_cx,abs_cy,w,h,conf))

len(bbox_list) varies on each iteration, and I would like to set the number of processes according to len(bbox_list).

So I have a function

def doWork(bbox_list, conf_list, all_rects, w, h, n, pix_per_w):
    for k in range(h * w):
        conf = conf_list[n][k,1].flatten()[0]
        if conf > 0.75:
           y = int(k / w)
           x = int(k % w)
           bbox = bbox_list[n][k]
           abs_cx = pix_per_w/2 + pix_per_w*x + int(bbox[0,0,0])
           abs_cy = pix_per_h/2 + pix_per_h*y+int(bbox[1,0,0])
           w = bbox[2,0,0]
           h = bbox[3,0,0]
           all_rects[y][x].append(Rect(abs_cx,abs_cy,w,h,conf))

Then, from main, I do the parallel processing as follows:

all_rects = [[[] for x in range(net_config["grid_width"])] for y in range(net_config["grid_height"])]
pool = multiprocessing.Pool(len(bbox_list))
pool.map(doWork, (bbox_list, conf_list, all_rects, net_config["grid_width"], net_config["grid_height"], [len(bbox_list)], pix_per_w))

I get one error, TypeError: doWork() takes exactly 7 arguments (1 given). My other concern is about all_rects: all_rects is a three-dimensional list, so how is access to it synchronized, and is it safe under multiprocessing?

EDIT:

def doWork(worker_args, b_range):
    bbox_list = worker_args[0]
    conf_list = worker_args[1]
    all_rects = worker_args[2]
    w = worker_args[3]
    h = worker_args[4]    
    pix_per_w = worker_args[5]
    n = b_range
    for k in range(h * w):
        conf = conf_list[n][k,1].flatten()[0]
        if conf > 0.75:
           y = int(k / w)
           x = int(k % w)
           bbox = bbox_list[n][k]
           abs_cx = pix_per_w/2 + pix_per_w*x + int(bbox[0,0,0])
           abs_cy = pix_per_h/2 + pix_per_h*y+int(bbox[1,0,0])
           w = bbox[2,0,0]
           h = bbox[3,0,0]
           all_rects[y][x].append(Rect(abs_cx,abs_cy,w,h,conf))

Then from main,

all_rects = [[[] for x in range(net_config["grid_width"])] for y in range(net_config["grid_height"])]
pool = multiprocessing.Pool(len(bbox_list))
box_ranges = range(len(bbox_list))
worker_args = [bbox_list, conf_list, all_rects, net_config["grid_width"], net_config["grid_height"], pix_per_w]
pool.map(doWork, worker_args, l) for l in box_ranges

I tried to send the arguments as a tuple, but I get an invalid syntax error on this line:

pool.map(doWork, worker_args, l) for l in box_ranges

Thanks


1 Answer


You can write your worker function to accept a single tuple as its parameter and unpack the tuple inside the function, for example:

def doWork(worker_args):
  bbox_list, conf_list, all_rects, w, h, n, pix_per_w = worker_args

As for your concern about all_rects: as far as I know, append is thread-safe; see Are lists thread-safe.
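
Note, though, that with a process pool each worker gets its own copy of a plain Python list, so appends made in a child are not visible in the parent. If all_rects genuinely has to be updated from the worker processes, one option (a minimal, self-contained sketch, not part of the question's code or this answer's approach) is a list proxy from multiprocessing.Manager, which forwards appends to a single manager process:

    import multiprocessing

    def append_square(args):
        shared, n = args          # unpack the (proxy, index) tuple
        shared.append(n * n)      # the proxy forwards the append to the manager process

    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        shared = manager.list()   # list proxy that worker processes can mutate
        pool = multiprocessing.Pool(4)
        pool.map(append_square, [(shared, n) for n in range(8)])
        pool.close()
        pool.join()
        print(sorted(shared[:]))  # copy the proxy's contents into a normal list: [0, 1, 4, 9, ...]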

EDIT:

Pool.map is very similar to the built-in map function. It takes two arguments: the first is your worker function, and the second is an iterable of the parameters to pass to that worker, so your second argument should be a list of tuples. Each tuple will be passed to one worker call as the single parameter of doWork:

pool.map(doWork, [(bbox_list, conf_list, all_rects, w, h, n, pix_per_w) for n in xrange(len(bbox_list))])
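
Putting the pieces together, here is a rough sketch of the same Pool.map pattern, reusing the names from the question (bbox_list, conf_list, net_config, pix_per_w, pix_per_h, Rect), which must already be defined. Instead of appending to a shared all_rects inside the worker, this variant has each worker return the rectangles it finds and merges them in the parent afterwards; that is a different arrangement from the code above, shown only as one possible way to avoid sharing state between processes:

    def doWork(args):
        # each tuple built below arrives here as a single argument; only n differs per task
        bbox_list, conf_list, w, h, n, pix_per_w, pix_per_h = args
        found = []
        for k in range(h * w):
            conf = conf_list[n][k, 1].flatten()[0]
            if conf > 0.75:
                y, x = divmod(k, w)   # same as int(k / w) and k % w
                bbox = bbox_list[n][k]
                abs_cx = pix_per_w / 2 + pix_per_w * x + int(bbox[0, 0, 0])
                abs_cy = pix_per_h / 2 + pix_per_h * y + int(bbox[1, 0, 0])
                found.append((y, x, Rect(abs_cx, abs_cy, bbox[2, 0, 0], bbox[3, 0, 0], conf)))
        return found

    pool = multiprocessing.Pool(len(bbox_list))
    tasks = [(bbox_list, conf_list, net_config["grid_width"], net_config["grid_height"],
              n, pix_per_w, pix_per_h) for n in range(len(bbox_list))]
    results = pool.map(doWork, tasks)   # one list of rectangles per n
    pool.close()
    pool.join()

    # merge the per-task results back into the grid in the parent process
    all_rects = [[[] for x in range(net_config["grid_width"])]
                 for y in range(net_config["grid_height"])]
    for rects in results:
        for y, x, rect in rects:
            all_rects[y][x].append(rect)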