36

I'm trying to figure out how to port a threaded program to use asyncio. I have a lot of code which synchronizes around a few standard library Queues, basically like this:

import queue, random, threading, time

q = queue.Queue()

def produce():
    while True:
        time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds
        q.put(random.random())

def consume():
    while True: 
        value = q.get(block=True)
        print("Consumed", value)

threading.Thread(target=produce).start()
threading.Thread(target=consume).start()

One thread creates values (possibly user input), and another thread does something with them. The point is that these threads are idle until there's new data, at which point they wake up and do something with it.

I'm trying to implement this pattern using asyncio, but I can't seem to figure out how to make it "go".

My attempts look more or less like this (and don't do anything at all).

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)

# do something here to start the coroutines. asyncio.Task()? 

loop = asyncio.get_event_loop()
loop.run_forever()

I've tried variations on using coroutines, not using them, wrapping stuff in Tasks, trying to make them create or return futures, etc.

I'm starting to think that I have the wrong idea about how I should be using asyncio (maybe this pattern should be implemented in a different way that I'm not aware of). Any pointers would be appreciated.

Andrew Svetlov
  • 16,730
  • 8
  • 66
  • 69
Seth
  • 45,033
  • 10
  • 85
  • 120
  • Why are you leaving threading in favor of asyncio? – dmmd Feb 06 '18 at 20:11
  • @dmmd - Not leaving, I use threads all the time. Asyncio is, however, convenient for some kinds of problems, particularly when they involve a lot of blocking I/O. And on systems without unlimited resources (raspberry pi's, "cloud" machines), sometimes asyncio can accomplish the same thing with less effort. – Seth Feb 07 '18 at 20:49
  • Good to know, thanks. – dmmd Feb 09 '18 at 18:52

3 Answers3

39

Yes, exactly. Tasks are your friends:

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True:
        yield from q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)


loop = asyncio.get_event_loop()
loop.create_task(produce())
loop.create_task(consume())
loop.run_forever()

asyncio.ensure_future can be used for task creation also.

And please keep in mind: q.put() is a coroutine, so you should to use yield from q.put(value).

UPD

Switched from asyncio.Task()/asyncio.async() to new brand API loop.create_task() and asyncio.ensure_future() in example.

Andrew Svetlov
  • 16,730
  • 8
  • 66
  • 69
  • I'm curious as to whether or not a 'while True' loop is good to use for production code? Also is there a recommended way to test this? – user772401 Aug 30 '15 at 20:23
  • @user772401 usually it depends on producer. For generating random sequence `while True` is good example, for reading from socket -- maybe, your code should wait a portion of new data from socket anyway. For testing *my* example I suggest pinning random seed to constant value and analyzing consumed result. More complex scenarios require another technique. – Andrew Svetlov Aug 31 '15 at 13:39
  • ok thanks. What I was wondering about in regards to the while loop, is there some mechanism that says 'run this code forever'. Because I feel like if an exception occurs, than the while loop will break and you'll have to restart your program. Otherwise you'll have to wrap the code inside the 'while True' loop in a try/except so that if something exceptional happens it doesn't ruin the entire program but then that looks ugly... – user772401 Sep 01 '15 at 14:52
  • An exception in a task closes only the task, not the whole event loop – Andrew Svetlov Sep 01 '15 at 15:12
  • 1
    But in the case of producer/consumer pattern, a Task running on a 'while True' loop is potentially 50% of my program (aka event loop). Thus (in a current project i'm playing with) it makes the event loop irrelevant to run if either my producer or consumer task goes down. I guess what I want is something like a supervisord type of magic that says 'if this Task errors out, reschedule it'... I feel the only way to ensure a consumer task never breaks on bad input (input you don't control) is to wrap its critical code in a try/except – user772401 Sep 01 '15 at 15:42
  • What you really need is `try/except` block in your task -- as you need it for synchronous code. Regular sync code has no supervisors, only proper exception handling, isn't it? – Andrew Svetlov Sep 01 '15 at 21:32
  • 1
    Nice answer. I just have one small quirk. Quoting the documentation: ["Don’t directly create `Task` instances: use the `ensure_future()` function or the `BaseEventLoop.create_task()` method."](https://docs.python.org/3/library/asyncio-task.html#task) – Frederik Aalund Dec 02 '15 at 11:41
  • @FrederikAalund an year and half ago `ensure_future()` was not existing as well as `loop.create_task()` :) – Andrew Svetlov Dec 02 '15 at 11:57
  • 2
    @FrederikAalund Updated answer anyway – Andrew Svetlov Dec 02 '15 at 12:00
  • @AndrewSvetlov Ah, that figures. In any case, I think it is good (albeit laborious) practice to keep answers updated with these kind of API changes to showcase the best practices. Thanks for updating your answer. – Frederik Aalund Dec 02 '15 at 12:17
  • Since asyncio is a provisional module, it's probably a good idea to keep answers about it current :) – Seth Jan 17 '16 at 20:35
  • Is there anyway around the whole `while True:` loop for each of the tasks? – dalanmiller Feb 01 '16 at 23:42
  • @dalanmiller would you elaborate the question? – Andrew Svetlov Feb 02 '16 at 00:02
  • In an effort to reduce CPU stalling and maximize the use of coroutines here, you have the two `while True:` loops here in producer and consumer. Is there a way I wonder to do the same thing more elegantly without `while True:`? – dalanmiller Feb 02 '16 at 00:31
  • In the example probably no, at least I don't see simpler solution. But real application code usually looks more complex and not reduced to trivial loops. – Andrew Svetlov Feb 02 '16 at 10:57
5

Here's what I use in production, moved to gist: https://gist.github.com/thehesiod/7081ab165b9a0d4de2e07d321cc2391d

amohr
  • 455
  • 3
  • 10
  • Thanks, you helped me a lot! I needed a queue with options to add and process items. However, in my case i added following: `async def finish(self): await self._queue.join() await self.join()` And then using AsyncWorkerPool instance like this: `loop.run_until_complete(pool.finish())` to finish when there is nothing to process. – Rast Jul 10 '16 at 02:16
2

A bit later and maybe OT, have in mind that you can consume from the Queue from multiple tasks as they were independent consumers.

The following snippet shows as an example how you can achieve the same thread pool pattern with asyncio tasks.

q = asyncio.Queue()

async def sum(x):
    await asyncio.sleep(0.1)  # simulates asynchronously
    return x

async def consumer(i):
    print("Consumer {} started".format(i))
    while True:
        f, x = await q.get()
        print("Consumer {} procesing {}".format(i, x))
        r = await sum(x)
        f.set_result(r)

async def producer():
    consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)]
    loop = asyncio.get_event_loop()
    tasks = [(asyncio.Future(), x) for x in range(10)]
    for task in tasks:
        await q.put(task)

    # wait until all futures are completed
    results = await asyncio.gather(*[f for f, _ in tasks])
    assert results == [r for _, r in tasks]

    # destroy tasks
    for c in consumers:
        c.cancel()


asyncio.get_event_loop().run_until_complete(producer())
pfreixes
  • 439
  • 2
  • 6