
I currently have code in the form of a generator which calls an IO-bound task. The generator actually calls sub-generators as well, so a more general solution would be appreciated.

Something like the following:

def processed_values(list_of_io_tasks):
    for task in list_of_io_tasks:
        value = slow_io_call(task)
        yield postprocess(value) # in real version, would iterate over 
                                 # processed_values2(value) here

I have complete control over slow_io_call, and I don't care in which order I get the items from processed_values. Is there something like coroutines I can use to get the yielded results in the fastest order by turning slow_io_call into an asynchronous function and using whichever call returns fastest? I expect list_of_io_tasks to be at least thousands of entries long. I've never done any parallel work other than with explicit threading, and in particular I've never used the various forms of lightweight threading which are available.

I need to use the standard CPython implementation, and I'm running on Linux.

Ethan Furman
Zachary Vance

1 Answer


Sounds like you are in search of multiprocessing.Pool(), specifically the Pool.imap_unordered() method.

Here is a port of your function to use imap_unordered() to parallelize calls to slow_io_call().

 import multiprocessing

 def processed_values(list_of_io_tasks):
     pool = multiprocessing.Pool(4)  # number of worker processes
     results = pool.imap_unordered(slow_io_call, list_of_io_tasks)
     while True:
         try:
             value = results.next(9999999)  # large timeout
         except StopIteration:
             return  # no more results
         yield value

Note that you could also iterate over results directly (i.e. for item in results: yield item) without the while True loop. However, calling results.next() with a timeout works around a multiprocessing keyboard-interrupt bug and lets you kill the main process and all subprocesses with Ctrl-C. results.next() raises StopIteration when there are no more items to return. On Python 2 that exception could simply propagate out of the generator function, but since Python 3.7 (PEP 479) a StopIteration that escapes a generator body is converted into a RuntimeError, so the function catches it and returns, which ends the generator cleanly.

To use threads in place of processes, replace
import multiprocessing
with
import multiprocessing.dummy as multiprocessing
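
As a rough, self-contained sketch of the thread-based variant: slow_io_call and postprocess below are hypothetical stand-ins (a short sleep and a doubling), and the default of 32 workers is an arbitrary assumption rather than a recommendation. For brevity it iterates over the results directly instead of using the timeout workaround.

```python
import time
from multiprocessing.dummy import Pool  # thread-backed Pool with the same API


def slow_io_call(task):
    # Hypothetical stand-in for the real IO-bound call.
    time.sleep(0.01)
    return task


def postprocess(value):
    # Hypothetical stand-in for the real post-processing step.
    return value * 2


def processed_values(list_of_io_tasks, workers=32):
    # The with-block terminates the pool once the generator finishes or is closed.
    with Pool(workers) as pool:
        for value in pool.imap_unordered(slow_io_call, list_of_io_tasks):
            yield postprocess(value)


if __name__ == "__main__":
    # Results arrive in completion order, so sort before printing.
    print(sorted(processed_values(range(10))))
```

Because the threads spend their time blocked in the IO call, the pool can usually be made much larger than the number of CPU cores.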

Garrett
  • Ah, the problem is just that slow_io_call is sourced to outside python and might take a while to return -- adding as many instances as possible which have entered the slow_io_call would be preferable since I can issue as many simultaneous requests as I'd like (ex: querying a distributed client for information, combining hard disk writes, where the number of requests doesn't significantly affect response time of any request). While it's not documented, I assume a Pool of processes enters the iterator at most 4 times and then races, rather than suspending and going to the next iteration step? – Zachary Vance Apr 19 '11 at 00:43
  • Try increasing the pool size and compare process versus thread pools to find the optimal solution for your application, which is always a balance between relative speedup and increased overhead. – Garrett Apr 19 '11 at 03:01
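
One way to follow the suggestion in the last comment is to time the same batch of tasks with several pool sizes. The sketch below is only an illustration under stated assumptions: slow_io_call is a hypothetical stand-in that sleeps for 20 ms, time_pool is a helper invented here, and a thread pool is used. Importing Pool from multiprocessing instead would give the process-based comparison.

```python
import time
from multiprocessing.dummy import Pool  # thread pool; multiprocessing.Pool is the process version


def slow_io_call(task):
    # Hypothetical stand-in: each call just waits 20 ms.
    time.sleep(0.02)
    return task


def time_pool(workers, tasks):
    """Return (elapsed seconds, sorted results) for one pool size."""
    start = time.perf_counter()
    with Pool(workers) as pool:
        results = sorted(pool.imap_unordered(slow_io_call, tasks))
    return time.perf_counter() - start, results


if __name__ == "__main__":
    tasks = list(range(20))
    for workers in (1, 4, 20):
        elapsed, _ = time_pool(workers, tasks)
        print(f"{workers:>2} workers: {elapsed:.3f} s")
```

With a real workload, the point where adding workers stops helping depends on the backend being queried, so the numbers are only meaningful when measured against the actual slow_io_call.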