1

I want to use a queue to hold result because I want a consumer (serial not parallel) to process the result of the workers as the workers produce the result.

For now, I want to know why the following program hangs.

import multiprocessing as mp
import time
import numpy as np
def worker(arg):
    time.sleep(0.2)
    q, arr = arg 
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
q = mp.Queue()

for i in range(4):
    x[0] = i 
    #worker((q,x))
    p.apply_async(worker, args=((q, x),)) 

print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())
hamster on wheels
  • 2,771
  • 17
  • 50
  • I'm not sure I understand what you're asking. Is the code you show not working because of some kind of deadlock, or does it work fine and you're trying to harden it against some potential future issue? – Blckknght Jul 25 '17 at 19:46
  • it hangs. i found a solution, but it uses manager, and needs to copy the input. Sorry for typo in question: 'when' -> why. – hamster on wheels Jul 25 '17 at 19:48

3 Answers3

1

Change apply_async to apply gives error message:

"Queue objects should only be shared between processes through inheritance"

A solution:

import multiprocessing as mp
import time
import numpy as np
def worker(arg):
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
m = mp.Manager()
q = m.Queue()

for i in range(4):
    x[0] = i
    #worker((q,x))
    p.apply_async(worker, args=((q, x),))

print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())

Result:

done_apply
3
3
3
3

Apparently, I need to manually make copies of the numpy array because the desired result should be 0, 1, 2, 3 in any order instead of 3, 3, 3, 3.

hamster on wheels
  • 2,771
  • 17
  • 50
1

Queue objects cannot be shared. I came to the same conclusion as the OP first by finding this answer.

Unfortunately, there were other problems in this code (which doesn't make it an exact duplicate of the linked answer)

  • worker(arg) should be worker(*arg) for the args unpacking to work. Without that, my process locked up too (I admit I don't know why. It should have thrown an exception, but I guess that multiprocessing & exceptions don't work well together)
  • passing the same x to the workers result in same number as a result (with apply it works, but not with apply_async

Another thing: for the code to be portable, wrap the main code by if __name__ == "__main__":, required on Windows because of differences in process spawning

Fully fixed code that outputs 0,3,2,1 for me:

import multiprocessing as mp
import time
import numpy as np
def worker(*arg):  # there are 2 arguments to "worker"
#def worker(q, arr):  # is probably even better
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

if __name__ == "__main__":
    p = mp.Pool(4)

    m = mp.Manager()  # use a manager, Queue objects cannot be shared
    q = m.Queue()

    for i in range(4):
        x = np.array([4,4])  # create array each time (or make a copy)
        x[0] = i
        p.apply_async(worker, args=(q, x))

    print("done_apply")
    time.sleep(0.2)
    for i in range(4):
        print(q.get())
Jean-François Fabre
  • 137,073
  • 23
  • 153
  • 219
  • yes, for all other shared objects too (lists, dicts) that you want to modify. – Jean-François Fabre Jul 25 '17 at 19:59
  • Each worker returns a result and the queue holds it. For each result, the result is modified by none of the process except for the process that created the result. So.. that means the manager is not necessary? – hamster on wheels Jul 25 '17 at 20:22
  • 1
    https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes: "when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes". In your example, it locks up without the manager. So I guess it's necessary, and the docs advise you do to it, so I wouldn't bypass this advice... the problem isn't with the result but with the sharing of the `Queue` object. – Jean-François Fabre Jul 25 '17 at 20:29
  • the logging example for multiprocessing uses a Queue without manager: https://docs.python.org/3/howto/logging-cookbook.html – hamster on wheels Jul 27 '17 at 08:59
  • See new question: https://stackoverflow.com/questions/45346814/multiprocessing-queue-need-manager – hamster on wheels Jul 27 '17 at 09:40
1

I think your choice to use multiprocessing.Pool alongside your own queue is the source of the main problems you're having. Using a pool creates the child processes up front, which jobs are later assigned to. But since you can't (easily) pass a queue to an already existing process, that's not a good match for your problem.

Instead, you should either get rid of your own queue and use the queue that's built into the pool to get a value returned by worker or scrap the pool completely and use multiprocessing.Process to start a new process for each task you have to do.

I'd also note that your code has a race condition in the main processes between the main thread that modifies the x array and the thread that serializes the old value before it's sent to a worker process. Much of the time you'll probably end up sending many copies of the same array (with the final value) instead of the several different values you intend.

Here's a quick and untested version that drops the queue:

def worker(arr):
    time.sleep(0.2)
    return arr[0]

if __name__ == "__main__":
    p = mp.Pool(4)
    results = p.map(worker, [np.array([i, 4]) for i in range(4)])
    p.join()
    for result in results:
        print(result)

And here's a version that drops the Pool and keeps the queue:

def worker(q, arr): 
    time.sleep(0.2)
    q.put(arr[0])

if __name__ == "__main__":
    q = m.Queue()
    processes = []

    for i in range(4):
        p = mp.Process(target=worker, args=(q, np.array([i, 4])))
        p.start()
        processes.append(p)

    for i in range(4):
        print(q.get())

    for p in processes:
        p.join()

Note that in the last version it may be important that we get the results from the queue before we try to join the processes (though probably not if we're only dealing with four values). If the queue were to fill up, a deadlock could occur if we did it in the other order. The worker might be blocked trying to write to the queue, while the main process is blocked waiting for the worker process to exit.

Blckknght
  • 100,903
  • 11
  • 120
  • 169
  • hmm. I am using pool and queue instead of process right now. If there are 10 tasks and 4 processes (1 for each cpu core), using pool would only create 4 processes. Using Process would need to create 10 processes. Is there anyway to reuse Process? – hamster on wheels Jul 26 '17 at 02:07
  • The queue is for passing results between the producers in the worker processes and the consumer in the main process. In every 30 cycles, the program reads from hard disk and starts 120 asynchronous calculation tasks. After that, the program runs the consumer, which fetches 120 results from the queue as soon as one of the worker put result into the queue. After the consumer is done, this whole thing repeats. Then at the end, the program calls pool.close, pool.join, and then the consumer to process all remaining results. The workers tasks use much more time (30x) than the consumer + hard-disk. – hamster on wheels Jul 26 '17 at 02:17
  • Are there any simple way or a "queue that's built into the pool" that allows the consumer fetches result from the workers when the workers are still working on tasks? So far, I am manually counting 120 tasks before starting the consumer. There should be an easier way... I probably should ask a new question. – hamster on wheels Jul 26 '17 at 02:38
  • I'm not entirely sure I understand what you're asking, but if you mean how can the main process consume some of the results from the pool, you can use `pool.imap` (or `imap_unordered`) to get an iterator over the results that runs in parallel with further computations. I suggest [reading the docs](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap). – Blckknght Jul 26 '17 at 02:51
  • Here is the new question: https://codereview.stackexchange.com/questions/171172/load-balancing-consumer-and-producer-with-multiprocessing – hamster on wheels Jul 26 '17 at 03:32