3

I have a block of code which takes a long time to execute and is CPU intense. I want to run that block several times and want to use the full power of my CPU for that. Looking at asyncio I understood that it is mainly for asynchronous communication, but is also a general tool for asynchronous tasks.

In the following example the time.sleep(y) is a placeholder for the code I want to run. In this example every co-routine is executed one after the other and the execution takes about 8 seconds.

import asyncio
import logging
import time


async def _do_compute_intense_stuff(x, y, logger):
    logger.info('Getting it started...')
    for i in range(x):
        time.sleep(y)
    logger.info('Almost done')
    return x * y

logging.basicConfig(format='[%(name)s, %(levelname)s]: %(message)s', level='INFO')
logger = logging.getLogger(__name__)
loop = asyncio.get_event_loop()
co_routines = [
    asyncio.ensure_future(_do_compute_intense_stuff(2, 1, logger.getChild(str(i)))) for i in range(4)]
logger.info('Made the co-routines')
responses = loop.run_until_complete(asyncio.gather(*co_routines))
logger.info('Loop is done')
print(responses)

When I replace time.sleep(y) with asyncio.sleep(y) it returns nearly immediately. With await asyncio.sleep(y) it takes about 2 seconds.

Is there a way to parallelize my code using this approach or should I use multiprocessing or threading? Would I need to put the time.sleep(y) into a Thread?

Neil
  • 14,063
  • 3
  • 30
  • 51
Benjamin
  • 1,348
  • 2
  • 12
  • 25
  • 3
    You don't use asyncio for that. Asyncio is great when you have a problem that *waits for I/O to happen*. Intense computation is not such a problem. Use multiprocessing instead. Only use threading if you are using some C-extension-backed library that'll release the GIL when doing heavy computations. – Martijn Pieters Jul 10 '18 at 17:56
  • 2
    Asyncio also requires all your code to *cooperate*. Each `await` is a place that your task tells the event loop that it is willing for other tasks to run if they are not waiting. `time.sleep()` is the very opposite of cooperating. It blocks everything, so the event loop can't switch tasks. – Martijn Pieters Jul 10 '18 at 17:57
  • 2
    `asyncio.sleep()` produces a coroutine. If you don't await on it, it'll not do anything, so yes, you'd see an instant return. – Martijn Pieters Jul 10 '18 at 17:58
  • Thanks @MartijnPieters that clears up some confusion! – Benjamin Jul 10 '18 at 18:28

2 Answers2

5

Executors use multithreading to accomplish this (or mulitprocessing, if you prefer). Asyncio is used to optimize code where you wait frequently for input, output operations to run. Sometimes that can be writing to files or loading websites.

However, with cpu heavy operations (that don't just rely on waiting for IO), it's recommended to use something akin to threads, and, in my opinion, concurrent.futures provides a very nice wrapper for that and it is similar to Asyncio's wrapper.

The reason why Asyncio.sleep would make your code run faster because it starts the function and then starts checking coroutines to see if they are ready. This doesn't scale well with CPU-heavy operations, as there is no IO to wait for.

To change the following example from multiprocessing to multi-threading Simply change ProcessPoolExecutor to ThreadPoolExecutor.

Here is a multiprocessing example:

import concurrent.futures
import time

def a(z):
    time.sleep(1)
    return z*12

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(a, i) for i in range(5)}
        for future in concurrent.futures.as_completed(futures):
            data = future.result()
            print(data)

This is a simplified version of the example provided in the documentation for executors.

Neil
  • 14,063
  • 3
  • 30
  • 51
  • Thanks so far. It seems to work somehow. The functions are executed concurrently but not in parallel (I guess). When executing my function once it takes up a whole core and 20 seconds. When executing 2 functions it takes 1 minute and none of the cores goes to 100%. Any ideas? – Benjamin Jul 10 '18 at 18:27
  • The processor won't go to 100% because you're on a single core. Use multiprocessing if you wish to full-load the cpu. Please keep in mind that you might want to -pre stage your data prior to starting the process, as communication between queues can sometimes be tricky when the second process is running. –  Jul 10 '18 at 18:31
  • @Benjamin Updated the answer to use multiprocessing. – Neil Jul 10 '18 at 18:35
  • 1
    Coming from Java where threads run on a core each this [https://stackoverflow.com/a/3046201/5119485](question) has helped me understand the difference of processes and threads in Python. My actual code however didn't want to run in a process because I used classes there which could not be pickled. Since I don't have inter process communication I ditched the nice libs and just have a wrapper starting my script with `Popen` and reading back from stdout within a `ThreadPoolExecutor`. – Benjamin Jul 10 '18 at 21:36
  • Since your answer addresses the asked question I will accept it. – Benjamin Jul 10 '18 at 21:36
0

simple example

This example was taken from https://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/

It helped me a lot. There is also a "bad example" - this helped me even more ^^

import aiohttp
import asyncio
import async_timeout
import os

async def download_coroutine(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            filename = os.path.basename(url)
            with open(filename, 'wb') as f_handle:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    f_handle.write(chunk)
            return await response.release()

async def main(loop):
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [download_coroutine(session, url) for url in urls]
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
Markus Dutschke
  • 9,341
  • 4
  • 63
  • 58