I have a speech-to-text client set up as follows. The client sends audio packets to the server, and the server returns the text results, which the client prints to stdout.

async def record_audio(websocket):
    # Audio recording parameters
    rate = 16000
    chunk = int(rate / 10)  # 100ms

    with BufferedMicrophoneStream(rate, chunk) as stream:
        audio_generator = stream.generator() # Buffer is a asynchronous queue instance
        for message in audio_generator:
            await websocket.send(message)


async def collect_results(websocket):
    async for message in websocket:
        print(message)


async def combine():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        await asyncio.wait([
            record_audio(websocket),
            collect_results(websocket)
        ])


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(combine())
    loop.close()

As you can see, both coroutines are infinite loops. Each one works correctly on its own: if I pass only record_audio or only collect_results to asyncio.wait, I can confirm that it works against the server. They do not work when run simultaneously, though.

However, if I put an asyncio.sleep(10) statement inside the loop in record_audio, then I do see output from collect_results, and the audio chunks are sent to the server in a burst every 10 seconds.

What gives?

Thanks.

Update #1:

I replaced the above code with the following, but still to no avail:

async with websockets.connect(uri) as websocket:
    futures = [
        await loop.run_in_executor(executor, record_audio, websocket),
        await loop.run_in_executor(executor, collect_results, websocket)
    ]
    await asyncio.gather(*futures)
abhinavkulkarni
  • What does iterating over `audio_generator` do? Does a step of this iteration block? Your comment says that it's an "asynchronous queue", but the code doesn't use `async for` to iterate over it; it uses an ordinary `for`. – user4815162342 Jun 05 '20 at 12:31
  • Also, `await asyncio.wait([x, y])` by itself is an anti-pattern. You probably want to use `await asyncio.gather(x, y)`. The `asyncio.wait` function is meant for the caller to inspect the returned futures in order to extract results or exceptions out of them. Failure to do so will result in silently swallowed exceptions. – user4815162342 Jun 05 '20 at 12:33
  • @user4815162342: Thank you very much. It is indeed a thread-safe queue, not an asynchronous one! That being said, why is this still not working? I can surely call sync code from async code, right? – abhinavkulkarni Jun 05 '20 at 18:45
  • You can't call blocking code from async, at least not without wrapping it in `run_in_executor`, and then it will use additional helper threads under the hood. – user4815162342 Jun 06 '20 at 08:05
  • For such integration, take a look at the approach in [this answer](https://stackoverflow.com/a/56280107/1600898). – user4815162342 Jun 06 '20 at 08:09
  • @user4815162342: I wrapped both coroutines in `run_in_executor`, but still to no avail (please see the update on my post). Again, they both work individually. What gives? – abhinavkulkarni Jun 07 '20 at 19:33
  • You cannot wrap a coroutine in `run_in_executor`, it expects a **synchronous** function. The idea is to wrap just the blocking code in `run_in_executor` and await it inside your coroutines. You invoked `run_in_executor` on coroutines, which just instantiated them. You immediately awaited the results (in sequence) and got two coroutine objects, which you pass to `gather()`, exactly as before. As written, your updated code is exactly equivalent to the original. – user4815162342 Jun 07 '20 at 19:48
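
The advice in the comments above can be sketched roughly as follows. This is a minimal illustration, not the asker's actual client: `blocking_chunks` is a hypothetical stand-in for `stream.generator()` (assumed to block on a thread-safe queue), and `send` plus the in-process `asyncio.Queue` are hypothetical stand-ins for the websocket, so the example runs without a microphone or server. Only the blocking `next()` step is handed to `run_in_executor`; the coroutines themselves stay on the event loop and are combined with `asyncio.gather`.

```python
import asyncio
import time

_DONE = object()  # sentinel: StopIteration cannot be raised into a Future


def blocking_chunks(n, delay=0.01):
    """Hypothetical stand-in for stream.generator(): every step blocks its thread."""
    for i in range(n):
        time.sleep(delay)  # simulates waiting on the thread-safe microphone queue
        yield f"chunk-{i}"


async def record_audio(send, n):
    loop = asyncio.get_running_loop()
    chunks = blocking_chunks(n)
    while True:
        # Only the blocking step runs in the default thread pool, so the
        # event loop stays free to run collect_results in the meantime.
        chunk = await loop.run_in_executor(None, next, chunks, _DONE)
        if chunk is _DONE:
            break
        await send(chunk)


async def collect_results(inbox, events, n):
    # Stand-in for `async for message in websocket`.
    for _ in range(n):
        events.append("got " + await inbox.get())


async def combine(n=3):
    inbox = asyncio.Queue()  # hypothetical in-process "server" reply channel
    events = []

    async def send(chunk):
        # Stand-in for websocket.send(); the fake server replies immediately.
        events.append("sent " + chunk)
        await inbox.put("text for " + chunk)

    # gather() propagates exceptions from either coroutine to the caller.
    await asyncio.gather(record_audio(send, n), collect_results(inbox, events, n))
    return events


if __name__ == "__main__":
    for event in asyncio.run(combine()):
        print(event)
```

In a real client, `send` would presumably be `websocket.send` and `collect_results` would iterate over the websocket itself. The point of the sketch is only that the "sent" and "got" events interleave once the blocking iteration no longer runs directly on the event loop.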

0 Answers