
I wrote an asynchronous process in Python that uses a semaphore to limit the number of concurrent executions, but it doesn't seem to work:

import asyncio

async def gather_with_concurrency(*tasks, limit=3):
    semaphore = asyncio.Semaphore(limit)

    async def sem_task(task):
        async with semaphore:
            return await task
    return await asyncio.gather(*(sem_task(task) for task in tasks))

async def test(second):
    print(f"{second} start")
    await asyncio.sleep(second)
    print(f"{second} done")
    return second

async def _run():
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(test(i+1)))

    return await gather_with_concurrency(*tasks)

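def mainloop():
    # asyncio.run() creates, runs and closes a fresh event loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated in recent Python versions.
    results = asyncio.run(_run())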

if __name__ == '__main__':
    mainloop()

The expected output is

1 start
2 start
3 start
1 done
4 start
2 done
5 start
3 done
4 done
5 done

But the actual output is

1 start
2 start
3 start
4 start
5 start
1 done
2 done
3 done
4 done
5 done

It looks like all the tasks run at the same time, as if the semaphore is not working. What causes this, and how can I make it work correctly?
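A likely cause, shown as a sketch under stated assumptions: `asyncio.create_task()` schedules each `test()` coroutine as soon as it is called, so every task is already running before `sem_task()` ever touches the semaphore. The semaphore still works, but it only limits how many `sem_task()` calls are *waiting* on tasks at once, not how many tasks run. The self-contained sketch below mirrors the question's structure with sleeps shortened to hundredths of a second (an assumption for speed) and adds hypothetical counters to compare the peak number of running `test()` calls with the peak number of semaphore holders.

```python
import asyncio

async def main(n_tasks: int = 5, limit: int = 3) -> tuple[int, int]:
    # Hypothetical instrumentation: current and peak counts.
    running = holders = 0
    peak_running = peak_holders = 0

    async def test(n: int) -> int:
        nonlocal running, peak_running
        running += 1
        peak_running = max(peak_running, running)
        try:
            await asyncio.sleep(0.01 * n)  # shortened from `n` seconds
        finally:
            running -= 1
        return n

    semaphore = asyncio.Semaphore(limit)

    async def sem_task(task: asyncio.Task) -> int:
        nonlocal holders, peak_holders
        async with semaphore:
            holders += 1
            peak_holders = max(peak_holders, holders)
            try:
                # The task was scheduled by create_task() before this point,
                # so the semaphore only limits waiting on it.
                return await task
            finally:
                holders -= 1

    # As in the question: tasks are created (and scheduled) up front.
    tasks = [asyncio.create_task(test(i + 1)) for i in range(n_tasks)]
    await asyncio.gather(*(sem_task(t) for t in tasks))
    return peak_running, peak_holders

print(asyncio.run(main()))  # prints (5, 3)
```

All five `test()` calls are in flight at once, while no more than three coroutines ever hold the semaphore.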


I added a print statement right after `async with semaphore:`.

    async def sem_task(task):
        async with semaphore:
            print("Starting", task)
            return await task

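The output suggests that concurrency is limited correctly: three "Starting" lines appear right away, and each later one appears only after a task finishes. But why do all the "start" lines print at the same time?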

1 start
2 start
3 start
4 start
5 start
Starting <Task pending name='Task-2' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab460>()]>>
Starting <Task pending name='Task-3' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab490>()]>>
Starting <Task pending name='Task-4' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab4c0>()]>>
1 done
Starting <Task pending name='Task-5' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab4f0>()]>>
2 done
Starting <Task pending name='Task-6' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab520>()]>>
3 done
4 done
5 done

I based my implementation on the post "How to limit concurrency with Python asyncio?". Reading the comments there, it seems that I needed to pass coroutines, not tasks, to gather_with_concurrency. The following implementation works correctly:

import asyncio

async def gather_with_concurrency(*cors, limit=3):
    semaphore = asyncio.Semaphore(limit)

    async def sem_task(cor):
        async with semaphore:
            print("Starting", cor)
            # cor is a plain coroutine object, so it only starts running here,
            # once a semaphore slot has been acquired.
            return await asyncio.create_task(cor)
    return await asyncio.gather(*(sem_task(cor) for cor in cors))

async def test(second):
    print(f"{second} start")
    await asyncio.sleep(second)
    print(f"{second} done")
    return second

async def _run():
    cors = []
    for i in range(5):
        cors.append(test(i+1))

    return await gather_with_concurrency(*cors)

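def mainloop():
    # asyncio.run() creates, runs and closes a fresh event loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated in recent Python versions.
    results = asyncio.run(_run())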

if __name__ == '__main__':
    mainloop()
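To check that the coroutine-based version really caps concurrency, here is a sketch that keeps the question's `gather_with_concurrency` shape but awaits the coroutine directly (`await coro` rather than `await asyncio.create_task(cor)`; either way the coroutine only starts after a slot is acquired). A hypothetical `running` counter tracks how many `test()` calls are in flight, and sleeps are shortened to hundredths of a second.

```python
import asyncio

async def gather_with_concurrency(*coros, limit=3):
    semaphore = asyncio.Semaphore(limit)

    async def sem_coro(coro):
        async with semaphore:
            # A bare coroutine object does nothing until awaited, so its body
            # only starts here, after a semaphore slot has been acquired.
            return await coro

    return await asyncio.gather(*(sem_coro(c) for c in coros))

async def main():
    running = peak = 0  # hypothetical instrumentation

    async def test(n):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        try:
            await asyncio.sleep(0.01 * n)
        finally:
            running -= 1
        return n

    coros = [test(i + 1) for i in range(5)]
    results = await gather_with_concurrency(*coros)
    return results, peak

results, peak = asyncio.run(main())
print(results, peak)  # [1, 2, 3, 4, 5] 3
```

`asyncio.gather()` returns results in argument order, so `results` keeps the original order even though tasks finish at different times.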

So when create_task is called, does it immediately execute the coroutine's code up to its first await?
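By default, `asyncio.create_task()` does not run any of the coroutine's code synchronously: it schedules the task's first step on the event loop, and that step (everything up to the first suspension point) runs the next time the calling coroutine yields. In the first version above, `_run()` first yields when it awaits `gather_with_concurrency(...)`, and at that moment all five already-scheduled tasks run their first step, which is why every "start" line appears before the semaphore comes into play. The sketch below records the order of events; `body` and `events` are illustrative names. (Python 3.12 also added an opt-in `asyncio.eager_task_factory` that does run a task's first step immediately; it is not used here.)

```python
import asyncio

async def main() -> list[str]:
    events: list[str] = []

    async def body() -> None:
        events.append("task: code before first await")
        await asyncio.sleep(0)
        events.append("task: code after first await")

    task = asyncio.create_task(body())
    # Nothing from body() has run yet: create_task() only scheduled it.
    events.append("caller: create_task returned")
    await asyncio.sleep(0)  # the caller yields to the event loop here
    events.append("caller: resumed after sleep(0)")
    await task
    return events

for event in asyncio.run(main()):
    print(event)
```

With the default task factory this prints "caller: create_task returned" first, then "task: code before first await", then "caller: resumed after sleep(0)", then "task: code after first await".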

user20533
  • I'm not too familiar with `asyncio`, but if you add `print("Starting", task)` as the first thing in the `async with semaphore:` block you can see that the semaphore is working correctly in some way at least. Hopefully that's a helpful data point. – Kemp Sep 13 '21 at 15:10
  • Thanks Kemp! It seems that concurrency is correctly limited. – user20533 Sep 13 '21 at 15:22
  • You’re starting the task before passing the unfinished coroutine into `sem_task`. If you want it to not execute at all, you need to pass the task as callable and call it in `sem_task`, at which point it will start. – deceze Sep 13 '21 at 16:13
