I think your choice to use multiprocessing.Pool alongside your own queue is the source of the main problems you're having. A pool creates its child processes up front, and jobs are assigned to them later. But since you can't (easily) pass a queue to an already-running process, that's not a good match for your problem.
Instead, you should either get rid of your own queue and use the queue that's built into the pool to get the values returned by worker, or scrap the pool completely and use multiprocessing.Process to start a new process for each task you have to do.
I'd also note that your code has a race condition in the main process, between the main thread that modifies the x array and the queue's feeder thread that serializes the old value before it's sent to a worker process. Much of the time you'll probably end up sending many copies of the same array (with the final value) instead of the several different values you intend.
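To make the race concrete, here's a minimal sketch (the values and sizes are illustrative, not your exact code): put() only hands the array to the queue's feeder thread, which pickles it some time later, so mutating x in between changes what actually gets sent.

```python
import multiprocessing as mp
import time

import numpy as np

if __name__ == "__main__":
    q = mp.Queue()
    x = np.array([0, 4])
    for i in range(4):
        x[0] = i
        q.put(x)      # enqueues a reference; the feeder thread pickles x later,
                      # possibly after the next iteration has already changed it
    time.sleep(0.5)   # give the feeder thread time to finish serializing
    vals = [q.get()[0] for _ in range(4)]
    print(vals)       # may print [3, 3, 3, 3] instead of [0, 1, 2, 3]
```

Passing a fresh object per task, or calling q.put(x.copy()) so the snapshot can't be mutated afterwards, sidesteps the problem.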
Here's a quick and untested version that drops the queue:
import multiprocessing as mp
import time

import numpy as np

def worker(arr):
    time.sleep(0.2)
    return arr[0]

if __name__ == "__main__":
    p = mp.Pool(4)
    results = p.map(worker, [np.array([i, 4]) for i in range(4)])
    p.close()  # must close the pool before joining it
    p.join()
    for result in results:
        print(result)
And here's a version that drops the Pool and keeps the queue:
import multiprocessing as mp
import time

import numpy as np

def worker(q, arr):
    time.sleep(0.2)
    q.put(arr[0])

if __name__ == "__main__":
    q = mp.Queue()
    processes = []
    for i in range(4):
        p = mp.Process(target=worker, args=(q, np.array([i, 4])))
        p.start()
        processes.append(p)
    for i in range(4):
        print(q.get())
    for p in processes:
        p.join()
Note that in the last version it may be important that we get the results from the queue before we try to join the processes (though probably not if we're only dealing with four values). If the queue were to fill up, a deadlock could occur if we did it in the other order: the worker would be blocked trying to write to the queue, while the main process is blocked waiting for the worker process to exit.