Here is an example of some simple asynchronous code:
```python
import asyncio
import time


async def do_asynchronous(function, items, chunk_size=10, task_timeout=None):
    # Split the items into chunks of at most chunk_size elements
    items_slices = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    result = []
    for row in items_slices:
        # Every task gets its own timeout; exceptions are returned instead of raised
        tasks = [asyncio.wait_for(function(i), timeout=task_timeout) for i in row]
        result += await asyncio.gather(*tasks, return_exceptions=True)
    return result


async def foo(item):
    if item != 10:
        await asyncio.sleep(1)
        return item
    else:
        await asyncio.sleep(5)
        return 'oops'


if __name__ == '__main__':
    items = list(range(20))
    start = time.time()
    result = asyncio.run(do_asynchronous(foo, items, chunk_size=5, task_timeout=1.2))
    print(f'time took: {time.time() - start}')
    print(result)
```
We get the following result:

```
time took: 4.2078893184661865
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, TimeoutError(), 11, 12, 13, 14, 15, 16, 17, 18, 19]
```
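The measured time is consistent with how the chunked loop behaves when every coroutine yields to the event loop: `gather` waits for the slowest task in each chunk, and `wait_for` caps each task at `task_timeout`, so the total is roughly the sum of the capped per-chunk maxima, 1 + 1 + 1.2 + 1 = 4.2 s. The helper `expected_wall_time` below is hypothetical and only reproduces that arithmetic; it ignores scheduling overhead and assumes no task blocks the loop.

```python
def expected_wall_time(durations, chunk_size, timeout):
    # Hypothetical helper: rough wall time of the chunked gather loop,
    # assuming every coroutine awaits (never blocks the event loop).
    total = 0.0
    for i in range(0, len(durations), chunk_size):
        chunk = durations[i:i + chunk_size]
        # gather waits for the slowest task; wait_for caps each one at `timeout`
        total += max(min(d, timeout) for d in chunk)
    return total


# Item 10 sleeps 5 s and all the others 1 s, as in foo above.
durations = [5 if i == 10 else 1 for i in range(20)]
estimate = expected_wall_time(durations, chunk_size=5, timeout=1.2)
print(round(estimate, 3))  # 4.2
```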
But if you change the `foo` function slightly, like this:

```python
async def foo(item):
    if item != 10:
        await asyncio.sleep(1)
        return item
    else:
        time.sleep(5)  # blocking call instead of await asyncio.sleep(5)
        return 'oops'
```
the output becomes:

```
time took: 8.01030683517456
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'oops', TimeoutError(), TimeoutError(), TimeoutError(), TimeoutError(), 15, 16, 17, 18, 19]
```
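A likely explanation, sketched with scaled-down durations: `time.sleep()` does not yield to the event loop, so while it runs no other task, timer, or `wait_for` timeout callback can run. A coroutine that never yields therefore cannot be interrupted by `wait_for`, and by the time the loop regains control the deadlines of the neighbouring tasks have already passed. The demo below illustrates both effects; `ticker` and `blocking_coro` are illustrative names, not part of the original code.

```python
import asyncio
import time


async def ticker(delay):
    # Ask to be woken after `delay` seconds and measure how long it really took.
    start = time.perf_counter()
    await asyncio.sleep(delay)
    return time.perf_counter() - start


async def blocking_coro():
    # time.sleep() blocks the whole event loop and never yields control.
    time.sleep(0.3)
    return 'done'


async def main():
    tick = asyncio.create_task(ticker(0.05))
    await asyncio.sleep(0)  # let the ticker start and register its timer
    # The 0.1 s timeout cannot fire while blocking_coro holds the loop,
    # so wait_for returns the result instead of raising TimeoutError.
    value = await asyncio.wait_for(blocking_coro(), timeout=0.1)
    late = await tick
    return value, late


value, late = asyncio.run(main())
print(value)          # done
print(f'{late:.2f}')  # roughly 0.30 instead of 0.05
```

This mirrors the second run: `foo(10)` returns `'oops'` even though it exceeds the timeout, and items 11 to 14, whose 1.2 s deadlines expired while the loop was blocked, end with `TimeoutError`.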
In the first case, the code works as I expected: when one task exceeds the timeout, it ends with a TimeoutError and the other tasks complete normally. In the second case, however, the code waits the full 5 seconds, the blocking task itself (item 10) returns 'oops' instead of timing out, and the other four tasks sent to the same gather call (items 11 to 14) end with TimeoutError. Why does this happen, and how can I get the first result in the second case? That is, how can I cancel only the one blocking task when it times out?
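One common way to get the first behaviour, offered as a sketch rather than a definitive answer: move the blocking call off the event loop, for example with `asyncio.to_thread()` (Python 3.9+) or `loop.run_in_executor()`. The loop then stays free, so `wait_for` can time out the slow item while its neighbours finish. Note that this only stops *waiting*: a worker thread cannot be killed, so it keeps running until the blocking call returns. `blocking_work` is a hypothetical stand-in for the blocking code, and all durations are scaled down.

```python
import asyncio
import time


def blocking_work(item):
    # Hypothetical blocking function (e.g. a call into a synchronous library).
    time.sleep(2 if item == 10 else 0.1)
    return item


async def foo(item):
    # Run the blocking function in the default thread pool so the loop stays free.
    return await asyncio.to_thread(blocking_work, item)


async def do_asynchronous(function, items, chunk_size=10, task_timeout=None):
    result = []
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        tasks = [asyncio.wait_for(function(x), timeout=task_timeout) for x in chunk]
        result += await asyncio.gather(*tasks, return_exceptions=True)
    return result


async def main():
    start = time.perf_counter()
    result = await do_asynchronous(foo, list(range(20)), chunk_size=5, task_timeout=0.5)
    return result, time.perf_counter() - start


result, elapsed = asyncio.run(main())
print(f'time took: {elapsed:.2f}')
print(result)
```

With these numbers `result[10]` should be a `TimeoutError` and every other entry its own item, with `elapsed` a little under a second. Because `asyncio.run()` waits for the default executor's threads on shutdown, the script itself only exits after `blocking_work(10)` has finished sleeping.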