
I'm having a weird issue with asyncio.Queue: instead of returning an item as soon as it is available, the queue waits until it is full before returning anything. I noticed this while using a queue to store frames collected from cv2.VideoCapture: the larger the queue's maxsize, the longer it took for anything to appear on screen, and what finally appeared looked like a replay of all the frames collected into the queue.
Is that a feature, a bug, or am I just using this wrong?
Here is my code:

```python
import asyncio
import cv2
import numpy as np


async def collecting_loop(queue):
    print("cl")
    cap = cv2.VideoCapture(0)
    while True:
        _, img = cap.read()
        await queue.put(img)


async def processing_loop(queue):
    print("pl")
    await asyncio.sleep(0.1)
    while True:
        img = await queue.get()
        cv2.imshow('img', img)
        cv2.waitKey(5)


async def main():
    print("running main")
    # asyncio.Queue no longer accepts a loop= argument (removed in Python 3.10);
    # it binds to the running event loop automatically.
    queue = asyncio.Queue(maxsize=10)
    await asyncio.gather(collecting_loop(queue), processing_loop(queue))


try:
    # asyncio.run() creates the event loop, runs main() and closes the loop.
    asyncio.run(main())
except KeyboardInterrupt:
    pass
```
Jeru Luke

2 Answers


Is [the queue getter not waking up until the queue fills up] a feature, a bug, or am I just using this wrong?

You're using it wrong, but in a subtle way. As Andrew explained, `queue.put` doesn't guarantee a task switch, and the collector coroutine runs only blocking code and `queue.put`. Although each blocking call is short, asyncio doesn't know that; from its point of view you are invoking `queue.put` in a really tight loop. The queue getters simply don't get a chance to run until the queue fills up.
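
A minimal simulation of that effect, assuming a hypothetical `fake_read()` that stands in for `cap.read()` by blocking for 20 ms with `time.sleep` and returning its capture timestamp as the "frame". No camera or OpenCV is needed; the sketch only measures how stale the first frame is by the time the consumer finally gets it.

```python
import asyncio
import time

FRAME_DELAY = 0.02  # hypothetical: each simulated cap.read() blocks for 20 ms
MAXSIZE = 10        # same maxsize as in the question


def fake_read():
    """Hypothetical stand-in for cv2.VideoCapture.read(): blocks, then returns a 'frame'."""
    time.sleep(FRAME_DELAY)
    return True, time.monotonic()  # the "frame" is simply its capture timestamp


async def collecting_loop(queue: asyncio.Queue, n: int) -> None:
    for _ in range(n):
        _, frame = fake_read()  # blocking call: the event loop is frozen meanwhile
        await queue.put(frame)  # does not suspend while the queue has room


async def processing_loop(queue: asyncio.Queue, n: int, staleness: list) -> None:
    for _ in range(n):
        frame = await queue.get()
        staleness.append(time.monotonic() - frame)  # age of the frame when "shown"


async def main(n: int = 20) -> list:
    queue = asyncio.Queue(maxsize=MAXSIZE)
    staleness = []
    await asyncio.gather(collecting_loop(queue, n), processing_loop(queue, n, staleness))
    return staleness


if __name__ == "__main__":
    print(f"first frame was {asyncio.run(main())[0]:.2f} s old when shown")
```

With these numbers the first frame is roughly 0.2 s old when it is consumed: the collector reads ten more frames after it, and only the put of the eleventh finds the queue full and finally suspends.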

The correct way to integrate asyncio and OpenCV is to run the OpenCV code in a separate thread and have the asyncio event loop wait for it to finish. The `run_in_executor` method makes that really simple:

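```python
async def collecting_loop(queue):
    print("cl")
    loop = asyncio.get_running_loop()
    cap = cv2.VideoCapture(0)
    while True:
        # cap.read() blocks, so run it in the default thread pool; awaiting the
        # result suspends this coroutine and lets processing_loop run meanwhile.
        _, img = await loop.run_in_executor(None, cap.read)
        await queue.put(img)
```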

Awaiting `run_in_executor` suspends the collector coroutine while a worker thread waits for the new frame, which gives the processing coroutine a chance to display the queued frame(s) in time.
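
The same kind of simulation with the blocking call moved into the default executor, as suggested above. `fake_read()` is again a hypothetical stand-in for `cap.read()`, and the 20 ms delay and `maxsize=10` are illustrative values, not measurements.

```python
import asyncio
import time

FRAME_DELAY = 0.02  # hypothetical: each simulated cap.read() blocks for 20 ms
MAXSIZE = 10


def fake_read():
    """Hypothetical stand-in for cv2.VideoCapture.read(): blocks, then returns a 'frame'."""
    time.sleep(FRAME_DELAY)
    return True, time.monotonic()  # the "frame" is simply its capture timestamp


async def collecting_loop(queue: asyncio.Queue, n: int) -> None:
    loop = asyncio.get_running_loop()
    for _ in range(n):
        # The blocking call runs in the default thread pool; awaiting it
        # suspends this coroutine so processing_loop can run meanwhile.
        _, frame = await loop.run_in_executor(None, fake_read)
        await queue.put(frame)


async def processing_loop(queue: asyncio.Queue, n: int, staleness: list) -> None:
    for _ in range(n):
        frame = await queue.get()
        staleness.append(time.monotonic() - frame)  # age of the frame when "shown"


async def main(n: int = 20) -> list:
    queue = asyncio.Queue(maxsize=MAXSIZE)
    staleness = []
    await asyncio.gather(collecting_loop(queue, n), processing_loop(queue, n, staleness))
    return staleness


if __name__ == "__main__":
    print(f"first frame was {asyncio.run(main())[0]:.3f} s old when shown")
```

Here each frame reaches the consumer almost immediately after capture instead of after a full queue's worth of reads. On Python 3.9+, `await asyncio.to_thread(fake_read)` is a shorter way to run the call in the default executor.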

user4815162342

The problem is that `await q.put()` doesn't switch to another task on every call. In fact, it switches only when the put has to suspend because the queue is full.
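
A small check of that claim: an `await queue.put()` on a queue with free space completes without suspending. Nothing in this sketch blocks; the only question is when a separately scheduled task gets to run. The task and event names are illustrative.

```python
import asyncio


async def main(n: int = 1000) -> list:
    queue = asyncio.Queue()  # unbounded, so put() never finds it full
    events = []

    async def other():
        events.append("other task ran")

    # Scheduled now, but it can only run once main() actually suspends.
    task = asyncio.create_task(other())
    for i in range(n):
        await queue.put(i)  # returns immediately: no suspension, no task switch
    events.append(f"{n} puts done")
    await task  # the first real suspension point, so `other` runs here
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['1000 puts done', 'other task ran']
```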

Inserting `await asyncio.sleep(0)` forces a task switch. It is like multithreaded code, where `file.read()` doesn't force an OS thread switch but `time.sleep(0)` does.
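
A sketch comparing the order of puts and gets with and without `await asyncio.sleep(0)` after each put. The small bound `maxsize=3` and the five items are arbitrary choices for the illustration.

```python
import asyncio


async def demo(yield_after_put: bool, n: int = 5) -> list:
    queue = asyncio.Queue(maxsize=3)  # small bound chosen only for illustration
    log = []

    async def producer():
        for i in range(n):
            await queue.put(i)  # suspends only if the queue is full
            log.append(f"put {i}")
            if yield_after_put:
                await asyncio.sleep(0)  # always yields control to the event loop

    async def consumer():
        for _ in range(n):
            item = await queue.get()
            log.append(f"get {item}")

    await asyncio.gather(consumer(), producer())
    return log


if __name__ == "__main__":
    print(asyncio.run(demo(yield_after_put=False)))
    print(asyncio.run(demo(yield_after_put=True)))
```

Without the extra yield the log shows three puts (a full queue) before the first get; with it, puts and gets strictly alternate.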

Misunderstandings like this are pretty common among newcomers; I discussed a very similar problem yesterday, see the GitHub issue.

P.S.

Your code actually has a much worse problem: it calls blocking synchronous code from an async function, which is simply not how asyncio works.
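
To see that blocking code starves every task on the loop, not just queue getters, here is a minimal sketch with a hypothetical `heartbeat` task that wants to run every 10 ms while another coroutine waits 0.2 s, once with the synchronous `time.sleep` and once with `asyncio.sleep`.

```python
import asyncio
import time


async def heartbeat(ticks: list) -> None:
    """Hypothetical task that wants to run every 10 ms; records each time it does."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())


async def count_ticks(work) -> int:
    ticks = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat start its first 10 ms sleep
    await work()            # run the 0.2 s job under test
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return len(ticks)


async def blocking_work() -> None:
    time.sleep(0.2)  # synchronous: the event loop can run nothing else meanwhile


async def cooperative_work() -> None:
    await asyncio.sleep(0.2)  # suspends: the loop keeps serving other tasks


if __name__ == "__main__":
    print("blocking:", asyncio.run(count_ticks(blocking_work)))
    print("cooperative:", asyncio.run(count_ticks(cooperative_work)))
```

The blocking version typically records no ticks at all, while the cooperative one records on the order of twenty (fewer on platforms with a coarse timer).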

If no asynchronous OpenCV API exists (yet), you should run the OpenCV functions in a separate thread.

The already-mentioned janus library can help with passing data between sync and async code.
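
janus is a third-party package; as a stdlib-only alternative (a swapped-in technique, not janus's API), a dedicated capture thread can hand items to an `asyncio.Queue` with `asyncio.run_coroutine_threadsafe`. The worker function and its `time.sleep` "read" are hypothetical stand-ins for an OpenCV capture loop, and `None` serves as an end-of-stream sentinel.

```python
import asyncio
import threading
import time


def capture_worker(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, n: int) -> None:
    """Hypothetical stand-in for a blocking OpenCV capture loop, run in its own thread."""
    for frame in range(n):
        time.sleep(0.005)  # pretend this is the blocking cap.read()
        # asyncio.Queue is not thread-safe: run put() on the event loop's thread and
        # wait for it here, so a full queue makes this thread wait (back-pressure).
        asyncio.run_coroutine_threadsafe(queue.put(frame), loop).result()
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # end-of-stream sentinel


async def main(n: int = 10) -> list:
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue(maxsize=2)
    worker = threading.Thread(target=capture_worker, args=(loop, queue, n))
    worker.start()
    received = []
    while (frame := await queue.get()) is not None:
        received.append(frame)  # process the frame here
    await loop.run_in_executor(None, worker.join)  # join without blocking the event loop
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the worker waits on `.result()`, a full queue makes the capture thread wait instead of piling up frames. `loop.call_soon_threadsafe(queue.put_nowait, item)` is a non-blocking alternative, but on a bounded queue it fails with `QueueFull` rather than waiting.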

Andrew Svetlov
  • "The problem is that `await q.put()` **doesn't switch** to another task every call." This sounds almost like a bug - couldn't `put()` suspend when it detects that there are waiters? Or is this the case of correctly written code not needing to rely on exactly when the switch happens? – user4815162342 Mar 17 '18 at 09:52
  • I don't think this is a bug. You propose an *alternative* strategy that prioritizes getters over putters. It's a viable solution, but `asyncio.Queue` uses another approach: keep putting items as long as the queue is not full. This approach is simple from both an implementation **and** a teaching perspective. People should learn that `await` doesn't force a *context switch* anyway; that's how asyncio works (not only queues but all awaits). – Andrew Svetlov Mar 17 '18 at 11:07
  • You're right. To even observe this issue, you must either fill the queue really fast - in which case it will soon become full anyway - or call synchronous code in the same loop, as the OP does with `cap.read()`. The latter is not correct usage of asyncio, even if it appears to work in simple scenarios. I wrote an answer recommending invocation of `cap.read` with `loop.run_in_executor`, which will ensure the needed switch. – user4815162342 Mar 17 '18 at 11:15
  • [Here](https://stackoverflow.com/a/48816319/1600898) is another baffling example of the `await`-not-guaranteed-to-switch confusion - even explicitly calling into IO doesn't guarantee that a switch will happen. – user4815162342 Mar 17 '18 at 11:22
  • While searching for other examples that use OpenCV with asyncio I found [this (first code block)](http://deklund.com/blog/2015/12/3/asynchronous-constraint-of-memory-allocation). It shows a similar flaw, right? Even though it uses `await asyncio.sleep(0)`, the `cap.read()` call runs in the main thread. And it seems to pile up tasks, never gathering them. – Joe Feb 06 '21 at 12:56
  • `cap.read()` is the blocking call; it should be run in the asyncio thread pool. – Andrew Svetlov Feb 12 '21 at 12:09