I have an issue in Python 3.7.3 where my multiprocessing operation (using Queue, Pool, and apply_async) deadlocks when handling large computational tasks.

For small computations, this multiprocessing task works just fine. However, with larger computations, the task stops (deadlocks) altogether and the process never exits. I read that this will happen if you "grow your queue without bounds, and you are joining up to a subprocess that is waiting for room in the queue [...] your main process is stalled waiting for that one to complete, and it never will." (Process.join() and queue don't work with large numbers)

I am having trouble converting this concept into code. I would greatly appreciate guidance on refactoring the code I have written below:

import multiprocessing as mp

def listener(q, d):  # task to queue information into a manager dictionary
    while True:
        item_to_write = q.get()
        if item_to_write == 'kill':
            break
        foo = d['region']
        foo.add(item_to_write) 
        d['region'] = foo  # add items and set to manager dictionary


def main():
    manager = mp.Manager()
    q = manager.Queue()
    d = manager.dict()
    d['region'] = set()

    pool = mp.Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, d))
    jobs = []
    for i in range(24):
        job = pool.apply_async(execute_search, (q, d))  # task for multiprocessing
        jobs.append(job)
    for job in jobs:
        job.get()  # block until each job has finished
    q.put('kill')  # tell the listener to stop (see listener function)
    pool.close()
    pool.join()

    print('process complete')


if __name__ == '__main__':
    main()

Ultimately, I would like to prevent deadlocking altogether to facilitate a multiprocessing task that could operate indefinitely until completion.


Below is the traceback after interrupting the deadlocked script with Ctrl+C in bash:

^CTraceback (most recent call last):
  File "multithread_search_cl_gamma.py", line 260, in <module>
    main(GEOTAG)
  File "multithread_search_cl_gamma.py", line 248, in main
    job.get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 651, in get
Process ForkPoolWorker-28:
Process ForkPoolWorker-31:
Process ForkPoolWorker-30:
Process ForkPoolWorker-27:
Process ForkPoolWorker-29:
Process ForkPoolWorker-26:
    self.wait(timeout)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 648, in wait
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
     self._event.wait(timeout)
  File "/Users/Ira/anaconda3/lib/python3.7/threading.py", line 552, in wait
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
    signaled = self._cond.wait(timeout)
  File "/Users/Ira/anaconda3/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt
   with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt

Below is the updated script:

import multiprocessing as mp
import queue
import time

def listener(q, d, stop_event):
    while not stop_event.is_set():
        try:
            while True:
                item_to_write = q.get(False)
                if item_to_write == 'kill':
                    break
                foo = d['region']
                foo.add(item_to_write)
                d['region'] = foo
        except queue.Empty:
            pass

        time.sleep(0.5)
        if not q.empty():
            continue


def main():
    manager = mp.Manager()
    stop_event = manager.Event()
    q = manager.Queue()
    d = manager.dict()
    d['region'] = set()
    pool = mp.get_context("spawn").Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, d, stop_event))
    stop_event.set()
    jobs = []
    for i in range(24):
        job = pool.apply_async(execute_search, (q, d))
        jobs.append(job)
    for job in jobs:
        job.get()
    q.put('kill')
    pool.close()
    pool.join()
    print('process complete')


if __name__ == '__main__':
    main()

UPDATE:

execute_command runs several of the steps needed for the search, so below I show the code where q.put() is called.

Alone, the script takes more than 72 hours to finish. No single worker process completes the entire task; instead, the workers run independently and consult a manager.dict() to avoid repeating work. They keep working until every tuple in the manager.dict() has been processed.

def area(self, tup, housing_dict, q):
    state, reg, sub_reg = tup[0], tup[1], tup[2]
    for cat in housing_dict:
        # computationally expensive: takes > 72 hours
        # for a list of 512 tuples
        result = self.search_geotag(
            state, reg, cat, area=sub_reg
        )
    q.put(tup)

The tuple passed to q.put(tup) is ultimately picked up by the listener function, which adds tup to the manager.dict().

irahorecka
  • You have `cpu_count + 2` processes, but you are sending only one kill. – Klaus D. Oct 22 '19 at 05:56
  • This is true; however, when working with small datasets, the 'kill' does a fine job of stopping the multiprocessing task altogether. This is not the case with a large dataset. Could you explain more? – irahorecka Oct 22 '19 at 05:59
  • Supposedly there's [this statement](https://stackoverflow.com/a/26738946/355230) in the documentation: "...you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed", so it sounds like you could periodically pause and wait for the queue to be emptied to avoid the issue. – martineau Oct 22 '19 at 06:00
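
The caveat quoted in the comment above can be sketched as follows. This is a minimal illustration under stated assumptions, not the asker's script: `producer`, `drain`, and `drain_then_join` are hypothetical names, and it uses a plain `mp.Queue` with a `None` sentinel instead of a manager queue. A process that has put items on an `mp.Queue` waits before exiting until its buffered items have been flushed to the underlying pipe, so the parent consumes everything first and only then joins.

```python
import multiprocessing as mp


def producer(q, n_items):
    """Put n_items large strings on the queue, then a None sentinel."""
    for _ in range(n_items):
        q.put("x" * 10_000)
    q.put(None)


def drain(q):
    """Read items from q until the None sentinel arrives."""
    items = []
    while True:
        item = q.get()
        if item is None:
            return items
        items.append(item)


def drain_then_join(n_items=1_000):
    """Start one producer, consume its output, and only then join it."""
    q = mp.Queue()
    child = mp.Process(target=producer, args=(q, n_items))
    child.start()
    items = drain(q)  # consume first ...
    child.join()      # ... then join; the reverse order can block forever
    return len(items)


if __name__ == "__main__":
    print(drain_then_join())
```

Swapping the `drain(q)` and `child.join()` lines is the pattern the quoted warning describes: the child cannot exit while its data is still buffered, and the parent never reads it.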

1 Answer

Since listener and execute_search share the same queue object, there could be a race in which execute_search gets 'kill' from the queue before listener does. The listener would then be stuck in a blocking get() forever, since no new items will arrive.

In that case you can use an Event object to signal all processes to stop:

import multiprocessing as mp
import queue

def listener(q, d, stop_event):
    while not stop_event.is_set():
        try:
            item_to_write = q.get(timeout=0.1)
            foo = d['region']
            foo.add(item_to_write)
            d['region'] = foo
        except queue.Empty:
            pass
    print("Listener process stopped")

def main():
    manager = mp.Manager()
    stop_event = manager.Event()
    q = manager.Queue()
    d = manager.dict()
    d['region'] = set()
    pool = mp.get_context("spawn").Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, d, stop_event))
    # do not set stop_event here: the listener must keep running until the jobs are done
    jobs = []
    for i in range(24):
        job = pool.apply_async(execute_search, (q, d))
        jobs.append(job)
    try:
        for job in jobs:
            job.get(300)  # return the result, or raise mp.TimeoutError after 300 seconds
    except mp.TimeoutError:
        pool.terminate()
    stop_event.set()  # stop listener process
    print('process complete')


if __name__ == '__main__':
    main()
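
As an alternative design (not what the code above does), the listener, the queue, and the 'kill' sentinel can be avoided entirely by letting each worker return its finished tuple and collecting the results in the parent. The sketch below assumes the work can be split into independent tuples up front; `search_region` and `collect_regions` are hypothetical names, with `search_region` standing in for the expensive `execute_search` step.

```python
import multiprocessing as mp


def search_region(tup):
    """Hypothetical stand-in for the expensive search; returns the finished tuple."""
    state, region, sub_region = tup
    # ... the real search for this (state, region, sub_region) would run here ...
    return tup


def collect_regions(pool, tups):
    """Run search_region over tups and gather the finished tuples in the parent."""
    done = set()
    # imap_unordered yields each result as soon as a worker finishes it
    for finished in pool.imap_unordered(search_region, tups):
        done.add(finished)
    return done


if __name__ == "__main__":
    # illustrative placeholder tuples, not real data
    work = [("state-a", "region-1", "sub-1"), ("state-a", "region-1", "sub-2")]
    with mp.Pool() as pool:
        print(len(collect_regions(pool, work)))
```

Because results travel back through the pool's own result channel, there is no separate consumer that can miss a sentinel, and the parent knows the work is done when the iterator is exhausted.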
Samuel
  • Wonderful, thank you very much, Samuel. I will try this tonight and check the process in the morning. – irahorecka Oct 22 '19 at 07:01
  • Hello Samuel, unfortunately the subprocesses were blocked and the script deadlocked. – irahorecka Oct 22 '19 at 17:52
  • Can you paste the script's backtrace here, after it deadlocks and you kill it with CTRL + C? – Samuel Oct 22 '19 at 19:16
  • Hello Samuel, thank you for your help. I ran the process again this morning, and it failed after 8 hours. Below is the backtrace: – irahorecka Oct 23 '19 at 02:36
  • @IraH. I've noticed that you have a nested `while True:` loop in `listener`; you don't need it anymore. Another question: how does `execute_search` know when to stop? Do you have the source of that function? – Samuel Oct 23 '19 at 03:14
  • I placed the code in the post body. Now that I think about it, you have a point. There is no explicit method to stop the program; rather, when all tasks run out of tuples to process from `manager.dict()`, `q.put('kill')` is called to kill the entire process. – irahorecka Oct 23 '19 at 04:44
  • Check my edited post. You can use `pool.terminate()` if an execute_search process doesn't complete within a specific amount of time. – Samuel Oct 23 '19 at 04:55
  • Wonderful! I am slowly learning about multiprocessing and queues and this is helpful. Thank you, @Samuel. – irahorecka Oct 24 '19 at 04:01
  • Glad to help you @IraH :). If my update solves your issue, it would be nice if you marked my answer as the "accepted" one :) – Samuel Oct 24 '19 at 04:11
  • Hello @Samuel, I have another question. In the general case, if an exception occurs in one of the worker processes, how could I reintroduce the task into the queue, i.e. another `execute_search, (q, d)`? – irahorecka Oct 25 '19 at 00:13
  • @IraH., AFAIK there's no trivial way to do that. You have to implement a kind of monitor (watchdog) process that monitors your whole process pool and starts a new process if one of them crashes. A simpler solution is to catch all possible exceptions inside `execute_search` and handle them accordingly. – Samuel Oct 25 '19 at 15:36
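
A rough sketch of the simpler route from the last comment: since `AsyncResult.get()` re-raises an exception raised inside the task, the parent can catch it and resubmit that task. The names `run_with_retries`, `reciprocal`, and `max_retries` are hypothetical, and tasks are assumed to be hashable because they are used as dictionary keys.

```python
import multiprocessing as mp


def run_with_retries(pool, func, tasks, max_retries=2):
    """Submit func(task) for each task; resubmit tasks that raise, up to max_retries times."""
    results, failed = {}, {}
    pending = list(tasks)
    for _attempt in range(max_retries + 1):
        jobs = [(task, pool.apply_async(func, (task,))) for task in pending]
        pending = []
        for task, job in jobs:
            try:
                results[task] = job.get()  # re-raises the task's exception here
            except Exception as exc:
                failed[task] = exc
                pending.append(task)
        if not pending:
            break
    # tasks still pending after the last attempt are reported with their last error
    return results, {task: failed[task] for task in pending}


def reciprocal(x):
    """Demo task: raises ZeroDivisionError for x == 0."""
    return 1 / x


if __name__ == "__main__":
    with mp.Pool(2) as pool:
        ok, errors = run_with_retries(pool, reciprocal, [1, 2, 0])
    print(ok, errors)
```

This only covers exceptions raised inside the task. A worker process that is killed outright never delivers a result, so `job.get()` without a timeout would wait indefinitely; that case still needs the timeout or watchdog approach discussed above.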