You can do it more simply than that. Here's a coroutine that submits four slow function calls to subprocesses and awaits them:
from concurrent.futures import ProcessPoolExecutor
from time import sleep
from tornado import gen, ioloop
pool = ProcessPoolExecutor()
def calculate_slowly(x):
sleep(x)
return x
async def parallel_tasks():
# Create futures in a randomized order.
futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
for i in [1, 3, 2, 4]]
wait_iterator = gen.WaitIterator(*futures)
while not wait_iterator.done():
try:
result = await wait_iterator.next()
except Exception as e:
print("Error {} from {}".format(e, wait_iterator.current_future))
else:
print("Result {} received from future number {}".format(
result, wait_iterator.current_index))
ioloop.IOLoop.current().run_sync(parallel_tasks)
It outputs:
Result 1 received from future number 0
Result 2 received from future number 2
Result 3 received from future number 1
Result 4 received from future number 3
You can see that the coroutine receives results in the order they complete, not the order they were submitted: future number 1 resolves after future number 2, because future number 1 slept longer. convert_yielded transforms the Futures returned by ProcessPoolExecutor into Tornado-compatible Futures that can be awaited in a coroutine.
Each future resolves to the value returned by calculate_slowly: in this case it's the same number that was passed into calculate_slowly, and the same number of seconds as calculate_slowly sleeps.
To include this in a RequestHandler, try something like this:
class MainHandler(web.RequestHandler):
async def get(self):
self.write("Starting....\n")
self.flush()
futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
for i in [1, 3, 2, 4]]
wait_iterator = gen.WaitIterator(*futures)
while not wait_iterator.done():
result = await wait_iterator.next()
self.write("Result {} received from future number {}\n".format(
result, wait_iterator.current_index))
self.flush()
if __name__ == "__main__":
application = web.Application([
(r"/", MainHandler),
])
application.listen(8888)
ioloop.IOLoop.instance().start()
You can observe if you curl localhost:8888
that the server responds incrementally to the client request.