3

I have a function that generates tasks (I/O-bound tasks):

def get_task():
    while True:
        new_task = _get_task()
        if new_task is not None:
            yield new_task
        else:
            sleep(1)

I am trying to write a consumer in asyncio that processes at most 10 tasks at a time and picks up a new one as soon as one finishes. I am not sure whether I should use semaphores or whether there is some kind of asyncio pool executor. I started writing pseudocode with threads:

def run(self):
   while True:
       self.semaphore.acquire() # first acquire, then get task
       t = get_task()
       self.process_task(t)

def process_task(self, task):
   try:
       self.execute_task(task)
       self.mark_as_done(task)
   except:
       self.mark_as_failed(task)
   self.semaphore.release()

Could anyone help me? I have no clue where to put the async/await keywords.

Alicja Głowacka

3 Answers

6

Simple task cap using asyncio.Semaphore:

async def max10(task_generator):
    semaphore = asyncio.Semaphore(10)

    async def bounded(task):
        async with semaphore:
            return await task

    async for task in task_generator:
        asyncio.ensure_future(bounded(task))

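The problem with this solution is that tasks are drawn from the generator greedily: the loop pulls every item as fast as the generator yields it, regardless of how many tasks are already running. For example, if the generator reads from a large database, the program could run out of memory.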

Other than that it's idiomatic and well-behaved.
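For reference, a minimal runnable sketch of the same idea, assuming the generator yields coroutine objects rather than already-started tasks. The names run_capped, fake_io and gen_coros are made up for illustration. Unlike the snippet above, it keeps references to the wrapper tasks and waits for them, and it uses asyncio.create_task.

```python
import asyncio

async def run_capped(coro_generator, limit=10):
    """Await coroutines from an async generator, at most `limit` at once."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with semaphore:
            return await coro

    # Still greedy: every item is drawn and wrapped in a Task right away;
    # the semaphore only limits how many of them are actually running.
    tasks = [asyncio.create_task(bounded(coro)) async for coro in coro_generator]
    return await asyncio.gather(*tasks)

def demo(count=25, limit=10):
    stats = {"running": 0, "peak": 0}

    async def fake_io(n):  # hypothetical stand-in for an I/O-bound job
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)
        stats["running"] -= 1
        return n

    async def gen_coros():  # hypothetical task source
        for n in range(count):
            yield fake_io(n)

    results = asyncio.run(run_capped(gen_coros(), limit))
    return results, stats["peak"]
```

Calling demo() returns the results in submission order together with the highest number of jobs that were running at the same moment.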

A solution that uses the async generator protocol to pull new tasks on demand:

async def max10(task_generator):
    tasks = set()
    gen = task_generator.__aiter__()
    try:
        while True:
            while len(tasks) < 10:
                tasks.add(await gen.__anext__())
            _done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    except StopAsyncIteration:
        await asyncio.gather(*tasks)

It may be considered suboptimal because it doesn't start executing tasks until 10 are available.
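One way around that, as a rough sketch rather than a definitive implementation: keep a single "fetch" task in flight and wait on it together with the running tasks, so each item starts as soon as it is drawn. This assumes the generator yields coroutine objects; run_on_demand, fake_io and gen_coros are illustrative names. If a job raises, the exception propagates and the remaining jobs are left running, which real code would want to handle.

```python
import asyncio

async def run_on_demand(coro_generator, limit=10):
    """Draw a coroutine only when a slot is free, and start it immediately."""
    gen = coro_generator.__aiter__()

    async def fetch():
        return await gen.__anext__()

    running = set()    # tasks currently executing
    fetching = None    # task that is drawing the next item, if any
    exhausted = False
    results = []
    while True:
        if fetching is None and not exhausted and len(running) < limit:
            fetching = asyncio.create_task(fetch())
        waiting = set(running)
        if fetching is not None:
            waiting.add(fetching)
        if not waiting:
            return results  # nothing running and nothing left to draw
        done, _ = await asyncio.wait(waiting, return_when=asyncio.FIRST_COMPLETED)
        for finished in done:
            if finished is fetching:
                fetching = None
                try:
                    running.add(asyncio.create_task(finished.result()))
                except StopAsyncIteration:
                    exhausted = True
            else:
                running.discard(finished)
                results.append(finished.result())

def demo(count=7, limit=3):
    stats = {"running": 0, "peak": 0}

    async def fake_io(n):  # hypothetical stand-in for an I/O-bound job
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)
        stats["running"] -= 1
        return n

    async def gen_coros():  # hypothetical task source
        for n in range(count):
            yield fake_io(n)

    results = asyncio.run(run_on_demand(gen_coros(), limit))
    return results, stats["peak"]
```

Results come back in completion order, not submission order.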

And here's a concise, somewhat magical solution using the worker pattern:

async def max10(task_generator):
    async def worker():
        async for task in task_generator:
            await task

    await asyncio.gather(*[worker() for i in range(10)])

It relies on a somewhat counter-intuitive property of being able to have multiple async iterators over the same async generator, in which case each generated item is seen by only one iterator.
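A caveat worth checking on your Python version: on recent releases (3.8 and later, as far as I can tell), a second `__anext__()` call made while the generator is suspended at an `await` raises `RuntimeError: anext(): asynchronous generator is already running`. A hedged variant of the worker pattern serializes access to the generator with an asyncio.Lock. Here the generator is assumed to yield plain items and `handle` is a hypothetical coroutine function that processes one item.

```python
import asyncio

async def worker_pool(item_generator, handle, workers=10):
    """Process items from one async generator with a fixed number of workers."""
    it = item_generator.__aiter__()
    lock = asyncio.Lock()  # only one worker may advance the generator at a time

    async def worker():
        while True:
            async with lock:
                try:
                    item = await it.__anext__()
                except StopAsyncIteration:
                    return  # generator exhausted, this worker is finished
            await handle(item)  # runs outside the lock, so workers overlap

    await asyncio.gather(*(worker() for _ in range(workers)))

def demo(count=20, workers=3):
    stats = {"running": 0, "peak": 0}
    seen = []

    async def gen_items():  # hypothetical source that awaits between items
        for n in range(count):
            await asyncio.sleep(0)
            yield n

    async def handle(n):  # hypothetical I/O-bound handler
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)
        stats["running"] -= 1
        seen.append(n)

    asyncio.run(worker_pool(gen_items(), handle, workers))
    return seen, stats["peak"]
```

Because a worker draws an item only when it is idle, the generator is never advanced further than the workers can keep up with.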

My gut tells me that none of these solutions behaves properly on cancellation.

Dima Tisnek
  • I like the didactic approach of this answer, but the last snippet could be much simpler. Since you have a fixed number of workers, you can get rid of the semaphore. And without the semaphore, workers can use the ordinary `async for` loop. – user4815162342 Mar 04 '19 at 22:09
  • Thanks, edited. Also added an explanation why that even works :) – Dima Tisnek Mar 05 '19 at 07:44
  • Note that [`ensure_future` was soft-deprecated in 3.7](https://docs.python.org/3/library/asyncio-task.html#creating-tasks) in favour of the friendlier `asyncio.create_task`. – Mike 'Pomax' Kamermans May 18 '21 at 23:26
1

Async isn't threads. If, for example, your tasks are bound by file I/O, write them as coroutines using aiofiles:

async with aiofiles.open('filename', mode='r') as f:
    contents = await f.read()

Then replace task with your own tasks. If you want to run only 10 at a time, await asyncio.gather on every batch of 10 tasks.

import asyncio

async def task(x):
  await asyncio.sleep(0.5)
  print( x, "is done" )

async def run(loop):
  futs = []
  for x in range(50):
    futs.append( task(x) )

  await asyncio.gather( *futs )

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()
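The example above gathers all 50 coroutines at once. A minimal sketch of the "gather every 10 tasks" idea could look like this; run_in_batches is an illustrative name, and task is a shortened copy of the coroutine above that returns its argument instead of printing.

```python
import asyncio

async def task(x):
    await asyncio.sleep(0.01)
    return x

async def run_in_batches(items, batch_size=10):
    """Run task() over items, gathering at most batch_size coroutines at a time."""
    results = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        # The next batch starts only after every task in this one has finished.
        results.extend(await asyncio.gather(*(task(x) for x in batch)))
    return results

def demo():
    return asyncio.run(run_in_batches(list(range(50))))
```

Note that each batch waits for its slowest member, so fewer than 10 tasks are running towards the end of every batch.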

If you can't write the tasks async and need threads, this is a basic example using a ThreadPoolExecutor from concurrent.futures together with the event loop's run_in_executor. Note that with max_workers=5 only 5 tasks run at a time.

import time
from concurrent.futures import ThreadPoolExecutor
import asyncio

def blocking(x):
  time.sleep(1)
  print( x, "is done" )

async def run(loop):
  futs = []
  executor = ThreadPoolExecutor(max_workers=5)
  for x in range(15):
    future = loop.run_in_executor(executor, blocking, x)
    futs.append( future )

  await asyncio.sleep(4)
  res = await asyncio.gather( *futs )

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()
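If a new task must be fetched only after a slot has become free, the executor can be combined with an asyncio.Semaphore that is acquired before fetching. This is only a sketch under stated assumptions: fetch_task and execute_task are hypothetical blocking callables, and fetch_task is assumed to return None once the source is drained (unlike the question's _get_task, where None means "nothing yet").

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

async def consume(fetch_task, execute_task, limit=10):
    """Run blocking tasks in threads, fetching one only after a slot is free."""
    loop = asyncio.get_running_loop()
    semaphore = asyncio.Semaphore(limit)
    in_flight = set()

    # One extra thread so fetching never competes with the running tasks.
    with ThreadPoolExecutor(max_workers=limit + 1) as executor:

        async def run_one(task):
            try:
                await loop.run_in_executor(executor, execute_task, task)
            finally:
                semaphore.release()  # free the slot whether it succeeded or not

        while True:
            await semaphore.acquire()  # first acquire a slot, then get a task
            task = await loop.run_in_executor(executor, fetch_task)
            if task is None:  # assumption: None means the source is drained
                semaphore.release()
                break
            job = asyncio.create_task(run_one(task))
            in_flight.add(job)
            job.add_done_callback(in_flight.discard)

        await asyncio.gather(*in_flight)

def demo(count=12, limit=3):
    todo = list(range(count))
    done, stats, guard = [], {"running": 0, "peak": 0}, threading.Lock()

    def fetch_task():  # hypothetical blocking source
        with guard:
            return todo.pop(0) if todo else None

    def execute_task(n):  # hypothetical blocking job
        with guard:
            stats["running"] += 1
            stats["peak"] = max(stats["peak"], stats["running"])
        time.sleep(0.01)
        with guard:
            stats["running"] -= 1
            done.append(n)

    asyncio.run(consume(fetch_task, execute_task, limit))
    return done, stats["peak"]
```

Error handling is left out: anything like mark_as_done or mark_as_failed from the question would go inside run_one.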
MarkReedZ
  • Thanks for the reply. The thing is that I don't want to go 10 by 10, but to start the next task as soon as any one task finishes. So at the beginning I start 10 tasks; when one finishes, I add the next one to the pool, so that 10 tasks are being processed at all times. – Alicja Głowacka Mar 03 '19 at 17:57
  • But the problem with your solution is that you fetch a new task before you have acquired the semaphore, which causes problems in my case. – Alicja Głowacka Mar 04 '19 at 12:45
1

As pointed out by Dima Tisnek, using semaphores to limit concurrency is vulnerable to exhausting task_generator too eagerly, since there is no backpressure between obtaining the tasks and submitting them to the event loop. A better option, also explored by the other answer, is not to spawn a task as soon as the generator has produced an item, but to create a fixed number of workers that exhaust the generator concurrently.

There are two areas where the code could be improved:

  • there is no need for a semaphore - it is superfluous when the number of tasks is fixed to begin with;
  • handling cancellation of generated tasks and of the throttling task.

Here is an implementation that tackles both issues:

import asyncio

async def throttle(task_generator, max_tasks):
    it = task_generator.__aiter__()
    cancelled = False
    async def worker():
        async for task in it:
            try:
                await task
            except asyncio.CancelledError:
                # If a generated task is canceled, let its worker
                # proceed with other tasks - except if it's the
                # outer coroutine that is cancelling us.
                if cancelled:
                    raise
            # other exceptions are propagated to the caller
    worker_tasks = [asyncio.create_task(worker())
                    for i in range(max_tasks)]
    try:
        await asyncio.gather(*worker_tasks)
    except:
        # In case of exception in one worker, or in case we're
        # being cancelled, cancel all workers and propagate the
        # exception.
        cancelled = True
        for t in worker_tasks:
            t.cancel()
        raise

A simple test case:

import random

async def mock_task(num):
    print('running', num)
    await asyncio.sleep(random.uniform(1, 5))
    print('done', num)

async def mock_gen():
    tnum = 0
    while True:
        await asyncio.sleep(.1 * random.random())
        print('generating', tnum)
        yield asyncio.create_task(mock_task(tnum))
        tnum += 1

if __name__ == '__main__':
    asyncio.run(throttle(mock_gen(), 3))
user4815162342