
I use a Pool to run several commands simultaneously. I would like to avoid printing the stack trace when the user interrupts the script.

Here is my script structure:

from multiprocessing import Pool
from subprocess import Popen, PIPE

def worker(some_element):
    try:
        cmd_res = Popen(SOME_COMMAND, stdout=PIPE, stderr=PIPE).communicate()
    except (KeyboardInterrupt, SystemExit):
        pass
    except Exception, e:
        print str(e)
        return

    # deal with cmd_res...

pool = Pool()
try:
    pool.map(worker, some_list, chunksize=1)
except KeyboardInterrupt:
    pool.terminate()
    print 'bye!'

By calling pool.terminate() when KeyboardInterrupt is raised, I expected the stack trace not to be printed, but it doesn't work; I sometimes get something like:

^CProcess PoolWorker-6:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    racquire()
KeyboardInterrupt
Process PoolWorker-1:
Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Traceback (most recent call last):

...
bye!

Do you know how I could hide this?

Thanks.

roipoussiere

4 Answers


When you instantiate Pool, it creates cpu_count() (on my machine, 8) Python processes waiting for your worker(). Note that they don't run it yet; they are waiting for commands. While they are not executing your code, they also don't handle KeyboardInterrupt. You can see what they are doing if you specify Pool(processes=2) and send the interrupt. You can play with the number of processes to work around it, but I don't think you can handle it in all cases.

Personally, I don't recommend using multiprocessing.Pool for the task of launching other processes; it's overkill to spawn several Python processes just for that. A much more efficient way is to use threads (see threading.Thread, Queue.Queue). In that case you need to implement a thread pool yourself, which is not so hard though; see the sketch below.
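For illustration, here is a minimal hand-rolled thread pool along those lines. It is only a sketch: NUM_WORKERS, handle() and the daemon-thread shortcut are my choices, not something from the question, while SOME_COMMAND and some_list are the question's placeholders.

from threading import Thread
from Queue import Queue
from subprocess import Popen, PIPE

NUM_WORKERS = 4           # illustrative; tune to your machine
tasks = Queue()

def handle(element):
    cmd_res = Popen(SOME_COMMAND, stdout=PIPE, stderr=PIPE).communicate()
    # deal with cmd_res...

def pool_worker():
    while True:
        element = tasks.get()
        try:
            handle(element)
        finally:
            tasks.task_done()

for _ in range(NUM_WORKERS):
    t = Thread(target=pool_worker)
    t.daemon = True       # daemon threads don't keep the process alive after the main thread exits
    t.start()

for element in some_list:
    tasks.put(element)
tasks.join()              # block until every queued element has been handled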

y0prst
  • My idea in using Pool was to parallelise processes and save time. If I lower the process count, the script runs slower. On my machine (8 cores): `Pool(2)` -> time=0m17.388s ; `Pool(4)` -> time=0m11.373s ; `Pool()` -> time=0m9.878s – roipoussiere Sep 27 '15 at 22:54
  • Could you give me a small example using threads, please? I tried to use them, but when I interrupted the script the child processes were still alive. – roipoussiere Sep 27 '15 at 23:00
  • 1
    See some thread pool examples here: http://stackoverflow.com/questions/3033952/python-thread-pool-similar-to-the-multiprocessing-pool. And yes, it seems ``KeyboardInterrupt`` doesn't propagate to threads. To deal with it I'd suggest to save Popen() result before ``communicate()`` and ``Popen.kill()`` all launched processes explicitly when KeyboardInterrupt comes to the main thread. – y0prst Sep 28 '15 at 05:11

In your case you don't even need pool processes or threads, and it then becomes easier to silence KeyboardInterrupts with try/except.

Pool processes are useful when your Python code does CPU-consuming calculations that can profit from parallelization. Threads are useful when your Python code does complex blocking I/O that can run in parallel. You just want to execute multiple programs in parallel and wait for the results. When you use Pool, you create processes that do nothing but start other processes and wait for them to terminate.

The simplest solution is to create all of the processes in parallel and then call .communicate() on each of them:

from subprocess import Popen, PIPE

try:
    processes = []
    # Start all processes at once
    for element in some_list:
        processes.append(Popen(SOME_COMMAND, stdout=PIPE, stderr=PIPE))
    # Fetch their results sequentially
    for process in processes:
        cmd_res = process.communicate()
        # Process your result here
except KeyboardInterrupt:
    for process in processes:
        try:
            process.terminate()
        except OSError:
            pass

This works as long as the output on STDOUT and STDERR isn't too big. Otherwise, when a process other than the one communicate() is currently waiting on produces more output than fits in the PIPE buffer (usually around 1-8 kB), it will be suspended by the OS until communicate() is called on it. In that case you need a more sophisticated solution:

Asynchronous I/O

Since Python 3.4 you can use the asyncio module for single-thread pseudo-multithreading:

import asyncio
from asyncio.subprocess import PIPE

loop = asyncio.get_event_loop()

@asyncio.coroutine
def worker(some_element):
    process = yield from asyncio.create_subprocess_exec(*SOME_COMMAND, stdout=PIPE)
    try:
        cmd_res = yield from process.communicate()
    except KeyboardInterrupt:
        process.terminate()
        return
    try:
        pass # Process your result here
    except KeyboardInterrupt:
        return

# Schedule all workers as tasks. Keep the Task returned by asyncio.async(),
# not the bare coroutine, so asyncio.wait() doesn't wrap the same coroutine twice.
workers = []
for element in some_list:
    workers.append(asyncio.async(worker(element)))

# Run until everything is complete
loop.run_until_complete(asyncio.wait(workers))

You should be able to limit the number of concurrent processes using e.g. asyncio.Semaphore if you need to.
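For example, here is a sketch of that idea in the same generator-coroutine style (the limit of 4 is arbitrary, and SOME_COMMAND is the placeholder from above):

sem = asyncio.Semaphore(4)  # at most 4 subprocesses at any time

@asyncio.coroutine
def worker(some_element):
    with (yield from sem):  # released automatically when the block exits
        process = yield from asyncio.create_subprocess_exec(*SOME_COMMAND, stdout=PIPE)
        cmd_res = yield from process.communicate()
        # Process your result here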

cg909
  • On python 3.4 I can't execute your code, python says async.io is not a module and fails on many lines like `from asyncio.subprocess import PIPE`, `loop = asyncio.get_event_loop()`, `@asyncio.coroutine`. – roipoussiere Sep 29 '15 at 23:17
  • "async.io is not a module"? Did you write `async.io` instead of `asyncio`? Since 3.4 the asyncio module is in the standard library, so it should be there by default. – cg909 Sep 29 '15 at 23:34
  • Sorry, I wanted to say: python says that **asyncio** is not a **package**. – roipoussiere Sep 30 '15 at 00:13
  • That's weird. Can you post the whole error message (the whole last line)? Maybe the asyncio package is missing from your installation for some reason. What's the exact version number Python reports and what OS do you use? – cg909 Sep 30 '15 at 00:20
  • 1
    You named your script asyncio.py, so `import asyncio` (re)imports your own script instead of the asyncio package. Try renaming it to something else. – cg909 Sep 30 '15 at 00:50

Your child processes will receive both the KeyboardInterrupt and the SIGTERM from the terminate().

Because the child process already receives the KeyboardInterrupt, a simple join() in the parent, rather than a terminate(), should suffice.
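A minimal sketch of that suggestion, reusing the question's pool (note that Pool.join() requires close() or terminate() to have been called first):

try:
    pool.map(worker, some_list, chunksize=1)
except KeyboardInterrupt:
    pool.close()  # stop accepting tasks; required before join()
    pool.join()   # wait for the workers to exit on their own
    print 'bye!'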

Patrick Maupin

As suggested by y0prst, I used threading.Thread instead of Pool.

Here is a working example, which rasterizes a set of vectors with ImageMagick (I know I could use mogrify for this; it's just an example).

#!/usr/bin/python

from os.path import abspath
from os import listdir
from threading import Thread
from subprocess import Popen, PIPE

RASTERISE_CALL = "magick %s %s"
INPUT_DIR = './tests_in/'

def get_vectors(dir):
    '''Return a list of svg files inside the `dir` directory'''
    return [abspath(dir+f).replace(' ', '\\ ') for f in listdir(dir) if f.endswith('.svg')]

class ImageMagickError(Exception):
    '''Custom error for ImageMagick fails calls'''
    def __init__(self, value): self.value = value
    def __str__(self): return repr(self.value)

class Rasterise(Thread):
    '''Rasterizes a given vector.'''
    def __init__(self, svg):
        self.stdout = None
        self.stderr = None
        Thread.__init__(self)
        self.svg = svg

    def run(self):
        p = Popen((RASTERISE_CALL % (self.svg, self.svg + '.png')).split(), shell=False, stdout=PIPE, stderr=PIPE)
        self.stdout, self.stderr = p.communicate()
        if self.stderr != '':  # compare with !=, not 'is not': string identity is unreliable
            raise ImageMagickError('can not rasterize ' + self.svg + ': ' + self.stderr)

threads = []

def join_threads():
    '''Joins all the threads.'''
    for t in threads:
        try:
            t.join()
        except(KeyboardInterrupt, SystemExit):
            pass

#Rasterizes all the vectors in INPUT_DIR.
for f in get_vectors(INPUT_DIR):
    t = Rasterise(f)

    try:
        print 'rasterize ' + f
        t.start()
    except (KeyboardInterrupt, SystemExit):
        join_threads()
    except ImageMagickError:
        print 'Oops, IM can not rasterize ' + f + '.'
        continue

    threads.append(t)

# wait for all threads to end
join_threads()

print ('Finished!')

Please tell me if you think there is a more Pythonic way to do that, or if it can be optimised; I will edit my answer.

roipoussiere