0

I have a tornado application which needs to run a blocking function on ProcessPoolExecutor. This blocking function employs a library which emits incremental results via blinker events. I'd like to collect these events and send them back to my tornado app as they occur.

At first, tornado seemed ideal for this use case because it's asynchronous. I thought I could simply pass a tornado.queues.Queue object to the function to be run on the pool and then put() events onto this queue as part of my blinker event callback.

However, the docs for tornado.queues.Queue say that, unlike multiprocessing.Queue, these queues are neither shared across processes nor thread-safe.

Is there a way to retrieve these events from the pool as they occur? Should I wrap multiprocessing.Queue so it produces Futures? That seems unlikely to work as I doubt the internals of multiprocessing are compatible with tornado.

[EDIT] There are some good clues here: https://gist.github.com/hoffrocket/8050711

matthewatabet

2 Answers

1

To collect anything other than the return value of a task submitted to a ProcessPoolExecutor, you must use a multiprocessing.Queue (or another object from the multiprocessing library). Because multiprocessing.Queue exposes only a synchronous interface, you also need a separate thread in the parent process to read from the queue. (The queue has an underlying file descriptor that could be used instead, but that means reaching into implementation details that are undocumented and subject to change, so we'll ignore it here.)
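On the worker side, the blinker receiver only has to forward each event onto the shared queue. Here is a minimal sketch, assuming the library's signals follow blinker's usual receiver convention of `receiver(sender, **kwargs)`. `make_forwarder` and the event tuple layout are hypothetical names chosen for illustration, and a plain `queue.Queue` stands in for `multiprocessing.Queue` so the snippet runs in a single process:

```python
import queue


def make_forwarder(event_queue):
    """Return a receiver that forwards each signal onto event_queue.

    Hypothetical helper: blinker invokes receivers as
    receiver(sender, **kwargs), so the result can be passed to
    some_signal.connect(...) inside the worker process.
    """
    def forward(sender, **kwargs):
        # Send only plain data: a multiprocessing.Queue pickles every item,
        # so the sender is reduced to its repr rather than sent as an object.
        event_queue.put((repr(sender), kwargs))
    return forward


# Stand-in for multiprocessing.Queue; both expose put() and get().
events = queue.Queue()
forward = make_forwarder(events)

# Simulate the library emitting two incremental results.
forward("solver", progress=0.5)
forward("solver", progress=1.0)
```

When connecting a receiver like this to a real signal, keep a reference to it (or pass `weak=False` to `connect`), since blinker holds receivers by weak reference by default and a closure with no other reference would be dropped.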

Here's a quick untested example:

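import concurrent.futures
import multiprocessing

from tornado.ioloop import IOLoop

queue = multiprocessing.Queue()
proc_pool = concurrent.futures.ProcessPoolExecutor()
thread_pool = concurrent.futures.ThreadPoolExecutor()

async def read_events():
    while True:
        # queue.get blocks, so run it on a worker thread and await the result.
        event = await IOLoop.current().run_in_executor(thread_pool, queue.get)
        print(event)

async def foo():
    # Start the reader in the background, then wait for the blocking task.
    IOLoop.current().spawn_callback(read_events)
    # do_something_and_write_to_queue is the blocking function; it calls queue.put().
    await IOLoop.current().run_in_executor(proc_pool, do_something_and_write_to_queue)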
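
The reader loop above never exits, which leaves a thread blocked in `queue.get` after the task finishes. One common way to end it is a sentinel value that the worker puts on the queue last. The sketch below assumes that convention (`DONE` and `produce` are hypothetical names) and uses stdlib asyncio with a producer thread and a `queue.Queue` as stand-ins for the worker process and `multiprocessing.Queue`. Tornado 5.0 and later run on the asyncio event loop, so the same `run_in_executor` await should carry over to a Tornado coroutine:

```python
import asyncio
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

DONE = None  # hypothetical sentinel marking the end of the event stream


def produce(event_queue):
    # Stand-in for the blocking worker: emit three events, then the sentinel.
    for i in range(3):
        event_queue.put({"step": i})
    event_queue.put(DONE)


async def read_events(event_queue, thread_pool):
    loop = asyncio.get_running_loop()
    received = []
    while True:
        # The blocking get() runs on a worker thread; the loop stays free.
        event = await loop.run_in_executor(thread_pool, event_queue.get)
        if event is DONE:
            return received
        received.append(event)


async def main():
    event_queue = queue.Queue()  # multiprocessing.Queue has the same get()/put()
    with ThreadPoolExecutor(max_workers=1) as thread_pool:
        producer = threading.Thread(target=produce, args=(event_queue,))
        producer.start()
        events = await read_events(event_queue, thread_pool)
        producer.join()
    return events


collected = asyncio.run(main())
```

In a real handler, each event would be written to the client instead of appended to a list.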
Ben Darnell
  • Yeah, this is more or less what I ended up doing, but used the `aioprocessing` module which supports a non-blocking `AioQueue.coro_get`. – matthewatabet Feb 23 '17 at 19:37
0

You can do it more simply than that. Here's a coroutine that submits four slow function calls to subprocesses and awaits them:

from concurrent.futures import ProcessPoolExecutor
from time import sleep

from tornado import gen, ioloop

pool = ProcessPoolExecutor()


def calculate_slowly(x):
    sleep(x)
    return x


async def parallel_tasks():
    # Create futures in a randomized order.
    futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
               for i in [1, 3, 2, 4]]

    wait_iterator = gen.WaitIterator(*futures)
    while not wait_iterator.done():
        try:
            result = await wait_iterator.next()
        except Exception as e:
            print("Error {} from {}".format(e, wait_iterator.current_future))
        else:
            print("Result {} received from future number {}".format(
                result, wait_iterator.current_index))


ioloop.IOLoop.current().run_sync(parallel_tasks)

It outputs:

Result 1 received from future number 0
Result 2 received from future number 2
Result 3 received from future number 1
Result 4 received from future number 3

You can see that the coroutine receives results in the order they complete, not the order they were submitted: future number 1 resolves after future number 2, because future number 1 slept longer. convert_yielded transforms the Futures returned by ProcessPoolExecutor into Tornado-compatible Futures that can be awaited in a coroutine.

Each future resolves to the value returned by calculate_slowly: in this case it's the same number that was passed into calculate_slowly, and the same number of seconds as calculate_slowly sleeps.
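
The same completion-order behaviour can be reproduced without subprocesses. This is a sketch using stdlib `asyncio.as_completed` in place of `gen.WaitIterator`, with `asyncio.sleep` replacing the blocking `sleep` and shortened delays (both are assumptions made to keep it quick to run). Since `as_completed` does not report which task finished, each task returns its own index:

```python
import asyncio


async def calculate_slowly(index, delay):
    # Sleep without blocking the loop, then report which task finished.
    await asyncio.sleep(delay)
    return index, delay


async def parallel_tasks():
    delays = [0.05, 0.15, 0.10, 0.20]
    tasks = [asyncio.create_task(calculate_slowly(i, d))
             for i, d in enumerate(delays)]
    finished = []
    # as_completed yields awaitables in the order the tasks finish.
    for next_done in asyncio.as_completed(tasks):
        finished.append(await next_done)
    return finished


order = asyncio.run(parallel_tasks())
```

As in the output above, task 2 is reported before task 1 because it sleeps for less time.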

To include this in a RequestHandler, try something like this:

from tornado import web  # in addition to the imports above


class MainHandler(web.RequestHandler):
    async def get(self):
        self.write("Starting....\n")
        self.flush()

        futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
                   for i in [1, 3, 2, 4]]

        wait_iterator = gen.WaitIterator(*futures)
        while not wait_iterator.done():
            result = await wait_iterator.next()
            self.write("Result {} received from future number {}\n".format(
                result, wait_iterator.current_index))

            self.flush()


if __name__ == "__main__":
    application = web.Application([
        (r"/", MainHandler),
    ])
    application.listen(8888)
    ioloop.IOLoop.current().start()

If you curl localhost:8888, you can see the server respond incrementally to the client request.

A. Jesse Jiryu Davis
  • That's not quite what I mean. To clarify, I have a single slow function call. And, that function call emits a number of events via `blinker`. I'd like to collect those events and send them back to the main tornado thread. I believe the code you have here runs four slow functions in parallel, rather than one slow function. – matthewatabet Feb 21 '17 at 19:16