
This is yet another question regarding the multiprocessing module in Python 3.5. My problem is that I know all the forked processes have done their job (I can see their results in the Queue), and AsyncResult.successful() returns True, which means the jobs are completed, but when I proceed with PoolObj.join(), it takes forever. I know I can PoolObj.terminate() and carry on with my life, but I want to know why this happens.

I'm using the following code:

import sys
import time
from multiprocessing import Pool, Queue

def worker(d):
    queue.put(d)

def gen_data():
    for i in range(int(1e6)):
        yield i

if __name__ == "__main__":
    queue = Queue(maxsize=-1)
    pool = Pool(processes=12)
    pool_obj_worker = pool.map_async(worker, gen_data(), chunksize=1)
    pool.close()

    print('Let\'s run the workers...\n')
    while True:
        if pool_obj_worker.ready():
            if pool_obj_worker.successful():
                print('\nAll processed successfully!')  # I can see this quickly, so my jobs are done
            else:
                print('\nAll processed. Errors encountered!')
            sys.stdout.flush()
            print(queue.qsize())  # the size is right, so all workers have done their job
            pool.join()  # will get stuck here for a long, long time
            queue.put('*')
            break
        print('%d still to be processed' % pool_obj_worker._number_left)
        sys.stdout.flush()
        time.sleep(0.5)

Am I doing it wrong? Please enlighten me. Or have the processes holding join() gone zombie?

Parashar
  • What happens if you reduce the size of what you're trying to put into the queue? – roganjosh Aug 29 '16 at 11:43
  • The issue exists even for smaller values of 1e5 and 1e4. However, for values smaller than these, the issue is not so obvious. – Parashar Aug 29 '16 at 11:51
  • Reduce it to `10` and try - does it at least complete or is it still hanging? This could be related to [this](https://bugs.python.org/issue8426) and I had a similar problem [here](http://stackoverflow.com/questions/38961584/multiprocesses-become-zombie-processes-when-increasing-iterations-what-is-the-a) but using `Process`. Although, in your case, you're saying the queue is the correct size so there might be something else happening, but it might set you in the right direction. Also, what happens without `.join()` at all? – roganjosh Aug 29 '16 at 11:56
  • Even with 10, join stalls for like 10 secs. I'm using 12 processes and can see them in process monitor even after receiving `True` from `pool_obj_worker.successful()` – Parashar Aug 29 '16 at 12:10
  • If I use `collections.deque` in place of `multiprocessing.Queue`, the issue resolves, but it's of no use, as I cannot get my results back. I suppose with every fork a new instance of deque is initialised. – Parashar Aug 29 '16 at 12:12

1 Answer


The issue here is that you are using an extra Queue in your worker, beyond the one provided by Pool. When the processes finish their work, they all join the feeder thread used by multiprocessing.Queue, and these calls hang (probably because all the threads call join simultaneously and there can be some odd race conditions; it is not easy to investigate).

Adding multiprocessing.util.log_to_stderr(10) reveals that your processes hang while joining the queue's feeder thread.
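For reference, that debug logging can be switched on with a single call before the pool is created (level 10 corresponds to DEBUG):

```python
import multiprocessing.util

# Route multiprocessing's internal log messages to stderr at DEBUG level;
# the feeder-thread join messages show up here during pool shutdown.
logger = multiprocessing.util.log_to_stderr(10)
```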

To solve your issue, you can either use multiprocessing.SimpleQueue instead of multiprocessing.Queue (no hang in join, as there is no feeder thread) or use the method pool.imap_unordered, which provides the same kind of behavior as what you seem to be implementing (it gives back an unordered generator containing the results returned by worker).
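A minimal sketch of the imap_unordered approach (the worker here just squares its input as an illustration; results flow back through the pool's own result queue, so no extra multiprocessing.Queue is needed):

```python
from multiprocessing import Pool

def worker(d):
    # Return the result instead of putting it on a shared queue
    return d * d

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = []
        # imap_unordered yields results as workers finish, in arbitrary order
        for r in pool.imap_unordered(worker, range(100), chunksize=10):
            results.append(r)
    print(sorted(results) == [i * i for i in range(100)])
```

Since the pool is used as a context manager, it is terminated cleanly on exit without any hang in join.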

Thomas Moreau
  • Thanks Thomas. You are right! Joining Queues has a huge overhead. How can I monitor the progress of the job if I use pool.imap_unordered? – Parashar Aug 29 '16 at 12:50
  • An additional related question here! Why do iterations over the imap generator tend to get slower over time? Is it the effect of accumulating results in a list? If so, how can I solve this? – Parashar Aug 29 '16 at 13:01
  • To monitor the progress, you can try accessing `IMapUnorderedIterator._index` – Thomas Moreau Aug 29 '16 at 13:07
  • I don't see any slowdown with the code provided here. Each time you access an element of the generator, it is removed from the list, so if it is balanced, it should not really slow down. Maybe you have bigger objects than `int` and the generator grows too big. This depends on your usage. – Thomas Moreau Aug 29 '16 at 13:18