
I know that asyncio features heavily in StackOverflow, but despite the many questions answered here, I still don't understand how to do something as simple as parallelise 2 tasks that execute blocking code.

For example, this works beautifully:

import asyncio


async def slow_thing():
    await asyncio.sleep(2)


async def try_alpha():
    print("Alpha start")
    await slow_thing()
    print("Alpha stop")
    return "Alpha"


async def try_bravo():
    print("Bravo start")
    await slow_thing()
    print("Bravo stop")
    return "Bravo"


async def main():
    futures = [
        try_alpha(),
        try_bravo(),
    ]
    for response in await asyncio.gather(*futures):
        print(response)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

The output is exactly what I'm looking for:

Alpha start
Bravo start
*2 second wait*
Alpha stop
Bravo stop
Alpha
Bravo

However, if I swap out await asyncio.sleep(2) with time.sleep(2), the output is as if there's nothing async about my code:

Alpha start
*2 second wait*
Alpha stop
Bravo start
*2 second wait*
Bravo stop
Alpha
Bravo

The thing is, in my real-world example, I don't control that slow code, so I can't change it to use coroutines. In some cases it's just a bunch of calls to requests.get(), and in others I'm using the kodijson library, which does a bunch of things I don't have access to.

So I'm left wondering if asyncio is even the right tool here. Is it possible to use blocking code inside async code when you're trying to parallelise with .gather()?

Also note that I'm (unfortunately) stuck with Python 3.6 on this one. I'm writing a Mycroft extension, and that's the environment they're stuck on at the moment.

Daniel Quinn
  • For blocking calls, use [`loop.run_in_executor`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) – dano Jun 01 '20 at 20:54
  • Also note that, if you don't have a bunch of code which can take advantage of asynchronous I/O on top of `asyncio`, it is probably not the right choice for your application, and you should just use the `threading`/`multiprocessing`/`concurrent.futures` directly. – dano Jun 01 '20 at 20:58
  • I replaced the `sleep` line with `loop.run_in_executor(None, time.sleep, 2)` and it *sort of* works. It waits where it should, but also prints out the error: `RuntimeError: Event loop is closed` – Daniel Quinn Jun 01 '20 at 21:00
  • But your suggestion of using `concurrent.futures` looks like it might do the job for me, thanks! – Daniel Quinn Jun 01 '20 at 21:10
  • Take a look at [this answer](https://stackoverflow.com/a/55130187/1113207): depending on the nature of the task, you have several options. But if you want asyncio and don't want to rewrite, then, yes, `run_in_executor` is the way to go. – Mikhail Gerasimov Jun 02 '20 at 08:34
  • [This answer](https://stackoverflow.com/a/58148249/1600898) shows how to use `concurrent.futures` directly for this kind of task. – user4815162342 Jun 02 '20 at 13:51
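To make the comments concrete, here is a minimal sketch of the `run_in_executor` approach applied to the example from the question (an illustration, not the OP's actual code). The key detail is *awaiting* the future it returns; calling `loop.run_in_executor(...)` without `await` is what produced the `RuntimeError: Event loop is closed` mentioned above:

```python
import asyncio
import time


def slow_thing():
    time.sleep(2)  # stand-in for blocking code we can't change


async def try_alpha(loop):
    print("Alpha start")
    # run_in_executor moves the blocking call to a thread pool and
    # returns a future; awaiting it lets the event loop run Bravo
    await loop.run_in_executor(None, slow_thing)
    print("Alpha stop")
    return "Alpha"


async def try_bravo(loop):
    print("Bravo start")
    await loop.run_in_executor(None, slow_thing)
    print("Bravo stop")
    return "Bravo"


async def main(loop):
    for response in await asyncio.gather(try_alpha(loop), try_bravo(loop)):
        print(response)


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
```

Both tasks sleep in separate worker threads, so the whole thing takes about 2 seconds instead of 4, matching the first output in the question.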

2 Answers


Coroutines can do stuff in "parallel" only when there's something to wait on. In your code above, what makes it work is the fact that asyncio.sleep is awaitable: while one coroutine awaits it, the event loop is free to run the other. You can only await objects designed for that purpose (coroutines, tasks, futures). That's why the standard time.sleep doesn't work: it's a plain blocking function, and you can't use the await keyword with it. The same goes for the requests library.
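As a quick illustration (the exact error message varies slightly between Python versions), trying to await time.sleep fails outright, because time.sleep returns None, and None is not awaitable:

```python
import asyncio
import time


async def bad():
    # time.sleep blocks the thread and returns None,
    # which is not an awaitable object
    await time.sleep(0)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(bad())
except TypeError as exc:
    print(exc)  # e.g. "object NoneType can't be used in 'await' expression"
loop.close()
```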

Luckily you can use the wonderful aiohttp library: https://docs.aiohttp.org which provides exactly what you need to make multiple HTTP requests concurrently.

Eleveres
  • This answer is correct, but the OP was explicit that they don't control the "slow" code, i.e. it can't just be ported from `requests` to `aiohttp`. As noted in the comments, this can be fixed using `run_in_executor`, but then one might as well use `concurrent.futures` directly. – user4815162342 Jun 02 '20 at 08:29

After the help I received here in the form of comments, I was able to roll together a solution using concurrent.futures:

import concurrent.futures
import time


def slow_1(s):
    time.sleep(5)
    print(f"1: {s}")
    return "1: ok"


def slow_2(s):
    time.sleep(1)
    print(f"2: {s}")
    return "2: ok"


def slow_3(s):
    time.sleep(1)
    print(f"3: {s}")
    return "3: ok"


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = (
        executor.submit(slow_1, "x"),
        executor.submit(slow_2, "y"),
        executor.submit(slow_3, "z"),
    )
    concurrent.futures.wait(futures)
    for future in futures:
        try:
            print(future.result())
        except:  # This should obviously be more explicit
            pass

Which outputs:

2: y
3: z
1: x
1: ok
2: ok
3: ok

I should note that it wasn't clear from the official documentation that you can get the values returned by the functions by calling .result() on each future, or that you need to loop over your original futures tuple to get those results in a predictable order. concurrent.futures.wait() returns a named 2-tuple of sets, (done, not_done), and since sets are unordered, looping over done broke a lot of things for me. If you're like me and just want to do 3 slow things at once and get the results from those three things, this code will probably work for you.
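For completeness, if you want results in *completion* order rather than submission order, concurrent.futures.as_completed is the tidier tool. A small sketch with made-up delays (not part of the original solution):

```python
import concurrent.futures
import time


def slow(name, delay):
    time.sleep(delay)
    return name


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(slow, "one", 0.3),
        executor.submit(slow, "two", 0.1),
        executor.submit(slow, "three", 0.2),
    ]
    # as_completed yields each future as it finishes, so results
    # arrive in completion order: two, three, one
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
```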

Daniel Quinn