
I want to apply a function in parallel using multiprocessing.Pool. The problem is that if one function call triggers a segmentation fault, the Pool hangs forever. Does anybody have an idea how I can make a Pool that detects when something like this happens and raises an error?

The following example shows how to reproduce it (requires scikit-learn > 0.14)

import numpy as np
from sklearn.ensemble import gradient_boosting
import time

from multiprocessing import Pool

class Bad(object):
    tree_ = None


def fit_one(i):
    if i == 3:
        # this will segfault                                                    
        bad = np.array([[Bad()] * 2], dtype=np.object)
        gradient_boosting.predict_stages(bad,
                                         np.random.rand(20, 2).astype(np.float32),
                                         1.0, np.random.rand(20, 2))
    else:
        time.sleep(1)
    return i


pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))
# we will never see 3
for o in out:
    print o
Peter Prettenhofer
  • Fix the segmentation fault? Usually segfaults are caused by invalid memory access, which is _undefined_ behavior and not guaranteed to cause a segfault at all. – Colonel Thirty Two Jun 23 '14 at 16:36
  • No answers, but I can say that joblib.Parallel seems to hang forever. From what I can tell, there is no way to return the segfault or add a "watchdog" timeout in multiprocessing. – Kyle Kastner Jun 27 '14 at 11:38
  • Actually, maybe you can add a timeout decorator? Such as shown here: http://code.activestate.com/recipes/577028/ – Kyle Kastner Jun 27 '14 at 11:41
  • See this answer: http://stackoverflow.com/a/24396655/2073595. It's a bit messy, but you can monitor the individual processes in your pool to see if one has restarted unexpectedly. – dano Jul 16 '14 at 15:05
  • Also of note here: [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) does detect when a process has been killed unexpectedly, and will raise a `BrokenProcessPool` exception on any outstanding tasks when it happens. There is also [a bug](http://bugs.python.org/issue22393) filed against `multiprocessing`, which has a working patch, to add that same behavior to `multiprocessing.Pool`. – dano Mar 27 '15 at 16:13
  • Go back to processing sequentially, fix/handle problem, then try multiprocessing again. – Ninga Apr 14 '15 at 21:26

4 Answers


As described in the comments, this just works in Python 3 if you use concurrent.futures.ProcessPoolExecutor instead of multiprocessing.Pool.
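
For example, here is a minimal Python 3 sketch (my own illustration, not from the original post; it simulates the crash by having one worker kill itself with SIGKILL on Unix instead of reproducing the scikit-learn segfault). When a worker dies, the affected futures raise BrokenProcessPool instead of hanging:

import concurrent.futures
import os
import signal
from concurrent.futures.process import BrokenProcessPool

def fit_one(i):
    # stand-in for the question's fit_one: hard-kill the worker instead of segfaulting
    if i == 3:
        os.kill(os.getpid(), signal.SIGKILL)
    return i

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(2) as executor:
        futures = [executor.submit(fit_one, i) for i in range(10)]
        for f in futures:
            try:
                print(f.result())
            except BrokenProcessPool:
                # raised instead of hanging forever when a worker process dies
                print('a worker process died unexpectedly')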

If you're stuck on Python 2, the best option I've found is to use the timeout argument on the result objects returned by Pool.apply_async and Pool.map_async. For example:

pool = Pool(2)
results = [pool.apply_async(fit_one, (i,)) for i in range(10)]
for res in results:
    print res.get(timeout=1000)  # allow 1000 seconds max per task

This works as long as you have an upper bound for how long a child process should take to complete a task.

shoyer

This is a known bug, issue #22393, in Python. There is no meaningful workaround as long as you're using multiprocessing.Pool until it's fixed. A patch is available at that link, but it has not yet been merged into a release, so no stable release of Python fixes the problem.

ShadowRanger

Instead of using Pool().imap(), you might rather create the child processes yourself with Process(). The Process object lets you check the liveness status of each child (is_alive(), exitcode), so you will know if one of them dies or hangs.
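
A rough sketch of that idea (my own illustration, not code from this answer; the run_task helper and the Queue are just one way to collect results, and fit_one is assumed to be the function from the question):

from multiprocessing import Process, Queue

def run_task(i, queue):
    # run one task and ship the result back to the parent
    queue.put(fit_one(i))

if __name__ == '__main__':
    queue = Queue()
    procs = [Process(target=run_task, args=(i, queue)) for i in range(10)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
        if p.exitcode != 0:
            # a child killed by a segfault exits with a negative code (-11 for SIGSEGV)
            raise RuntimeError("worker %s died with exit code %s" % (p.name, p.exitcode))
    # every child exited cleanly, so exactly one result per process is waiting
    for _ in procs:
        print(queue.get())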

ArekBulski
  • 4,520
  • 4
  • 39
  • 61

I haven't run your example to see if it can handle the error, but try concurrent.futures. Simply replace my_function(i) with your fit_one(i). Keep the if __name__ == '__main__': structure; concurrent.futures seems to need it. The code below is tested on my machine, so it will hopefully work straight up on yours.

import concurrent.futures

def my_function(i):
    print('function running')
    return i

def run():
    number_processes = 4
    executor = concurrent.futures.ProcessPoolExecutor(number_processes)
    futures = [executor.submit(my_function, i) for i in range(10)]
    concurrent.futures.wait(futures)  # block until every future has finished

    for f in futures:
        print(f.result())

if __name__ == '__main__':
    run()
Ninga
  • I'm just thinking it might work because you can call all sorts of methods on the iterable 'futures' which is returned after completion of all the processes. So it may be able to take the error in its stride. – Ninga Mar 29 '15 at 00:50