
Given a `multiprocessing.Queue` that is filled from different Python threads created via `ThreadPoolExecutor.submit(...)`.

How can that Queue be accessed with asyncio / Trio / AnyIO in a safe and reliable manner (context: FastAPI)?

I am aware of the Janus library, but would prefer a custom solution here.

Asked (hopefully) more concisely:

How to implement the

await <something_is_in_my_multiprocessing_queue>

to make it accessible with async/await and to prevent blocking the event loop?

What synchronization mechanism in general would you suggest?

(Attention here: `multiprocessing.Queue`, not `asyncio.Queue`.)

Martin Senne
  • If you want to bridge async and threads, why use a *multiprocessing* queue? The async queues already support threaded use. – MisterMiyagi Jan 28 '23 at 20:36
  • I have to say, I also don't understand why you are interested in using a _multiprocessing_ queue, when you explicitly said you are dealing with multiple _threads_. I assume you know the [`queue`](https://docs.python.org/3/library/queue.html#module-queue) module exists for that exact purpose. As for the `asyncio` part, I can't imagine how that is relevant in this context since the event loop is confined to a single thread. – Daniil Fajnberg Jan 29 '23 at 01:03
  • @MisterMiyagi: Your statement "The async queues already support threaded use." is not correct, see https://docs.python.org/3/library/asyncio-queue.html which states directly on the top: `... asyncio queues are not thread-safe, ... ` It is also not helpful to duplicate the link, dano already provided. – Martin Senne Feb 01 '23 at 09:25
  • @DaniilFajnberg: As a have multiple workers, constantly producing output that is collected in an `mp.Queue` (for thread-safety). Later on these results are used in FastAPI (this is where `asyncio` or more precise `anyio` enter the game). – Martin Senne Feb 01 '23 at 09:26
  • @MartinSenne "Does this answer your question?" is part of duplicate closure proposal, which allows others to directly accept/vote for the duplicate candidate. – MisterMiyagi Feb 01 '23 at 10:35
  • @MartinSenne I did not say asyncio queues are thread-safe. I did say they "support threaded use"; threaded use in the asyncio world is via asyncio's thread helpers. Anyway, the point is still that the question insists on using `multiprocessing.Queue` which is the one standard library queue furthest away from both threading and asyncio; why? Some cheap synchronisation means between asyncio and threads won't work with multiprocessing, so it is critical to know whether multiprocessing support is actually required or not. – MisterMiyagi Feb 01 '23 at 10:39
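As the comments above suggest, when the producers are plain threads (not separate processes), an `asyncio.Queue` can be fed safely from those threads via the event loop's thread-safe helpers. A minimal sketch of that pattern (all names here are illustrative, assuming a single event loop and an in-process worker thread):

```python
import asyncio
import threading

def worker(loop: asyncio.AbstractEventLoop, q: asyncio.Queue) -> None:
    # Runs in a plain thread. asyncio.Queue is NOT thread-safe, so instead of
    # calling q.put_nowait() directly, schedule it on the event loop thread.
    for i in range(3):
        loop.call_soon_threadsafe(q.put_nowait, i)

async def main() -> list:
    q: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    threading.Thread(target=worker, args=(loop, q)).start()
    # Consume the three items produced by the worker thread
    return [await q.get() for _ in range(3)]

print(asyncio.run(main()))  # [0, 1, 2]
```

This avoids the multiprocessing machinery entirely, but of course only works as long as producers and consumer share one process.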

1 Answer


Actually, I figured it out.

Given a method that reads the mp.Queue:

def read_queue_blocking():
    # queue is the shared multiprocessing.Queue filled by the worker threads
    return queue.get()

And this is the main issue: the call to get() blocks the calling thread.

We can now run this blocking read in a worker thread and await its result, so the event loop is never blocked.

For FastAPI

import anyio

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    while True:
        # run the blocking queue.get() in a worker thread,
        # so the event loop stays responsive
        queue_result = await anyio.to_thread.run_sync(read_queue_blocking)
        await websocket.send_text(f"Message text was: {queue_result}")
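For completeness, the same offloading works with stdlib asyncio alone, without AnyIO. A self-contained sketch (names are illustrative, not from the original post), using `loop.run_in_executor` to await the blocking `get()`:

```python
import asyncio
import multiprocessing
import threading

queue: multiprocessing.Queue = multiprocessing.Queue()

def read_queue_blocking():
    # Blocks the worker thread it runs on, not the event loop
    return queue.get()

async def consume_one():
    loop = asyncio.get_running_loop()
    # Run the blocking get() in the default ThreadPoolExecutor.
    # On Python 3.9+, `await asyncio.to_thread(read_queue_blocking)` is equivalent.
    return await loop.run_in_executor(None, read_queue_blocking)

# Simulate a producer thread filling the multiprocessing.Queue
threading.Thread(target=queue.put, args=("hello",)).start()
print(asyncio.run(consume_one()))  # hello
```

This is the same pattern the anyio-based endpoint above uses, just expressed with asyncio primitives.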
  • It is not correct to use `to_thread` to handle `mp.Queue`, because the former is from the `threading` world and the latter is from the `multiprocessing` world. If you want to go that way, you need to change `to_thread` to `run_in_executor` with a `ProcessPoolExecutor` as the pool. Another question: do you expect only one user to use the websocket at a time? If not, each client should have a dedicated Queue, otherwise each user gets only part of the messages. – Artiom Kozyrev Feb 01 '23 at 15:45
  • 1
    @Artiom: Thanks for your critical and very valuable comments. Let us elaborate! Not (fully) correct: `to_thread` here is from anyio (and the underlying asyncio world) and executes on a worker thread spawned by asyncio. Why do you regard that as harmful? This is the way to "wrap" blocking code. See [aioprocessing](https://github.com/dano/aioprocessing) for details, they do it the same way. For anyio I could also use `to_process` instead, but why, a thread is good (enough). We need a thread that behaves well (with respect to await) and queries the `mp.Queue`. What am I missing from your point of view? – Martin Senne Feb 01 '23 at 19:26
  • 1
    @Artiom: You are totally right. Each client will have its dedicated queue .... but not of importance here. – Martin Senne Feb 01 '23 at 19:32
  • I have checked `aioprocessing` library, they really use `ThreadPoolExecutor` to handle `multiprocessing` blocking operations, on the other hand in "How does it work?" they write about possible drawbacks and errors, also they say that part of library is made on future callbacks. So you are absolutely right that they use `threads` in some cases to wait for mp results also knowing about possible risks. I still think that you can overcome necessity to use `mp.Queue` with callbacks, I changed my answer, check if it works for you. I used callbacks to feed `asyncio.Queue` for "websockets". – Artiom Kozyrev Feb 01 '23 at 21:06
  • @ArtiomKozyrev: Thx for your effort. Still, your solution is fine in general, but does not solve my issue (which is that the long-running threads / processes are _constantly_ producing data and populating the `mp.Queue`). In summary: either go the route of the _Janus_ library (a `threading.Lock` intertwined with an `asyncio.Lock`) or go the `run_sync` way. – Martin Senne Feb 04 '23 at 10:55
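The "Janus-like" route from the last comment can also be hand-rolled: a dedicated drain thread blocks on the `mp.Queue` and forwards each item into an `asyncio.Queue` via `call_soon_threadsafe`. A sketch under those assumptions (all names hypothetical; `None` is used as the stop marker since items crossing an mp.Queue are pickled):

```python
import asyncio
import multiprocessing
import threading

def bridge(mp_q: multiprocessing.Queue, loop: asyncio.AbstractEventLoop,
           aio_q: asyncio.Queue) -> None:
    # Dedicated drain thread: blocks on the mp.Queue, then hands each item
    # over to the event loop thread, where touching the asyncio.Queue is safe.
    while True:
        item = mp_q.get()
        loop.call_soon_threadsafe(aio_q.put_nowait, item)
        if item is None:  # stop marker
            break

async def main() -> list:
    mp_q: multiprocessing.Queue = multiprocessing.Queue()
    aio_q: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    threading.Thread(target=bridge, args=(mp_q, loop, aio_q), daemon=True).start()
    for item in ("a", "b", None):
        mp_q.put(item)
    results = []
    # Consumers only ever await the asyncio.Queue, never the mp.Queue
    while (item := await aio_q.get()) is not None:
        results.append(item)
    return results

print(asyncio.run(main()))  # ['a', 'b']
```

Compared to `run_sync`/`run_in_executor` per `get()`, this ties up exactly one thread per bridged queue for its whole lifetime, which suits the "constantly producing" scenario described above.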