13

Is there a way to re-send a piece of data for processing, if the original computation failed, using a simple pool?

import random
from multiprocessing import Pool

def f(x):
   if random.getrandbits(1):
       raise ValueError("Retry this computation")
   return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
atp
  • 30,132
  • 47
  • 125
  • 187
  • 1
    Perhaps you want to `return f(x)` instead of raising a `ValueError`? Just guessing... – Paulo Freitas Jul 24 '12 at 02:03
  • How high is the chance of failure in your actual application? That is, how important is it that the process retry immediately as opposed to waiting for the other processes to finish first? – Isaac Jul 24 '12 at 02:05
  • It's a moderate chance of failure, and it doesn't need to be immediately retried (but should be retried in parallel, eventually). – atp Jul 24 '12 at 05:29

2 Answers2

22

If you can (or don't mind) retrying immediately, use a decorator wrapping the function:

import random
from multiprocessing import Pool
from functools import wraps

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
Andrew Alcock
  • 19,401
  • 4
  • 42
  • 60
10

You can use a Queue to feed back failures into the Pool through a loop in the initiating Process:

import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if not r is None])
        successful.extend([r for r in retry_results if not r is None])
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items);
    p.close()
    p.join()

    print "Results: %s" % successful
    print "Failures: %s" % failure_tracker

if __name__ == '__main__':
    main(range(1, 10))

The output is like this:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]

A Pool cant be shared between multiple processes. Hence this Queue based approach. If you try to pass a pool as a parameter to the pools processes, you will get this error:

NotImplementedError: pool objects cannot be passed between processes or pickled

You could alternatively try a few immediate retries within your function f, to avoid synchronisation overhead. It really is a matter of how soon your function should wait to retry, and on how likely a success is if retried immediately.


Old Answer: For the sake of completeness, here is my old answer, which isn't as optimal as resubmitting directly into the pool, but might still be relevant depending on the use case, because it provides a natural way to deal with/limit n-level retries:

You can use a Queue to aggregate failures and resubmit at the end of each run, over multiple runs:

import multiprocessing as mp
import random


def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()

        p.join()
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if not r is None]
        print "(%d) Succeeded: %s" % (run_number, successful)
        print "(%d) Failed:    %s" % (run_number, failed_items)
        print
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))

with output like this:

(1) Succeeded: [9, 16, 36, 81]
(1) Failed:    [2, 1, 5, 7, 8]

(2) Succeeded: [64]
(2) Failed:    [2, 1, 5, 7]

(3) Succeeded: [1, 25]
(3) Failed:    [2, 7]

(4) Succeeded: [49]
(4) Failed:    [2]

(5) Succeeded: [4]
(5) Failed:    []
Preet Kukreti
  • 8,417
  • 28
  • 36
  • Updated my answer to one that doesnt require multiple runs, and now works on the same original pool. – Preet Kukreti Jul 24 '12 at 04:44
  • Thanks for the detailed response. I like the idea of putting failed computations in a queue to be retried. I must award Andrew with the bounty because his solution does a simple retry. – atp Jul 24 '12 at 20:10
  • @ash I did mention immediate retries in my response, thinking that it would be a trivial/simple addition and not what you were looking for. Note also that it (immediate retries) is not optimal for all cases, especially those where an immediate retry has low chance of succeeding (in which case it is heavily suboptimal as it causes resource starvation for jobs that potentially could succeed.) Congrats to Andrew anyway. – Preet Kukreti Jul 26 '12 at 04:26