I have a FastAPI backend with a POST endpoint that responds with a streaming response, and I need to wrap the call to that endpoint so that I can consume the response as a normal (not async) iterator. I'm trying to achieve this by running the async call with asyncio's run_until_complete in a separate thread and having it fill a queue, so that the main thread can pull the elements of the response out of the queue as they arrive.
The queue does get filled just fine, but when I pull the elements from the queue and yield them I get RuntimeError: Task got bad yield 'something'. I hope the idea is clear. A minimal example of the code:
import asyncio
import queue
from threading import Thread

import httpx

BACKEND = "http://localhost:8000"  # placeholder; the real base URL of the FastAPI backend


def process_stream(input):
    q = queue.Queue()
    job_done = object()  # sentinel marking the end of the stream

    def task():
        # run the async request on its own event loop inside this thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(process_stream_async(input, q))
        loop.close()
        q.put(job_done)

    t = Thread(target=task)
    t.start()

    while True:
        try:
            next_token = q.get(True, timeout=1)
            if next_token is job_done:
                break
            yield next_token  # <-- here I get the error
        except queue.Empty:
            continue


async def process_stream_async(input, q):
    payload = {"input": input}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url=BACKEND + "/process-stream", json=payload) as stream:
            async for item in stream.aiter_raw():
                q.put(item)
In another place I then want to use process_stream as a generator, like so:
for token in process_stream("some input"):
print(token)
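For completeness, the backend side is essentially just a streaming endpoint. The real one does more work, but a simplified sketch of the kind of endpoint I'm calling (route name and token generator are illustrative) looks like this:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()


class ProcessRequest(BaseModel):
    input: str


@app.post("/process-stream")
async def process_stream_endpoint(req: ProcessRequest):
    async def token_generator():
        # stand-in for the real streaming work
        for token in req.input.split():
            yield token + " "

    return StreamingResponse(token_generator(), media_type="text/plain")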
I have already done some extensive Google and SO searching and couldn't find anything that helped me solve the problem. Or maybe I'm just stupid and/or blind. Relevant results with an accepted answer:
Python asyncio task got bad yield
I hope the problem is clear.
Thanks in advance.