I am writing a Python program that runs tasks taken from a queue concurrently, in order to learn asyncio.

Items will be put onto the queue by interacting with the main thread (from a REPL). Whenever a task is put onto the queue, it should be consumed and executed immediately. My approach is to kick off a separate thread and pass a queue to the event loop within that thread.

The tasks are running, but only sequentially, and I am not clear on how to run them concurrently. My attempt is as follows:

import asyncio
import time
import queue
import threading

def do_it(task_queue):
    '''Process tasks in the queue until the sentinel value is received'''
    _sentinel = 'STOP'

    def clock():
        return time.strftime("%X")

    async def process(name, total_time):
        status = f'{clock()} {name}_{total_time}:'
        print(status, 'START')
        current_time = time.time()
        end_time = current_time + total_time
        while current_time < end_time:
            print(status, 'processing...')
            await asyncio.sleep(1)
            current_time = time.time()
        print(status, 'DONE.')

    async def main():
        while True:
            item = task_queue.get()
            if item == _sentinel:
                break
            await asyncio.create_task(process(*item))

    print('event loop start')
    asyncio.run(main())
    print('event loop end')


if __name__ == '__main__':
    tasks = queue.Queue()
    th = threading.Thread(target=do_it, args=(tasks,))
    th.start()

    tasks.put(('abc', 5))
    tasks.put(('def', 3))

Any advice pointing me in the direction of running these tasks concurrently would be greatly appreciated!
Thanks

UPDATE
Thank you Frank Yellin and cynthi8! I have revised main() according to your advice:

  • removed the await before asyncio.create_task, which fixed the concurrency
  • added a wait loop so that main() would not return prematurely
  • used the non-blocking mode of Queue.get()

The program now works as expected; a sketch of the revised main() follows.
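
For reference, a minimal sketch of the revised main() with these three changes applied (the 0.2-second sleep is an arbitrary poll interval, and asyncio.all_tasks() is the modern replacement for the older asyncio.Task.all_tasks()):

async def main():
    while True:
        try:
            # non-blocking get raises queue.Empty instead of stalling the loop
            item = task_queue.get(block=False)
        except queue.Empty:
            await asyncio.sleep(0.2)  # yield so running tasks can progress
            continue
        if item == _sentinel:
            break
        asyncio.create_task(process(*item))  # no await: runs in the background
    # keep the event loop alive until main() is the only remaining task
    while len(asyncio.all_tasks()) > 1:
        await asyncio.sleep(0.2)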

UPDATE 2
user4815162342 has offered further improvements; I have annotated his suggestions below.

'''
Starts an auxiliary thread which establishes a queue and consumes tasks from
that queue.

Allows enqueueing of tasks from within __main__ and termination of the
auxiliary thread.
'''
import asyncio
import time
import threading

def do_it(started):
    '''Process tasks in the queue until the sentinel value is received'''
    _sentinel = 'STOP'

    def clock():
        return time.strftime("%X")

    async def process(name, total_time):
        print(f'{clock()} {name}_{total_time}:', 'Started.')
        current_time = time.time()
        end_time = current_time + total_time
        while current_time < end_time:
            print(f'{clock()} {name}_{total_time}:', 'Processing...')
            await asyncio.sleep(1)
            current_time = time.time()
        print(f'{clock()} {name}_{total_time}:', 'Done.')

    async def main():
        # get_running_loop() returns the running event loop in the current OS
        # thread; expose it (and the queue) to the __main__ thread
        started.loop = asyncio.get_running_loop()
        started.queue = task_queue = asyncio.Queue()
        started.set()
        while True:
            item = await task_queue.get()
            if item == _sentinel:
                # task_done is used to tell join when the work in the queue is 
                # actually finished. A queue length of zero does not mean work
                # is complete.
                task_queue.task_done()
                break
            task = asyncio.create_task(process(*item))
            # Add a callback to be run when the Task is done.
            # Indicate that a formerly enqueued task is complete. Used by queue 
            # consumer threads. For each get() used to fetch a task, a 
            # subsequent call to task_done() tells the queue that the processing
            # on the task is complete.
            task.add_done_callback(lambda _: task_queue.task_done())            

        # keep loop going until all the work has completed
        # When the count of unfinished tasks drops to zero, join() unblocks.
        await task_queue.join()

    print('event loop start')
    asyncio.run(main())
    print('event loop end')

if __name__ == '__main__':
    # started Event is used for communication with thread th
    started = threading.Event()
    th = threading.Thread(target=do_it, args=(started,))
    th.start()
    # started.wait() blocks until started.set(), ensuring that the tasks and
    # loop variables are available from the event loop thread
    started.wait()
    tasks, loop = started.queue, started.loop

    # call_soon schedules a callback to be called with the given arguments at
    # the next iteration of the event loop; call_soon_threadsafe is required
    # when scheduling callbacks from another thread.

    # put_nowait enqueues items in non-blocking fashion, == put(block=False)
    loop.call_soon_threadsafe(tasks.put_nowait, ('abc', 5))
    loop.call_soon_threadsafe(tasks.put_nowait, ('def', 3))
    loop.call_soon_threadsafe(tasks.put_nowait, 'STOP')
user1330734

3 Answers


As others have pointed out, the problem with your code is that it uses a blocking queue, which halts the event loop while waiting for the next item. The problem with the proposed non-blocking solution, however, is that it introduces latency, because it must occasionally sleep to allow other tasks to run. Besides introducing latency, the busy loop also prevents the process from ever going idle, even when there are no items in the queue.

An alternative is to switch to an asyncio.Queue, which is designed for use with asyncio. This queue must be created inside the running loop, so you can't pass it to do_it; you must retrieve it from there instead. Also, since it's an asyncio primitive, its put_nowait method must be invoked through call_soon_threadsafe to ensure that the event loop notices it.

One final issue is that your main() function uses another busy loop to wait for all the tasks to complete. This can be avoided by using Queue.join, which is explicitly designed for this use case.

Here is your code adapted to incorporate all of the above suggestions, with the process function remaining unchanged from your original:

import asyncio
import time
import threading

def do_it(started):
    '''Process tasks in the queue until the sentinel value is received'''
    _sentinel = 'STOP'

    def clock():
        return time.strftime("%X")

    async def process(name, total_time):
        status = f'{clock()} {name}_{total_time}:'
        print(status, 'START')
        current_time = time.time()
        end_time = current_time + total_time
        while current_time < end_time:
            print(status, 'processing...')
            await asyncio.sleep(1)
            current_time = time.time()
        print(status, 'DONE.')

    async def main():
        started.loop = asyncio.get_running_loop()
        started.queue = task_queue = asyncio.Queue()
        started.set()
        while True:
            item = await task_queue.get()
            if item == _sentinel:
                task_queue.task_done()
                break
            task = asyncio.create_task(process(*item))
            task.add_done_callback(lambda _: task_queue.task_done())
        await task_queue.join()

    print('event loop start')
    asyncio.run(main())
    print('event loop end')

if __name__ == '__main__':
    started = threading.Event()
    th = threading.Thread(target=do_it, args=(started,))
    th.start()
    started.wait()
    tasks, loop = started.queue, started.loop

    loop.call_soon_threadsafe(tasks.put_nowait, ('abc', 5))
    loop.call_soon_threadsafe(tasks.put_nowait, ('def', 3))
    loop.call_soon_threadsafe(tasks.put_nowait, 'STOP')

Note: an unrelated issue with your code was that it awaited the result of create_task(), which nullified the usefulness of create_task() because it wasn't allowed to run in the background. (It would be equivalent to immediately joining a thread you've just started - you can do it, but it doesn't make much sense.) This issue is fixed both in the above code and in your edit to the question.
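
To make the difference concrete, here is a small standalone sketch (with a hypothetical work() coroutine, not part of the code above):

import asyncio

async def work(name, seconds):
    print(name, 'start')
    await asyncio.sleep(seconds)
    print(name, 'done')

async def sequential():
    # awaiting each task as it is created serializes them: ~5 seconds total
    await asyncio.create_task(work('a', 2))
    await asyncio.create_task(work('b', 3))

async def concurrent():
    # create both tasks first, then await their completion: ~3 seconds total
    t1 = asyncio.create_task(work('a', 2))
    t2 = asyncio.create_task(work('b', 3))
    await asyncio.gather(t1, t2)

asyncio.run(sequential())
asyncio.run(concurrent())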

user4815162342
  • Thanks for your answer, which is much better than mine. I was trying to make minimal changes to the code, rather than re-writing from scratch. – Frank Yellin Sep 12 '20 at 21:07
  • @FrankYellin Thanks. In normal circumstances I'd support minimal changes to OP's code, but in this case I think the non-blocking get approach is deeply flawed (although it does provide a quick fix). I wish it were easier to provide the queue and the loop from inside `main()`, the biggest change is basically smuggling them out. – user4815162342 Sep 12 '20 at 21:25
  • Thank you for improving the code @user4815162342! I am now annotating the code to comment in explanations and will update my post. Only the `lambda` statement is not so clear to me; do you use this construct to swallow an argument to the `task_done` method to fit the `add_done_callback` signature? – user1330734 Sep 13 '20 at 23:58
  • @user1330734 Yes, it's a typical use for `lambda`. I could have just sent `task_queue` to `process` and have it call `task_queue.task_done()` when finished - but that would complicate `process()` slightly and make it seem like `process` needs the task queue (which it otherwise doesn't). Another option would be to send `task_queue.task_done` (note lack of parentheses) to `process` as a callback to invoke when done, [like this](https://pastebin.com/pyc5aEKW), but it would still require changing `process()` which I wanted to avoid as I'd already made changes to most of the rest of the code. – user4815162342 Sep 14 '20 at 07:05
  • This code still has a problem: if the queue is full, the sentinel won't be read for an unpredictable amount of time, so there is no way to stop it reliably. – Suor Aug 30 '21 at 11:08
  • @Suor This queue is unbounded, so it's never full. Also, in this code `STOP` marks end of processing, not request for immediate cancellation. – user4815162342 Aug 30 '21 at 14:07

There are two problems with your code.

First, you should not have the await before the asyncio.create_task. This is what causes your code to run sequentially: each task is awaited to completion before the next one is created.

Then, once you've made your code run asynchronously, you need something after the while loop in main() so that the coroutine doesn't return immediately but instead waits for all the jobs to finish. Another Stack Overflow answer recommends:

while len(asyncio.all_tasks()) > 1:  # Any task besides main() itself?
    await asyncio.sleep(0.2)

Alternatively, there are versions of Queue that can keep track of running tasks; asyncio.Queue is one, as sketched below.
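
For example, asyncio.Queue counts unfinished items via task_done() and join(); a minimal standalone sketch:

import asyncio

async def demo():
    q = asyncio.Queue()
    for n in (1, 2, 3):
        q.put_nowait(n)

    async def worker():
        while True:
            item = await q.get()
            await asyncio.sleep(0.1 * item)  # simulate work
            q.task_done()                    # mark the fetched item complete

    w = asyncio.create_task(worker())
    await q.join()  # unblocks once every enqueued item is marked done
    w.cancel()      # the worker loops forever, so cancel it explicitly

asyncio.run(demo())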

Frank Yellin

As an additional problem:

If a queue.Queue is empty, get() blocks by default and does not return a sentinel string (see https://docs.python.org/3/library/queue.html); a quick illustration follows.
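
A standalone sketch of that behavior:

import queue

q = queue.Queue()
# q.get()  # on an empty queue this would block forever
try:
    q.get(block=False)  # equivalent to q.get_nowait()
except queue.Empty:
    print('queue was empty, nothing returned')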

cynthi8