I am trying to process data in parallel using IPython's parallel processing. I am following the instructions by @minrk in his answer to the question on how to get intermediate results in ipython parallel processing?. Since the data is heterogeneous, some of the processing tasks finish sooner than others, and I would like to save their results as soon as they become available. I do this in the following fashion:
from IPython.parallel import Client

def specialfunc(param):
    import time
    if param > 8:
        raise IOError
    else:
        time.sleep(param)
        return param

client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)
I can then loop over asyncmap, and results become available as soon as they are ready:
for i in asyncmap:
    print i
The trouble is that my code sometimes throws exceptions (the example above forces an IOError when the calling parameter exceeds 8), which I would like to deal with. However, as soon as one of the engines raises an exception, the whole asyncmap 'appears' to be finished.
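For example, if I wrap the loop in a try/except (a rough sketch of what I tried; I am not sure exactly which exception class the remote IOError surfaces as on the controller, so I catch everything here), the loop stops at the failure and the results that were still pending are never printed:

results = []
try:
    for i in asyncmap:
        results.append(i)
        print i
except Exception as err:  # the remote IOError arrives wrapped as a remote error
    print 'a task failed:', err
# at this point iteration has stopped, even though some tasks were still running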
I actually noticed that by interrogating asyncmap.metadata I can figure out which message gave an error (asyncmap.metadata[i]['pyerr']), but then I don't know how to keep waiting for the remaining results to come in as they do.
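To illustrate, this is roughly how I inspect the metadata after the fact (in my runs, 'pyerr' is None for the tasks that completed cleanly):

for i, md in enumerate(asyncmap.metadata):
    if md['pyerr'] is not None:
        print 'task %d raised:' % i, md['pyerr']
    else:
        print 'task %d seems to have completed cleanly' % i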
So my question is: how should I process results arriving asynchronously from my engines, even if they do sometimes throw exceptions? And how do I catch the exceptions in the engines without upsetting the waiting for results in the controller?
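The only workaround I can think of is to catch the exception inside the task function itself and return a sentinel instead of raising, roughly like the sketch below (safe_specialfunc and the (param, err) tuple are just made-up names for illustration). It keeps the unordered iteration going, but it feels like a hack and I am hoping there is a cleaner way:

def safe_specialfunc(param):
    import time
    try:
        if param > 8:
            raise IOError
        time.sleep(param)
        return param
    except IOError as err:
        # swallow the error on the engine and hand back a sentinel instead
        return (param, err)

asyncmap = balanced.map_async(safe_specialfunc, param_list, ordered=False)
for result in asyncmap:
    print result  # failed tasks show up as (param, IOError()) tuples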