I want to iterate over the asynchronous results of an IPython parallel map as they arrive. The only way I can find to do this is to iterate over the results object. However, if one of the tasks raises an exception, the iteration terminates. Is there any way to keep iterating past the failed task? See the code below: the iteration terminates when the second job raises an exception.

from IPython import parallel

def throw_even(i):
    if i % 2 == 0:
        raise RuntimeError('ERROR: %d' % i)
    return i

rc = parallel.Client()
lview = rc.load_balanced_view() # default load-balanced view

# map onto the engines.
args = range(1, 5)
print args
async_results = lview.map_async(throw_even, range(1, 5), ordered=True)

# get results
args_iter = iter(args)
results_iter = iter(async_results)
while True:
    try:
        arg = args_iter.next()
        result = results_iter.next()
        print 'Job %s completed: %d' % (arg, result)            
    except StopIteration:
        print 'Finished iteration'
        break
    except Exception as e:
        print '%s: Job %d: %s' % (type(e), arg, e)

This gives the following output, which stops before jobs 3 and 4 are reported:

[1, 2, 3, 4]
Job 1 completed: 1
<class 'IPython.parallel.error.RemoteError'>: Job 2: RuntimeError(ERROR: 2)
Finished iteration

Is there some way to continue the iteration after a job fails?

Epimetheus
  • I've realised that the map idiom is not a suitable way to do this. I'm better off just using lview.apply and handling each result individually. – Epimetheus Apr 24 '13 at 09:13
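
The per-task approach mentioned in the comment can be sketched without a cluster. The example below is not from the original posts: it uses the Python 3 standard library's `concurrent.futures` as a stand-in for `lview.apply`, a swap made only so that the code runs locally, and `run_individually` is a hypothetical helper name. Each argument gets its own future, so a failure in one task does not prevent the remaining tasks from being reported. Results are handled in submission order, matching `ordered=True` in the question.

```python
from concurrent.futures import ThreadPoolExecutor

def throw_even(i):
    if i % 2 == 0:
        raise RuntimeError('ERROR: %d' % i)
    return i

def run_individually(func, args):
    """Submit each argument as its own task; collect one report line per task."""
    report = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        # One future per argument, kept in submission order.
        futures = [(arg, pool.submit(func, arg)) for arg in args]
        for arg, future in futures:
            try:
                report.append('Job %d completed: %d' % (arg, future.result()))
            except Exception as e:
                # Only this task failed; keep going with the remaining futures.
                report.append('Job %d failed: %s' % (arg, e))
    return report

report = run_individually(throw_even, range(1, 5))
```

With IPython.parallel the same shape would presumably apply, with one async result per call instead of one future, but that mapping is an assumption here rather than tested code.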

2 Answers

0

This question might be relevant. I don't really see why you would want to throw an exception from a remote engine, though. If you do want to, I think you can do it in the same way as in my answer to that question. I now see you already realized this in your comments, but this should do it anyway.

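from IPython import parallel

def throw_even(i):
    if i % 2:
        return i
    raise RuntimeError('Error %d' % i)

rc = parallel.Client()
params = range(1, 5)

# submit each parameter as its own task, cycling over the engines
calls = []
n_cores = len(rc.ids)
for n, p in enumerate(params):
    core = rc.ids[n % n_cores]
    calls.append(rc[core].apply_async(throw_even, p))

# then you get the results

while calls:
    for call in calls[:]:  # iterate over a copy, since calls is modified below
        try:
            result = call.get(1e-3)
            print(result)  # a single-engine view returns the bare result
            calls.remove(call)
            # in case your call failed, you can apply_async again
            # and append the new call to calls.
        except parallel.TimeoutError:
            pass  # not finished yet; check again on the next pass
        except Exception as e:
            calls.remove(call)  # failed for good; stop polling it
            knock_yourself_out(e)  # placeholder: handle the error however you like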
Alex S
  • It is not always that you _want_ to raise exceptions on a remote engine; it's that your code/data finds new and interesting ways to break on the remote engines ;) and it is very annoying when you can't get back 500+ results because 7 of them had screwy data. – tacaswell Nov 15 '13 at 04:21
  • Sure, so creating a different view for every parameter should keep the exceptions encapsulated. – Alex S Nov 18 '13 at 10:06
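
Another way to keep exceptions encapsulated, in the spirit of the comments above, is to catch them inside the mapped function and return them as data, so that iterating over the results never raises. This sketch is not from the original posts: `safe_throw_even` and the `(ok, value)` tuple convention are hypothetical, and the built-in `map` stands in for `lview.map_async` so that the example runs locally. Whether the wrapper and the function it calls are available on the engines depends on your setup.

```python
def throw_even(i):
    if i % 2 == 0:
        raise RuntimeError('ERROR: %d' % i)
    return i

def safe_throw_even(i):
    """Hypothetical wrapper: return (ok, value) instead of raising."""
    try:
        return (True, throw_even(i))
    except Exception as e:
        # Return the error as plain data so the caller can keep iterating.
        return (False, '%s: %s' % (type(e).__name__, e))

args = list(range(1, 5))
# Built-in map stands in for lview.map_async here (an assumption for local testing).
outcomes = list(map(safe_throw_even, args))

# Split the outcomes into successes and failures, keeping the original argument.
completed = [(a, v) for a, (ok, v) in zip(args, outcomes) if ok]
failed = [(a, v) for a, (ok, v) in zip(args, outcomes) if not ok]
```

The trade-off is that failures no longer surface as exceptions on the client, so the caller has to check the `ok` flag of every result.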
0

A sneaky way around this is to reach into the internals of the AsyncMapResult and grab _result, which is a list of the results. This does not help while you are iterating, only after the fact:

from IPython.parallel.error import RemoteError

# _result is a private attribute, so it may change between IPython versions
tt = async_results._result
fail_indx = [j for j, r in enumerate(tt) if isinstance(r, RemoteError)]
good_indx = [j for j, r in enumerate(tt) if not isinstance(r, RemoteError)]

just_the_results = [r for r in tt if not isinstance(r, RemoteError)]
tacaswell