24

I already wrote my script using asyncio, but I found that the number of coroutines running simultaneously was too large, and the script often ends up hanging.

So I would like to limit the number of coroutines running concurrently: once it reaches the limit, I want to wait for one coroutine to finish before another starts.

My current code is something like the following:

import asyncio

async def my_func(player):
    ...  # something done with `await`

loop = asyncio.get_event_loop()
p = map(my_func, players)
result = loop.run_until_complete(asyncio.gather(*p))

`players` is a list with many elements (say, 12000). Running all of them at once in `asyncio.gather(*p)` takes too much computational resource, so I would rather have at most 200 players running simultaneously. Once one finishes and the count drops to 199, I want another coroutine to start.

Is this possible in asyncio?

Blaszard
  • Maybe the [asyncio queueing](https://docs.python.org/3/library/asyncio-queue.html) library could be of some use to you? – castis May 12 '18 at 17:16
  • Coroutines don't actually run simultaneously, if that's what you're thinking. They take turns. – user2357112 May 12 '18 at 17:18
  • @castis Thanks and let me check it out... – Blaszard May 12 '18 at 17:19
  • Does this answer your question? [How to limit concurrency with Python asyncio?](https://stackoverflow.com/questions/48483348/how-to-limit-concurrency-with-python-asyncio) – bernie May 20 '20 at 16:01
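
For reference, the queue-based approach castis suggests above usually takes the shape of a fixed pool of worker tasks pulling from an asyncio.Queue, so the pool size caps concurrency. A minimal sketch, not from the thread: the worker/main names and the 200-task pool are illustrative, and my_func is the asker's coroutine:

import asyncio

async def worker(queue, results):
    # Each worker repeatedly pulls a player off the queue until cancelled.
    while True:
        player = await queue.get()
        try:
            results.append(await my_func(player))  # `my_func` from the question
        finally:
            queue.task_done()

async def main(players, limit=200):
    queue = asyncio.Queue()
    results = []
    # Only `limit` workers exist, so at most `limit` calls run at once.
    workers = [asyncio.ensure_future(worker(queue, results)) for _ in range(limit)]
    for player in players:
        queue.put_nowait(player)
    await queue.join()  # returns once task_done() has been called for every item
    for w in workers:
        w.cancel()      # the queue is drained; shut the idle workers down
    return results

Note that with this pattern the results arrive in completion order, not input order.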

3 Answers

16

I can suggest using asyncio.BoundedSemaphore.

import asyncio

async def my_func(player, asyncio_semaphore):
    async with asyncio_semaphore:
        ...  # do stuff

async def main():
    asyncio_semaphore = asyncio.BoundedSemaphore(200)
    jobs = [
        asyncio.ensure_future(my_func(player, asyncio_semaphore))
        for player in players
    ]
    await asyncio.gather(*jobs)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(main())

This way, at most 200 of the 12,000 pending tasks can hold the semaphore and consume system resources at any given moment.
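
On Python 3.7+, the same pattern can be driven with asyncio.run. A minimal self-contained sketch (not from the original answer), with asyncio.sleep standing in for the real awaited work:

import asyncio

async def my_func(player, sem):
    async with sem:                # at most 200 holders at any moment
        await asyncio.sleep(0.01)  # stand-in for the real awaited work
        return player

async def main(players):
    sem = asyncio.Semaphore(200)
    return await asyncio.gather(*(my_func(p, sem) for p in players))

results = asyncio.run(main(range(12000)))
print(len(results))  # 12000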

Ali Yılmaz
  • Note that you don't need a `BoundedSemaphore` - an ordinary `Semaphore(200)` will have the same effect. A `BoundedSemaphore` serves a different purpose - it is designed to differ from ordinary `Semaphore` by raising an exception (instead of blocking) when the semaphore is _released_ more times than it was acquired. That cannot happen when it is only acquired/released using `with`. – user4815162342 May 13 '18 at 08:02
  • @user4815162342 thanks for the info! I'll check it out. – Ali Yılmaz May 14 '18 at 12:32
  • Worked great for me, Thanks a lot. – GraphicalDot Sep 12 '20 at 06:10
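
To see the difference user4815162342 describes, here is a minimal sketch (not from the answer) of the over-release behaviour of the two classes:

import asyncio

async def demo():
    plain = asyncio.Semaphore(1)
    plain.release()        # over-release is silently accepted; the counter just grows

    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()  # over-release raises ValueError
    except ValueError as exc:
        print("BoundedSemaphore:", exc)

asyncio.run(demo())
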
11

You can wrap your gather and enforce a Semaphore:

import asyncio

async def semaphore_gather(num, coros, return_exceptions=False):
    semaphore = asyncio.Semaphore(num)

    async def _wrap_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *(_wrap_coro(coro) for coro in coros), return_exceptions=return_exceptions
    )

# async def a():
#     return 1

# print(asyncio.run(semaphore_gather(10, [a() for _ in range(100)])))
# [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
ddelange
  • That's a great option, thanks for putting out an option that doesn't require going into all of my coros. – Yablargo Apr 29 '21 at 02:06
  • FYI, I've more recently come across [`aioitertools.asyncio.gather`](https://github.com/omnilib/aioitertools/blob/v0.7.1/aioitertools/asyncio.py#L71-L76), which allows limiting concurrency in a less expensive manner. Use with caution though, as it's a rather custom implementation. – ddelange Apr 30 '21 at 11:57
4

You might want to consider using aiostream.stream.map with the task_limit argument:

from aiostream import stream, pipe

async def main():
    xs = stream.iterate(players)
    ys = stream.map(xs, my_func, task_limit=100)
    zs = stream.list(ys)
    results = await zs

Same approach using pipes:

async def main():
    results = await (
        stream.iterate(players) | 
        pipe.map(my_func, task_limit=100) |
        pipe.list())

See the aiostream project page and the documentation for further information.

Disclaimer: I am the project maintainer.

Vincent