I'm working on a "server" thread, which takes care of some IO calls for a bunch of "clients".
Communication is done with pynng v0.5.0, and the server has its own asyncio loop.
Each client "registers" by sending a first request, and then loops receiving the results and sending back READY messages.
On the server, the goal is to treat the first message of each client as a registration request, and to create a dedicated worker task which will loop doing IO stuff, sending the result and waiting for the READY message of that particular client.
To implement this, I'm trying to leverage the Context feature of REP0 sockets.
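To make the intended routing concrete, here is a minimal sketch of the design using plain asyncio queues in place of the sockets. It involves no pynng at all: the per-client queue pair, `ROUNDS`, and the function names are all hypothetical stand-ins, and the explicit per-client queues are precisely the dedicated channel I was hoping a context would give me.

```python
import asyncio

ROUNDS = 2  # hypothetical: number of result/READY exchanges per client


async def worker(client_id, ready_q, result_q):
    # Dedicated loop for one client: send a result, then wait for its READY.
    for _ in range(ROUNDS):
        await asyncio.sleep(0)  # stands in for the real IO
        await result_q.put(f"result data for client {client_id}")
        ready = await ready_q.get()
        assert ready == client_id, "READY reached the wrong worker"


async def server(registrations, n_clients):
    # Only the first message of each client is supposed to arrive here.
    tasks = {}
    while len(tasks) < n_clients:
        client_id, ready_q, result_q = await registrations.get()
        tasks[client_id] = asyncio.create_task(worker(client_id, ready_q, result_q))
    await asyncio.gather(*tasks.values())


async def client(client_id, registrations, received):
    ready_q, result_q = asyncio.Queue(), asyncio.Queue()
    await registrations.put((client_id, ready_q, result_q))  # registration request
    for _ in range(ROUNDS):
        received.append((client_id, await result_q.get()))
        await ready_q.put(client_id)  # READY goes to this client's worker only


async def demo(n_clients=3):
    registrations, received = asyncio.Queue(), []
    await asyncio.gather(
        server(registrations, n_clients),
        *(client(i, registrations, received) for i in range(n_clients)),
    )
    return received
```

Running `asyncio.run(demo())` returns the list of `(client_id, message)` pairs each client received; in this model every READY can only ever reach the worker of the client that sent it.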
Side notes
I would have liked to tag this question with nng and pynng, but I don't have enough reputation. Although I'm an avid consumer of this site, it's my first question :)
I do know about the PUB/SUB pattern; let's just say that, for self-instructional purposes, I chose not to use it for this service.
Problem:
After a few iterations, some READY messages are intercepted by the registration coroutine of the server, instead of being routed to the proper worker task.
Worse, as you can see in the output, some result messages are sent to the wrong client (ERROR:root:<Worker 1>: worker/client mismatch, exiting.).
Since I can't share the original code, I wrote a reproducer for my issue and included it below.
It looks like a bug, but I'm not entirely sure I understand how to use the contexts correctly, so any help would be appreciated.
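For what it's worth, here is a toy model of what I suspect might be happening. It rests on an assumption I have not confirmed from the nng documentation: that an incoming request is handed to whichever context has been waiting in a receive the longest, with no per-client affinity. The shared `asyncio.Queue` and the names below are hypothetical stand-ins, not pynng code.

```python
import asyncio


async def receiver(name, incoming, log):
    # ASSUMPTION: every "context" waits on the same shared stream of requests.
    sender = await incoming.get()
    log.append((name, sender))


async def demo():
    incoming, log = asyncio.Queue(), []
    # The registration context starts waiting first; the worker only starts
    # waiting later, because it does its IO before calling receive.
    tasks = [asyncio.create_task(receiver("registration", incoming, log))]
    await asyncio.sleep(0)
    tasks.append(asyncio.create_task(receiver("worker 1", incoming, log)))
    await asyncio.sleep(0)
    await incoming.put("client 1")  # READY meant for worker 1
    await incoming.put("client 0")  # READY meant for worker 0
    await asyncio.gather(*tasks)
    return log
```

In this model the registration receiver picks up client 1's READY and worker 1 picks up client 0's, which resembles the two errors in my output. If that assumption holds, the behavior would be a misuse of contexts on my side rather than a bug.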
Environment:
- winpython-3.8.2
- pynng v0.5.0+dev (46fbbcb2), with nng v1.3.0 (ff99ee51)
Code:
import asyncio
import logging
import pynng
import threading

NNG_DURATION_INFINITE = -1
ENDPOINT = 'inproc://example_endpoint'


class Server(threading.Thread):
    def __init__(self):
        super(Server, self).__init__()
        self._client_tasks = dict()

    @staticmethod
    async def _worker(ctx, client_id):
        while True:
            # Remember, the first 'receive' has already been done by self._new_client_handler()
            logging.debug(f"<Worker {client_id}>: doing some IO")
            await asyncio.sleep(1)
            logging.debug(f"<Worker {client_id}>: sending the result")
            # I already tried sending synchronously here instead, just in case the issue was related to that
            # (but it's not)
            await ctx.asend(f"result data for client {client_id}".encode())
            logging.debug(f"<Worker {client_id}>: waiting for client READY msg")
            data = await ctx.arecv()
            logging.debug(f"<Worker {client_id}>: received '{data}'")
            if data != bytes([client_id]):
                logging.error(f"<Worker {client_id}>: worker/client mismatch, exiting.")
                return

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            max_workers = 3 + 1  # Try setting it to 3 instead, to stop creating new contexts => now it works fine
            while await asyncio.sleep(0, result=True) and len(self._client_tasks) < max_workers:
                # The issue is here: at some point, the existing client READY messages get
                # intercepted here, instead of being routed to the proper worker context.
                # The intent here was to open a new context only for each *new* client, I was
                # assuming that a 'recv' on older worker contexts would take precedence.
                ctx = socket.new_context()
                data = await ctx.arecv()
                client_id = data[0]
                if client_id in self._client_tasks:
                    logging.error(f"<Server>: We already have a task for client {client_id}")
                    continue  # just let the client block on its 'recv' for now
                logging.debug(f"<Server>: New client : {client_id}")
                self._client_tasks[client_id] = asyncio.create_task(self._worker(ctx, client_id))
            await asyncio.gather(*list(self._client_tasks.values()))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=True)


class Client(threading.Thread):
    def __init__(self, client_id: int):
        super(Client, self).__init__()
        self._id = client_id

    def __repr__(self):
        return f'<Client {self._id}>'

    def run(self):
        with pynng.Req0(dial=ENDPOINT, resend_time=NNG_DURATION_INFINITE) as socket:
            while True:
                logging.debug(f"{self}: READY")
                socket.send(bytes([self._id]))
                data_str = socket.recv().decode()
                logging.debug(f"{self}: received '{data_str}'")
                if data_str != f"result data for client {self._id}":
                    logging.error(f"{self}: client/worker mismatch, exiting.")
                    return


def main():
    logging.basicConfig(level=logging.DEBUG)
    threads = [Server(),
               *[Client(i) for i in range(3)]]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()
Output:
DEBUG:asyncio:Using proactor: IocpProactor
DEBUG:root:<Client 1>: READY
DEBUG:root:<Client 0>: READY
DEBUG:root:<Client 2>: READY
DEBUG:root:<Server>: New client : 1
DEBUG:root:<Worker 1>: doing some IO
DEBUG:root:<Server>: New client : 0
DEBUG:root:<Worker 0>: doing some IO
DEBUG:root:<Server>: New client : 2
DEBUG:root:<Worker 2>: doing some IO
DEBUG:root:<Worker 1>: sending the result
DEBUG:root:<Client 1>: received 'result data for client 1'
DEBUG:root:<Client 1>: READY
ERROR:root:<Server>: We already have a task for client 1
DEBUG:root:<Worker 1>: waiting for client READY msg
DEBUG:root:<Worker 0>: sending the result
DEBUG:root:<Client 0>: received 'result data for client 0'
DEBUG:root:<Client 0>: READY
DEBUG:root:<Worker 0>: waiting for client READY msg
DEBUG:root:<Worker 1>: received 'b'\x00''
ERROR:root:<Worker 1>: worker/client mismatch, exiting.
DEBUG:root:<Worker 2>: sending the result
DEBUG:root:<Client 2>: received 'result data for client 2'
DEBUG:root:<Client 2>: READY
DEBUG:root:<Worker 2>: waiting for client READY msg
ERROR:root:<Server>: We already have a task for client 2
Edit (2020-04-10): I updated both pynng and the underlying nng.lib to their latest versions (master branches); still the same issue.