I have a FastAPI backend with a POST endpoint that responds with a streaming response, and I need to wrap the call to that endpoint so that I can consume the response as a normal (not async) iterator. I try to achieve this by running the call with asyncio's run_until_complete in a separate thread and passing in a queue to be filled, so that the main thread can pull the elements of the response out of the queue as they arrive.

When doing so, the queue seems to be filled just fine. However, when pulling the elements from the queue and yielding them, I get RuntimeError: Task got bad yield 'something'. A minimal example of the code:

import queue
import httpx
from threading import Thread
import asyncio

def process_stream(input):
    q = queue.Queue()

    job_done = object()

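    def task():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(process_stream_async(input, q))
        finally:
            # Always signal completion, even if the request fails,
            # so the consumer loop below cannot wait forever.
            loop.close()
            q.put(job_done)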

    t = Thread(target=task)
    t.start()

    while True:
        try:
            next_token = q.get(True, timeout=1)
            if next_token is job_done:
                break
            yield next_token # <-- here I get the error
        except queue.Empty:
            continue


async def process_stream_async(input, q):
    payload = {
        "input": input
    }

    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url=BACKEND + "/process-stream", json=payload) as stream:
            async for item in stream.aiter_raw():
                q.put(item)
    

In another place I then want to use process_stream as a generator like so:

for token in process_stream("some input"):
    print(token)
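
For reference, here is a self-contained sketch of the same thread-and-queue bridge with the HTTP call taken out, which may help to check the pattern in isolation. The names `fake_stream`, `fill_queue` and `iterate_sync` are hypothetical stand-ins and not part of the real code; `fake_stream` only imitates the httpx streaming response.

```python
import asyncio
import queue
from threading import Thread


async def fake_stream(n):
    # Hypothetical stand-in for the httpx streaming response.
    for i in range(n):
        await asyncio.sleep(0)  # give control back to the event loop
        yield f"token-{i}"


async def fill_queue(n, q):
    # Producer: runs inside the worker thread's event loop.
    async for item in fake_stream(n):
        q.put(item)


def iterate_sync(n):
    # Consumer: an ordinary generator usable from non-async code.
    q = queue.Queue()
    job_done = object()

    def task():
        try:
            # asyncio.run creates and closes a fresh loop in this thread.
            asyncio.run(fill_queue(n, q))
        finally:
            # Always signal completion so the consumer cannot hang.
            q.put(job_done)

    t = Thread(target=task)
    t.start()
    while True:
        item = q.get()
        if item is job_done:
            break
        yield item
    t.join()
```

Used from plain synchronous code, e.g. `for token in iterate_sync(3): print(token)`, this should behave like any other generator. It is only a sketch under the assumptions above and says nothing about how the caller in the larger program drives the generator.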

I already did some extensive Google and SO searches and couldn't find anything that helps me solve my problem. Or maybe I'm just stupid and/or blind. Relevant results with an accepted answer are:

Python asyncio task got bad yield

python: Task got bad yield:

I hope the problem is clear.

Thanks in advance.

thomas
Note the word "task" in the error message "Task got bad yield". That message is from the asyncio module, but the function `process_stream` is not an async def function. How is it raising this particular error message? We can't tell from the code you provided. You're mixing async with non-async code in some weird way, which was also the problem in those two links provided. You need to show us more, or take a careful look at the bigger structure of the program. – Paul Cornelius Jul 21 '23 at 11:17

1 Answer

IIRC, this "Task got bad yield" error is raised by an asyncio event loop when it is driving a coroutine and, instead of an "await" or "yield from (other awaitable)", the coroutine does an ordinary yield. That means your process_stream generator is being treated as a coroutine/awaitable by some component in your larger system.
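
A minimal sketch of how this error can be provoked, in case it helps to recognize the pattern: a task awaits an object whose `__await__` yields a plain value instead of a future. The class `BareYield` and the helper `reproduce` are hypothetical names made up for illustration; this is not your code, only one way to hit the same message.

```python
import asyncio


class BareYield:
    """Hypothetical awaitable whose __await__ yields a plain value."""

    def __await__(self):
        # The event loop expects a future (or None) here, not a string.
        yield "something"


async def main():
    await BareYield()


def reproduce():
    # Returns the error message raised by the event loop, if any.
    try:
        asyncio.run(main())
    except RuntimeError as exc:
        return str(exc)
    return None
```

Calling `reproduce()` should return a message that starts with "Task got bad yield". If something in the larger program ends up driving process_stream the way the task drives `BareYield` here, the plain `yield next_token` would reach the event loop in the same way.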

jsbueno