0

This above code works fine but I don't fully understand it:

from multiprocessing import Pool,Manager
from itertools import chain

def calculate_primes():
    ncore = 2
    N = 50
    with Manager() as manager:
        input = manager.Queue()
        output = manager.Queue()

        with Pool(ncore) as p:
            it = p.starmap_async(find_prime,[(input,output)])
            for r in chunks(range(1,N),10): # chunks the list into sublists
                input.put(r)
            input.put(None)
            it.wait() # if here: doesn't work! Why?
            output.put(None)
            it.wait() # mut be here to works fine!!

        res = list(chain(*list(iter(output.get,None))))
    return res

find_prime() is a function that returns a prime numbers list.

When p.starmap_async(somefunction,[(input,output)]) instruction is really executed? Why does this function need it.wait() instruction to be executed?

If no it.wait() instruction in function => it returns empty list [] If it.wait() instruction located before inut.put(None) => it seemns an infinite loop that I must kill.

Why? Thanks a lot.

Theo75
  • 477
  • 4
  • 14
  • 1
    Can you describe at last the signature of `find_prime`?. This piece of code runs only one process at a time, and clearly do not profits of the `map` part of `starmap_async`, which removes the burden of setting up queues. – azelcer Jan 11 '22 at 16:42
  • def find_prime_worker(input, output): for chunk in iter(input.get,None): primes_found = list(filter(check_prime,chunk)) output.put(primes_found) – Theo75 Jan 11 '22 at 20:25

1 Answers1

2

The idea of map functions is to remove the burden of setting up queues from the programmer, reading from the queues and assemble the responses in order. Your code could be simplified to:

def find_prime(chunk):
    primes_found = list(filter(check_prime, chunk))
    return primes_found

def calculate_primes():
    ncore = 2
    N = 50
    with Pool(ncore) as p:
        res = p.map(find_prime, chunks(range(1,N), 10))
    return res

(where chunks does what I guess it does)

The original code has the following issues:

  • When you call p.starmap_async(find_prime,[(input,output)]) you are passing a lenght-1 iterable, so only one process will be used. You can use [(input,output)] * ncore instead to use ncore processes.
  • If you do use more than one process, only one of them would get the None value from the queue, and the other one would wait forever. This bug was hidden by the previous one.
  • it.wait() waits forever for the process to finish. If you put it before input.put(None), the spawned process is waiting for a None value to finish and the main process is waiting for the spawned process to finish: there is a deadlock (note that the question says one thing "t.wait() instruction located before input.put(None)" but the code says another thing).
  • As you are using the async version, you must wait for the process to finish. If you don't call wait, you put a None in the output queue immediately, before any result has been fed by the spawned processes. Thus, the first value removed from the queue is None, signaling the end of the iterations for the iter call: you end up with an empty list. Moreover, you exit the with Pool(ncore) as p: while the processes are still running and (I guess) they are terminated.

Passing chunks of data to process instead of one by one is a good idea: that is what the chunksize parameter of the map functions is for. Nevertheless, it would not be useful with your current implementation of find_prime

azelcer
  • 1,383
  • 1
  • 3
  • 7