I wrote an asynchronous process in Python, using a semaphore to limit the number of concurrent executions, but it doesn't seem to work:
import asyncio
async def gather_with_concurrency(*tasks, limit=3):
    """Await every awaitable in ``tasks``, at most ``limit`` at a time.

    Each awaitable is awaited only while one slot of a shared semaphore is
    held. The results are returned as a list, in the order the awaitables
    were passed in.

    NOTE: only the ``await`` is gated. An awaitable that is already running
    on its own (for example one wrapped with ``asyncio.create_task``) is not
    held back by the semaphore.
    """
    slots = asyncio.Semaphore(limit)

    async def _gated(awaitable):
        # Hold one slot for as long as we are waiting on this awaitable.
        async with slots:
            outcome = await awaitable
        return outcome

    wrapped = [_gated(item) for item in tasks]
    return await asyncio.gather(*wrapped)
async def test(second):
print(f"{second} start")
await asyncio.sleep(second)
print(f"{second} done")
return second
async def _run():
    """Start five ``test`` jobs (1..5 seconds) and gather their results."""
    # create_task() schedules each coroutine on the event loop right away,
    # so all five jobs are already running before gather_with_concurrency()
    # gets a chance to throttle anything.
    started = [asyncio.create_task(test(n + 1)) for n in range(5)]
    return await gather_with_concurrency(*started)
def mainloop():
    """Run ``_run()`` to completion on a fresh event loop.

    The gathered results are discarded; nothing is returned.
    """
    # asyncio.run() creates the loop, runs the coroutine and closes the loop
    # again. It replaces get_event_loop()/run_until_complete(), which is
    # deprecated when no event loop is running.
    asyncio.run(_run())


if __name__ == '__main__':
    mainloop()
The expected output is
1 start
2 start
3 start
1 done
4 start
2 done
5 start
3 done
4 done
5 done
But the actual output is
1 start
2 start
3 start
4 start
5 start
1 done
2 done
3 done
4 done
5 done
It looks like all the tasks have been executed at the same time. Maybe the semaphore is not working. What is the cause of this? How can I make it work correctly?
I added a print after `async with semaphore`:
async def sem_task(task):
    """Await ``task`` while holding the enclosing scope's ``semaphore``."""
    async with semaphore:
        # Debug trace: printed once a semaphore slot has been acquired.
        print("Starting", task)
        outcome = await task
    return outcome
Looking at the output, it seems that concurrency is correctly limited. But why do the "start" prints all fire at the same time?
1 start
2 start
3 start
4 start
5 start
Starting <Task pending name='Task-2' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab460>()]>>
Starting <Task pending name='Task-3' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab490>()]>>
Starting <Task pending name='Task-4' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab4c0>()]>>
1 done
Starting <Task pending name='Task-5' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab4f0>()]>>
2 done
Starting <Task pending name='Task-6' coro=<test() running at stackoverflow.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe87b4ab520>()]>>
3 done
4 done
5 done
How to limit concurrency with Python asyncio?
I read this post while implementing this. Reading the comments, it seems that I needed to pass coroutines (not already-created tasks) to gather_with_concurrency. The following implementation worked correctly.
import asyncio
async def gather_with_concurrency(*cors, limit=3):
    """Run the coroutines in ``cors`` with at most ``limit`` active at once.

    A coroutine is only wrapped in a task (and thereby started) after a
    semaphore slot has been acquired. The results are returned as a list,
    in the order the coroutines were passed in.
    """
    gate = asyncio.Semaphore(limit)

    async def _throttled(cor):
        async with gate:
            # Debug trace: printed once a semaphore slot has been acquired.
            print("Starting", cor)
            running = asyncio.create_task(cor)
            return await running

    return await asyncio.gather(*map(_throttled, cors))
async def test(second):
print(f"{second} start")
await asyncio.sleep(second)
print(f"{second} done")
return second
async def _run():
    """Build five ``test`` coroutines (1..5 seconds) and gather them."""
    # Plain coroutine objects do not start running until something awaits or
    # schedules them, so gather_with_concurrency() decides when each begins.
    pending = [test(n + 1) for n in range(5)]
    return await gather_with_concurrency(*pending)
def mainloop():
    """Run ``_run()`` to completion on a fresh event loop.

    The gathered results are discarded; nothing is returned.
    """
    # asyncio.run() creates the loop, runs the coroutine and closes the loop
    # again. It replaces get_event_loop()/run_until_complete(), which is
    # deprecated when no event loop is running.
    asyncio.run(_run())


if __name__ == '__main__':
    mainloop()
When create_task is called, does it execute the code before the first await?