5

In python, what's the idiomatic way to establish a one-way communication between two threading.Threads, call them thread a and thread b.

a is the producer, it continuously generates values for b to consume.

b is the consumer, it reads one value generated by a, process the value with a coroutine, and then reads the next value, and so on.

Illustration:

q = very_magic_queue.Queue()


def worker_of_a(q):
    while True:
        q.put(1)
        time.sleep(1)

a = threading.Thread(worker_of_a, args=(q,))
a.start()


async def loop(q):
    while True:
        # v must be processed in the same order as they are produced
        v = await q.get()
        print(v)

async def foo():
    pass

async def b_main(q):
    loop_fut = asyncio.ensure_future(loop(q))
    foo_fut = asyncio.ensure_future(foo())
    _ = await asyncio.wait([loop_fut, foo_fut], ...)
    # blah blah blah

def worker_of_b(q):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(b_main(q))

b = threading.Thread(worker_of_b, args=(q,))
b.start()

Of course the above code doesn't work, because queue.Queue.get cannot be awaitted, and asyncio.Queue cannot be used in another thread.

I also need a communication channel from b to a.

I would be great if the solution could also work with gevent.

Thanks :)

Incömplete
  • 853
  • 8
  • 20
  • A couple of related questions I asked some time ago: [How can I synchronize asyncio with other OS threads?](https://stackoverflow.com/q/53158101), [How can you wait for completion of a callback submitted from another thread?](https://stackoverflow.com/q/53107032). – jdehesa Apr 24 '19 at 12:19

4 Answers4

4

You can use a synchronized queue from the queue module and defer the wait to a ThreadPoolExecutor:

async def loop(q):
    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=1) as executor:
        loop = asyncio.get_event_loop()
        while True:
            # v must be processed in the same order as they are produced
            v = await loop.run_in_executor(executor, q.get)
            print(v)
jdehesa
  • 58,456
  • 7
  • 77
  • 121
  • This looks like an efficient workaround. It doesn't use async sleeps. Not sure what's the perfomance impact of using the q.get in the executor, but as long as the executor has pool, it shouldn't be a problem, right? – AFP_555 Apr 11 '23 at 06:26
  • @AFP_555 There is a certain overhead of switching threads, but the code submits one task to the executor at a time, so it should never get blocked waiting for previous tasks to finish, and other asyncio tasks can always continue running while waiting on the queue (that is, the asyncio loop thread is not blocked). – jdehesa Apr 11 '23 at 14:24
2

I've used Janus to solve this problem - it's a Python library that gives you a thread-safe queue that can be used to communicate between asyncio and a thread.

def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_code(async_q):
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


queue = janus.Queue()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_code(queue.async_q)
Simon Willison
  • 15,642
  • 5
  • 36
  • 44
1

I had a similar problem -communicate data between a thread and asyncio. The solution I used is to create a sync Queue and add methods for async get and async put using asyncio.sleep to make it non-blocking. Here is my queue class:


#class to provide queue (sync or asyc morph)
class queMorph(queue.Queue):
    def __init__(self,qSize,qNM):
        super().__init__(qSize)
        self.timeout=0.018
        self.me=f'queMorph-{qNM}'
    #Introduce methods for async awaitables morph of Q
    async def aget(self):
        while True:
            try:
                return self.get_nowait()
            except queue.Empty:
                await asyncio.sleep(self.timeout)
            except Exception as E:
                raise
    async def aput(self,data):
        while True:
            try:
                return self.put_nowait(data)
            except queue.Full:
                print(f'{self.me} Queue full on put..')
                await asyncio.sleep(self.timeout)
            except Exception as E:
                raise

To put/get items from queue from the thread (synchronous), use the normal q.get() and q.put() blocking functions. In the async loop, use q.aget() and q.aput() which do not block.

giwyni
  • 2,928
  • 1
  • 19
  • 11
0

You can use a regular asyncio.Queue and just have the other thread call it indirectly using loop.call_soon_threadsafe(). Here's a diff of the main lines I changed from your example:

--- asyncio-thread-queue-ORIG-cleaned-up.py     2023-03-16 17:13:29.073600000 -0400
+++ asyncio-thread-queue.py     2023-03-16 17:11:14.736165700 -0400
@@ -1,9 +1,10 @@
-q = very_magic_queue.Queue()
+q = asyncio.Queue()
+asyncio_loop = asyncio.new_event_loop()


 def worker_of_a(q):
     while True:
-        q.put(1)
+        asyncio_loop.call_soon_threadsafe(q.put_nowait, 1)
         time.sleep(1)

 a = threading.Thread(target=worker_of_a, args=(q,))
@@ -26,8 +27,8 @@
     # blah blah blah

 def worker_of_b(q):
-    asyncio.set_event_loop(asyncio.new_event_loop())
-    asyncio.get_event_loop().run_until_complete(b_main(q))
+    asyncio.set_event_loop(asyncio_loop)
+    asyncio_loop.run_until_complete(b_main(q))

 b = threading.Thread(target=worker_of_b, args=(q,))
 b.start()
paulie4
  • 457
  • 3
  • 10