
My code looks like this:

TDSession = TDClient()
TDSession.grab_refresh_token()
q = queue.Queue(10)
asyncio.run(listener.startStreaming(TDSession, q))
while True:
    message = q.get()
    print('oh shoot!')
    print(message)
    orderEntry.placeOrder(TDSession=TDSession)

I have tried doing asyncio.create_task(listener.startStreaming(TDSession,q)), the problem is I get

RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'startStreaming' was never awaited

This confused me, because it seemed to work in "Can an asyncio event loop run in the background without suspending the Python interpreter?", which is what I'm trying to do.

With the listener.startStreaming function looking like this:

async def startStreaming(TDSession, q):
    streamingClient = TDSession.create_streaming_session()
    streamingClient.account_activity()
    await streamingClient.build_pipeline()
    while True:
        message = await streamingClient.start_pipeline()
        message = parseMessage(message)
        if message != None:
            print('putting message into q')
            print( dict(message) )
            q.put(message)

Is there a way to make this work where I can run the listener in the background?

I've tried this as well, but it only runs the consumer function instead of running both at the same time:

TDSession.grab_refresh_token()
q = queue.Queue(10)
loop = asyncio.get_event_loop()
loop.create_task(listener.startStreaming(TDSession, q))
loop.create_task(consumer(TDSession, q))
loop.run_forever()
mkrieger1
sf8193

1 Answer


As you found out, the asyncio.run function runs the given coroutine until it is complete. In other words, it waits for the coroutine returned by listener.startStreaming to finish before proceeding to the next line.
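A minimal sketch of that blocking behaviour, using a hypothetical `short_job` coroutine as a stand-in for `startStreaming` (the name and the `log` list are assumptions for illustration only):

```python
import asyncio

async def short_job(log):
    # Hypothetical stand-in for a coroutine such as startStreaming.
    await asyncio.sleep(0.01)
    log.append("job finished")

def run_blocking_demo():
    log = []
    # asyncio.run() returns only after short_job has completed.
    asyncio.run(short_job(log))
    log.append("line after asyncio.run")
    return log
```

With a coroutine that loops forever, as `startStreaming` does, the line after `asyncio.run` is never reached.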

Using asyncio.create_task, on the other hand, requires the caller to already be running inside an asyncio event loop. From the docs:

The task is executed in the loop returned by get_running_loop(), RuntimeError is raised if there is no running loop in current thread.
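To illustrate the difference, here is a small sketch with a hypothetical `work` coroutine (not part of your code): calling `asyncio.create_task` from plain synchronous code raises `RuntimeError`, while the same call inside a coroutine started by `asyncio.run` succeeds.

```python
import asyncio

async def work():
    # Hypothetical coroutine used only for this illustration.
    return "done"

def try_without_loop():
    coro = work()
    try:
        # No event loop is running in this thread, so this raises.
        asyncio.create_task(coro)
    except RuntimeError as exc:
        coro.close()  # avoid the "coroutine was never awaited" warning
        return str(exc)
    return None

async def with_loop():
    # Inside a coroutine run by asyncio.run(), a loop is running.
    task = asyncio.create_task(work())
    return await task
```

Here `try_without_loop()` returns the error message, and `asyncio.run(with_loop())` returns the result of `work`.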

What you need is to combine the two: create an async function, run it with asyncio.run, and call create_task inside that async function.

For example:

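import asyncio

async def main():
  TDSession = TDClient()
  TDSession.grab_refresh_token()
  q = asyncio.Queue(10)
  # Schedule the producer on the running loop; it starts at the next await.
  # Note: startStreaming must now use `await q.put(message)`, because
  # asyncio.Queue.put is a coroutine.
  streaming_task = asyncio.create_task(listener.startStreaming(TDSession, q))
  while True:
    # Awaiting get() yields to the event loop, so startStreaming can run.
    message = await q.get()
    print('oh shoot!')
    print(message)
    orderEntry.placeOrder(TDSession=TDSession)

  # Only reached if the while loop exits (e.g. via break).
  await streaming_task  # If you want to wait for `startStreaming` to complete after the while loop

if __name__ == '__main__':
  asyncio.run(main())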

Edit: From your comment I realized you want the producer-consumer pattern, so I updated the example above to use asyncio.Queue instead of queue.Queue. Awaiting q.get() hands control back to the event loop, which lets it switch between the producer (startStreaming) and the consumer (the while loop), all on a single thread.
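For reference, here is a self-contained sketch of the same pattern that runs without the TD client. The `producer` function is a hypothetical stand-in for `startStreaming`, and the `None` sentinel and sample messages are assumptions added only so the example terminates:

```python
import asyncio

async def producer(q, items):
    # Hypothetical stand-in for startStreaming: put() must be awaited.
    for item in items:
        await q.put(item)
        await asyncio.sleep(0)  # yield so the consumer can run
    await q.put(None)  # sentinel telling the consumer to stop

async def consumer(q):
    received = []
    while True:
        message = await q.get()  # suspends without blocking the thread
        if message is None:
            break
        received.append(message)
    return received

async def main():
    q = asyncio.Queue(10)
    # Run the producer in the background while this coroutine consumes.
    producer_task = asyncio.create_task(producer(q, ["fill", "cancel", "fill"]))
    received = await consumer(q)
    await producer_task
    return received
```

Calling `asyncio.run(main())` returns the messages in the order they were produced.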

Maurice Lam
  • So is it possible to run these two functions in parallel? I want to treat them like a producer and consumer in a stream, where the startStreaming function places messages in the queue and the code in the main while loop gets it. Is this possible with asyncio? Starting to wonder if it isn't – sf8193 Jan 03 '21 at 08:43
  • Yes you can. The problem here is that `queue.Queue` is a blocking queue (`q.get()` is not awaited, but blocks the thread until there are available values). Instead you should use [`asyncio.Queue`](https://docs.python.org/3/library/asyncio-queue.html), which has a coroutine function `get()` – Maurice Lam Jan 03 '21 at 08:48
  • the async queue was the answer! – sf8193 Jan 03 '21 at 08:58