Objective
My objective is to consume an audio stream. Logically, the flow looks like this:
- The audio stream comes in through WebSocket A (a FastAPI endpoint).
- The audio stream is bridged to a different WebSocket, B (Rev.ai's WebSocket), which returns JSON.
- The JSON results are sent back through WebSocket A in real time, i.e. while the audio stream is still coming in.
Possible solution
To solve this problem, I've had quite a few ideas, but ultimately I've been trying to bridge WebSocket A to WebSocket B. My attempt so far involves a ConnectionManager class, which contains a queue.Queue. The chunks of the audio stream are added to this queue so that we do not consume directly from WebSocket A.
The ConnectionManager also contains a generator method that yields all values from the queue.
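A minimal sketch of the buffering part of such a ConnectionManager, assuming the buffer is a thread-safe queue.Queue and that the end of the stream is signalled with a None sentinel. Only add_to_buffer comes from the question; the names close and generator are hypothetical, and connect/disconnect are omitted. A thread-safe queue matters here because the generator will most likely be drained from a worker thread while the event loop keeps adding chunks.

```python
import queue
from typing import Iterator, Optional


class ConnectionManager:
    """Sketch: buffers audio chunks from WebSocket A in a thread-safe queue."""

    def __init__(self) -> None:
        # queue.Queue is thread-safe, so a worker thread can read while the event loop writes
        self._buffer: "queue.Queue[Optional[bytes]]" = queue.Queue()

    def add_to_buffer(self, chunk: bytes) -> None:
        self._buffer.put(chunk)

    def close(self) -> None:
        # Hypothetical helper: a None sentinel tells the generator the stream has ended
        self._buffer.put(None)

    def generator(self) -> Iterator[bytes]:
        # Hypothetical name; blocks on get() until a chunk arrives, stops at the sentinel
        while True:
            chunk = self._buffer.get()
            if chunk is None:
                return
            yield chunk


manager = ConnectionManager()
manager.add_to_buffer(b"\x00\x01")
manager.add_to_buffer(b"\x02")
manager.close()
print(list(manager.generator()))  # [b'\x00\x01', b'\x02']
```

Without the sentinel, the generator would block forever on get() once the client disconnects, so close() would be called from the disconnect handler.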
My FastAPI implementation consumes from WebSocket A like this:
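from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
manager = ConnectionManager()  # my queue-backed connection manager

@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            # Receive raw audio chunks from WebSocket A and buffer them
            chunk = await websocket.receive_bytes()
            manager.add_to_buffer(chunk)
    except WebSocketDisconnect:
        # Raised when the client closes WebSocket A
        manager.disconnect()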
Concurrently with this ingestion, I'd like a task that bridges the audio stream to WebSocket B and sends the resulting values back through WebSocket A. The audio stream could be consumed through the aforementioned generator method.
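One way to sketch the bridging task, assuming the streaming client's start() is a blocking generator (as the Rev.ai example suggests): iterate it in a worker thread with asyncio.to_thread (Python 3.9+) and hand each response back to the event loop with asyncio.run_coroutine_threadsafe. The function bridge and its parameters are hypothetical; start stands in for something like streamclient.start and send for WebSocket A's send_json.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Iterator


async def bridge(
    media: Iterable[bytes],
    start: Callable[[Iterable[bytes]], Iterator[dict]],
    send: Callable[[dict], Awaitable[None]],
) -> None:
    """Run a blocking streaming client in a thread and forward every response."""
    loop = asyncio.get_running_loop()

    def worker() -> None:
        # Runs in a thread, so iterating the blocking generator never stalls the loop
        for response in start(media):
            # Schedule send() on the event loop and wait until it has completed
            asyncio.run_coroutine_threadsafe(send(response), loop).result()

    await asyncio.to_thread(worker)


async def main() -> list:
    sent = []

    async def send(msg: dict) -> None:
        sent.append(msg)

    def fake_start(chunks):
        # Hypothetical stand-in for the Rev.ai client: one "result" per chunk
        for chunk in chunks:
            yield {"bytes": len(chunk)}

    await bridge([b"ab", b"cde"], fake_start, send)
    return sent


print(asyncio.run(main()))  # [{'bytes': 2}, {'bytes': 3}]
```

Inside the endpoint this would presumably be started with something like asyncio.create_task(bridge(manager.generator(), streamclient.start, websocket.send_json)) before the receive loop, where manager.generator() is a placeholder for the queue-backed generator method described above. The generator must end when the client disconnects (for example via a sentinel put on the queue); otherwise the worker thread waits forever.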
The generator method is necessary because of how WebSocket B consumes messages, as shown in Rev.ai's examples:
from rev_ai.streamingclient import RevAiStreamingClient

streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
    # return this value through WebSocket A
    print(response)
This is one of the biggest challenges: data has to keep flowing into a generator while the results are read back out, both in real time.
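The plumbing side of this is less of a problem than it may look: when both the input and the consumer are lazy generators, each result can come out as soon as its chunk has been pulled in, without waiting for the whole stream. The sketch below records the order of events to show that interleaving. fake_start is a hypothetical stand-in for streamclient.start; it says nothing about how the real client schedules its network I/O.

```python
from typing import Iterator, List

log: List[str] = []


def media_generator(chunks: List[bytes]) -> Iterator[bytes]:
    # Stand-in for the queue-backed generator: yields audio chunks one at a time
    for i, chunk in enumerate(chunks):
        log.append(f"in {i}")
        yield chunk


def fake_start(media: Iterator[bytes]) -> Iterator[dict]:
    # Hypothetical stand-in for streamclient.start: one result per chunk consumed
    for i, chunk in enumerate(media):
        yield {"chunk": i, "size": len(chunk)}


for response in fake_start(media_generator([b"aa", b"bbb"])):
    log.append(f"out {response['chunk']}")

print(log)  # ['in 0', 'out 0', 'in 1', 'out 1']
```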
Latest attempts
I've been trying my luck with asyncio. From what I understand, one possibility would be to create a coroutine that runs in the background. I haven't been able to make this work, but it sounded promising.
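A minimal sketch of a background coroutine, assuming the work it does never blocks: asyncio.create_task schedules it on the already-running loop, so it runs concurrently with the receive loop without creating a new loop. receive_loop is a hypothetical stand-in for awaiting websocket.receive_bytes() repeatedly, and process stands in for the forwarding logic. A blocking iterator such as the Rev.ai response generator would still need to run in a thread.

```python
import asyncio
from typing import List, Optional


async def receive_loop(inbox: "asyncio.Queue[Optional[bytes]]", chunks: List[bytes]) -> None:
    # Stand-in for repeatedly awaiting websocket.receive_bytes()
    for chunk in chunks:
        await inbox.put(chunk)
        await asyncio.sleep(0)  # yield control, as a real network await would
    await inbox.put(None)  # end-of-stream sentinel


async def process(inbox: "asyncio.Queue[Optional[bytes]]", results: List[int]) -> None:
    # Background coroutine: runs concurrently with receive_loop on the same loop
    while (chunk := await inbox.get()) is not None:
        results.append(len(chunk))


async def main() -> List[int]:
    inbox: "asyncio.Queue[Optional[bytes]]" = asyncio.Queue()
    results: List[int] = []
    # create_task schedules process() on the running loop; no new loop is created
    worker = asyncio.create_task(process(inbox, results))
    await receive_loop(inbox, [b"a", b"bb", b"ccc"])
    await worker
    return results


print(asyncio.run(main()))  # [1, 2, 3]
```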
I've thought about triggering this through the FastAPI startup event, but I'm having trouble achieving concurrency. I tried managing event loops myself, but that gave me an error about nested event loops.
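A likely cause of the nested event loop error, though not certainly the exact one hit here: a FastAPI endpoint or startup handler already runs inside an event loop, so calling asyncio.run() (or run_until_complete() on that loop) from there raises RuntimeError. The sketch reproduces that, then shows the usual alternative of reusing the running loop via asyncio.get_running_loop() and pushing blocking work onto its thread pool with run_in_executor. blocking_step is a hypothetical stand-in for blocking work such as iterating a synchronous client.

```python
import asyncio
import time


def blocking_step(x: int) -> int:
    # Hypothetical stand-in for blocking work, e.g. iterating a synchronous client
    time.sleep(0.01)
    return x * 2


async def nested_attempt() -> str:
    async def inner() -> int:
        return blocking_step(1)

    coro = inner()
    try:
        # Fails: a loop is already running in this thread, as it is inside FastAPI
        return str(asyncio.run(coro))
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a warning
        return f"RuntimeError: {exc}"


async def reuse_running_loop() -> int:
    loop = asyncio.get_running_loop()
    # Reuse the existing loop and run the blocking call in its default thread pool
    return await loop.run_in_executor(None, blocking_step, 21)


async def demo():
    return await nested_attempt(), await reuse_running_loop()


print(asyncio.run(demo()))
```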
Caveat
FastAPI is optional if you think that is best, and in a way so is WebSocket A. At the end of the day, the ultimate objective is to receive an audio stream through our own API endpoint, run it through Rev.ai's WebSocket, do some extra processing, and send the results back.