
I am using multiprocessing in this manner:

import os
import shutil
import Queue  # for Queue.Empty (Python 2)
import logging
import multiprocessing as mp

log = logging.getLogger(__name__)

def worker(thread_id, tasks, results):
    tmp_dir = 'temp_for_{}'.format(thread_id)
    os.makedirs(tmp_dir)
    try:
        while not tasks.empty():
            data = tasks.get()
            response = process_pdf(data, tmp_dir)
            results.put(response)
    except (KeyboardInterrupt, SystemExit):
        log.info('Interrupt signal received in thread %s.', thread_id)
    except Queue.Empty:
        pass
    except Exception:
        log.error("Unexpected error in %s", thread_id, exc_info=True)
    finally:
        shutil.rmtree(tmp_dir)
        log.info("Thread %s exit", thread_id)

if __name__ == "__main__":
    tasks, results = mp.Queue(), mp.Queue()
    for record in cursor.select(query):
        tasks.put(record)
    manager = mp.Manager()
    workers = [mp.Process(target=worker, args=(i, tasks, results)) for i in xrange(8)]
    for worker in workers:
        worker.start()
    try:
        for worker in workers:
            worker.join()
    except (KeyboardInterrupt, SystemExit):
        log.info('Interrupt signal received in main. Cleaning up main')
    finally:
        log.info('Got %s results. Saving', results.qsize())
        while not results.empty():
            cursor.update_one('documents', 'id', results.get())
        cursor.close()

Here's the output when I run this code:

14:34:04 15/10 INFO: Thread 6 exit
14:34:04 15/10 INFO: Thread 7 exit
14:34:21 15/10 INFO: Thread 3 exit
14:34:24 15/10 INFO: Thread 2 exit
14:34:24 15/10 INFO: Thread 1 exit
14:34:29 15/10 INFO: Thread 5 exit
14:34:36 15/10 INFO: Thread 0 exit
14:35:37 15/10 INFO: Thread 4 exit

Then I enter ^C after waiting for a while with no progress, and get this output:

^C14:37:16 15/10 INFO: Interrupt signal received in main. Cleaning up main
14:37:16 15/10 INFO: Got 16 results. Saving

And I get this traceback for all threads:

Process Process-9:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 261, in _bootstrap
    util._exit_function()
  File "/usr/lib64/python2.7/multiprocessing/util.py", line 328, in _exit_function
    _run_finalizers()
  File "/usr/lib64/python2.7/multiprocessing/util.py", line 274, in _run_finalizers
    finalizer()
  File "/usr/lib64/python2.7/multiprocessing/util.py", line 207, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib64/python2.7/multiprocessing/queues.py", line 218, in _finalize_join
    thread.join()
  File "/usr/lib64/python2.7/threading.py", line 952, in join
    self.__block.wait()
  File "/usr/lib64/python2.7/threading.py", line 340, in wait
    waiter.acquire()
KeyboardInterrupt

Why is this hanging? If it's important, I can add that process_pdf() runs a few subprocesses with subprocess.Popen().

  • Does the problem go away if you `get` everything from the `results` queue prior to calling `worker.join()` on all the processes? – dano Oct 15 '15 at 15:12
  • Off-topic: You're hand-managing a worker pool and queues here to do what `multiprocessing.Pool` would do for you with `imap_unordered`. Redesigning to use it would dramatically simplify your code. – ShadowRanger Oct 15 '15 at 15:18
  • @dano How might that cause this issue? It's not a JoinableQueue – El Ruso Oct 15 '15 at 15:19
  • @ShadowRanger I need to handle KeyboardInterrupt in the processes. See http://bryceboe.com/2012/02/14/python-multiprocessing-pool-and-keyboardinterrupt-revisited/ – El Ruso Oct 15 '15 at 15:22
  • @ElRuso See [this answer](http://stackoverflow.com/questions/26738648/script-using-multiprocessing-module-does-not-terminate/26738946#26738946). Your code isn't quite a perfect match, since it doesn't look like you're putting a ton of data into the queue, but it's worth ruling out. – dano Oct 15 '15 at 15:24
  • @ElRuso: Yeah, this is one reason (of many) to move to Python 3; it fixes several issues related to this ([3.2 fixes lock acquisition not being interruptable](https://bugs.python.org/issue8844), and [2.7.10/3.5.0 fix handling of exceptions during `imap`/`imap_unordered` task dispatch handling](https://bugs.python.org/issue23051)). The effects of a `Ctrl-C` are still ugly even in 3.5 (worker `KeyboardInterrupt` tracebacks spew to the terminal, and you sometimes need two `Ctrl-C`s to _really_ end), but 1-2 `Ctrl-C`s _always_ ends the program (and `finally` handlers can output partial results). – ShadowRanger Oct 15 '15 at 16:32

1 Answer


Big thanks to dano for his hint. The fix for this issue is to create the queues using a `Manager()`:

manager = mp.Manager()
tasks, results = manager.Queue(), manager.Queue()
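
For reference, here is a minimal self-contained sketch of that pattern (Python 3 here; the doubling step is just a stand-in for `process_pdf()`, and everything except `Manager().Queue()` is illustrative):

```python
import multiprocessing as mp
import queue  # for queue.Empty (the Queue module in Python 2)

def worker(tasks, results):
    # Pull tasks until the queue runs dry; get_nowait() raises queue.Empty.
    try:
        while True:
            item = tasks.get_nowait()
            results.put(item * 2)  # stand-in for process_pdf()
    except queue.Empty:
        pass

if __name__ == "__main__":
    manager = mp.Manager()
    # Manager queues live in a separate server process, so a worker has no
    # local feeder thread that must flush buffered data at exit -- which is
    # what plain mp.Queue() workers can block on while the parent join()s.
    tasks, results = manager.Queue(), manager.Queue()
    for n in range(10):
        tasks.put(n)
    workers = [mp.Process(target=worker, args=(tasks, results)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    collected = []
    while not results.empty():
        collected.append(results.get())
    print(sorted(collected))  # doubles of 0..9
```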

Edit
Thanks to ShadowRanger: it looks like exceptions during dispatch were fixed in 2.7.10, so now we can use `multiprocessing.Pool` with `imap_unordered` and don't need to write a wall of code for a simple job :) But I haven't tried it yet.
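
For anyone who wants to try that route, a rough sketch of what the `Pool` version might look like (untested against the original workload; `process_item` is a hypothetical stand-in for the real `process_pdf()`):

```python
import multiprocessing as mp

def process_item(data):
    # Stand-in for process_pdf(); a real worker would create and clean up
    # its own temp directory here.
    return data * 2

if __name__ == "__main__":
    pool = mp.Pool(4)
    try:
        # imap_unordered yields each result as soon as any worker finishes,
        # replacing the hand-rolled task/result queues entirely.
        for result in pool.imap_unordered(process_item, range(10)):
            print(result)  # e.g. update the database row here
    finally:
        pool.close()
        pool.join()
```

On KeyboardInterrupt you would call `pool.terminate()` instead of `close()` to kill the workers immediately rather than letting them drain the remaining tasks.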

El Ruso
  • While exceptions in dispatch are fixed in 2.7.10, lock acquisition remains uninterruptable in all 2.x releases, and that's a large part of the problem. I can definitely lock up even on 2.7.10 under certain conditions because specific threads aren't interrupted by the `SIGINT`, and others `join` on them. – ShadowRanger Oct 15 '15 at 17:37
  • @ShadowRanger This conversation, the UnicodeDecodeError in 2.x :) and the new `yield` syntax in 3.x have definitely made me reconsider moving to Python 3 – El Ruso Oct 15 '15 at 18:11