
How to make asyncio use all CPU cores - any options other than ProcessPoolExecutor?

I assume that asyncio cannot break the GIL limit (maybe I am wrong), so the program will execute faster than a threading version but will still run on one core.

I studied some examples and found that one way to do it is multiprocessing with ProcessPoolExecutor.

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
    result = await loop.run_in_executor(
        pool, cpu_bound)
    print('custom process pool', result)

That is nice, but it needs "pickle" between processes, so there is some overhead, and the arguments passed to the workers may need some optimization to reduce "pickle" serialization.

Using the simple pattern above, I wrote this test code (you can skip reading it if you like, since it is the same as before). BTW, this is the fastest solution to my file-parsing problem so far. This is only part of the code, not the whole program.

import asyncio
import codecs
from concurrent.futures import ProcessPoolExecutor

# RE_RULES, RULES and get_file_names() are defined elsewhere in the program
def _match_general_and_specific_file_chunk(file_name):
    with codecs.open(file_name, encoding='utf8') as f:
        while True:
            lines = f.readlines(sizehint=10000)
            if not lines:
                break
            for line in lines:
                general_match = RE_RULES.match(line)
                if general_match:
                    specific_match = RULES[general_match.lastindex].match(line)
                    groups = list(specific_match.groups())
                    continue


async def _async_process_executor_match_general_and_specific_read_lines_with_limit_file_chunk():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        futures = []
        for file_name in get_file_names():
            future = loop.run_in_executor(pool, _match_general_and_specific_file_chunk, file_name)
            futures.append(future)
        await asyncio.gather(*futures)


def async_process_executor_match_general_and_specific_read_lines_with_limit_file_chunk():
    asyncio.run(_async_process_executor_match_general_and_specific_read_lines_with_limit_file_chunk())
Chameleon
  • Asyncio will mostly only help parallelizing IO operations, yeah. That lines/regex loop is still mostly pure Python, so it won't parallelize within the same process. – AKX Aug 05 '19 at 12:58

1 Answer


How to make asyncio use all CPU cores - any options other than ProcessPoolExecutor?

Asyncio is the wrong tool for the job because it is specifically designed for managing the state of IO-bound programs (you can think of it as a successor to Twisted).

To execute CPU-bound code in parallel, you will need OS-level concurrency provided by threads or processes. In Python the most convenient way to get it is the concurrent.futures module, which is where classes like ThreadPoolExecutor and ProcessPoolExecutor come from in the first place. You only need to submit() the pieces of work to the executor and wait for the resulting futures to complete.
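For illustration, a minimal sketch of that pattern, where cpu_bound is just a placeholder for your actual work:

import concurrent.futures

def cpu_bound(n):
    # stand-in for the real CPU-heavy work
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [pool.submit(cpu_bound, 10_000_000) for _ in range(4)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

Each submitted call runs in a separate worker process, so the loop above keeps all available cores busy without involving asyncio at all.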

If you want to avoid the overhead of pickling, there are two approaches:

  • Use processes, and make use of shared memory, mapped memory, or a manager object to share state between the controlling process and the workers (see the first sketch after this list).

  • Use threads, but invoke code that internally releases the GIL while doing CPU-intensive work (see the second sketch after this list). Some packages already do so, e.g. the hashlib std module or many parts of numpy, but that won't help you if you have needs not covered by those packages. In that case you might need to write a new C extension, referring to the documentation for details on how to temporarily release the GIL and when it is safe to do so.
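For the first approach, here is a minimal sketch using multiprocessing.shared_memory (Python 3.8+); only the short block name is pickled, not the data itself. The count_newlines worker and the sample data are placeholders for the real workload:

import concurrent.futures
from multiprocessing import shared_memory

def count_newlines(shm_name, size):
    # attach to the block created by the parent; only the name travels over pickle
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        return bytes(shm.buf[:size]).count(b'\n')
    finally:
        shm.close()

if __name__ == '__main__':
    data = b'line one\nline two\nline three\n'
    shm = shared_memory.SharedMemory(create=True, size=len(data))
    shm.buf[:len(data)] = data
    try:
        with concurrent.futures.ProcessPoolExecutor() as pool:
            print(pool.submit(count_newlines, shm.name, len(data)).result())
    finally:
        shm.close()
        shm.unlink()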
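For the second approach, a small sketch using plain threads with hashlib, which releases the GIL while hashing large buffers, so the threads really do run on several cores at once:

import concurrent.futures
import hashlib
import os

def digest(chunk):
    # hashlib drops the GIL for large inputs, so this runs in parallel across threads
    return hashlib.sha256(chunk).hexdigest()

chunks = [os.urandom(1 << 20) for _ in range(8)]
with concurrent.futures.ThreadPoolExecutor() as pool:
    digests = list(pool.map(digest, chunks))
print(digests[:2])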

user4815162342
  • OP's code will also probably benefit from multiprocessing's forking, since `RE_RULES` and `RULES` look like global constants that will get mapped directly into the child processes when they fork off. – AKX Aug 05 '19 at 12:59
  • @AKX Hopefully the speedup will occur even in the spawning mode because global variables like `RULES` will be built only once, and the processes are reused by the pool. The issue in the OP's case is likely that the matching operation is lightweight, so it might be more expensive to pickle and unpickle the _request_ to the worker process than to just do the work. Optimizing such scenarios can take a lot of patience and trying out different approaches, such as batching the lines, or hand-coding specialized communication with the workers. – user4815162342 Aug 05 '19 at 13:24
  • Sure, even spawning mode will help, but each process will then have to build/load the objects on its own. – AKX Aug 05 '19 at 13:47
  • @AKX Yes, but only once per worker. That should be negligible compared to the number of items each worker process processes throughout its lifetime. – user4815162342 Aug 05 '19 at 13:50
  • Initialization of globals is not a problem compared to the number of worker inputs and outputs. From a practical point of view, I think passing `RULE` and `RE_RULE` would be over-optimization. I will focus first on I/O for the worker, since it will have 1e6 lines to parse and only 1/100 of them will end up in the result. – Chameleon Aug 06 '19 at 11:02