My use case:
- The subscriber will be a server (it binds to a port) and will wait for messages from multiple publishers.
- The publishers will be created inside different threads as clients (they connect to the port).
- Each thread will publish only a couple of messages.
- When the subscriber is connected, it must receive every message, as soon as possible.
- If the subscriber is not connected, I don't want the publisher thread to stay blocked; ideally it should give up after a timeout of around 1-2 s.
The slow joiner problem:
When running more than 1000 threads (publishers), the subscriber receives all the data only once or twice. Adding a sleep of a few milliseconds solves the issue, so I'm 99.9% sure that I'm a victim of the well-known slow joiner syndrome. However, sleeping is not a good solution in my case: the publisher's connect time can vary, and I want the data to reach the subscriber as soon as possible.
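To see the effect in isolation, here is a minimal sketch (not part of my real code) using a plain PUB socket in the same topology: the SUB binds, the PUB connects and sends one message, either immediately or after a sleep. `count_delivered` and `subscriber_thread` are illustrative names, and the 0.5 s sleep and 1 s listening window are arbitrary assumptions. The subscriber runs in its own thread and keeps polling, because (as I understand libzmq) a bound socket only attaches new peers, and a SUB only sends them its subscription, while its own thread is inside a ZeroMQ call.

```python
import queue
import threading
import time

import zmq


def subscriber_thread(ctx, port_q, received, listen_s):
    # Illustrative subscriber: binds like the real one, then keeps polling so
    # it can attach new publishers and send them its subscription
    sub = ctx.socket(zmq.SUB)
    port = sub.bind_to_random_port("tcp://127.0.0.1")
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    port_q.put(port)
    deadline = time.monotonic() + listen_s
    while time.monotonic() < deadline:
        if sub.poll(50):
            received.append(sub.recv_multipart())
    sub.close(linger=0)


def count_delivered(sleep_before_send, listen_s=1.0):
    """Hypothetical helper: connect a PUB, optionally sleep, send one message,
    and return how many messages the subscriber received."""
    ctx = zmq.Context()
    port_q, received = queue.Queue(), []
    t = threading.Thread(target=subscriber_thread,
                         args=(ctx, port_q, received, listen_s))
    t.start()
    pub = ctx.socket(zmq.PUB)
    pub.connect("tcp://127.0.0.1:%d" % port_q.get())
    time.sleep(sleep_before_send)
    # A PUB drops messages that no connected peer has subscribed to yet;
    # right after connect() the handshake and subscription are still in flight
    pub.send_multipart([b"test", b"1"])
    t.join()
    pub.close(linger=0)
    ctx.term()
    return len(received)


if __name__ == "__main__":
    print("send immediately: ", count_delivered(0.0))  # typically 0
    print("sleep 0.5 s first:", count_delivered(0.5))  # typically 1
```

The sleep only works because it happens to be longer than the connection setup, which is exactly the fragility described above.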
My thoughts and experiment code on solving this issue:
My solution is based on the XPUB socket's recv() method. I create the publisher as an XPUB socket and set RCVTIMEO to 1000 ms. After the publisher connects, I call recv() to check whether there is a subscriber. When I receive the subscribe message, I know that the connection has been established and that I can send data without losing any of it (unless something goes wrong on the subscriber side, which I don't care about).
If no subscribe message arrives, recv() times out after 1000 ms and the thread terminates.
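The check described above can be sketched as a small helper. `wait_for_subscriber` is a hypothetical name, and the sketch assumes one SUB peer per XPUB socket, as in this setup (with several peers, XPUB reports a repeated subscription to the same topic only once unless `XPUB_VERBOSE` is set). It relies on XPUB delivering each subscription to the application as one frame: `b"\x01"` + topic for subscribe, `b"\x00"` + topic for unsubscribe. Unlike a single RCVTIMEO-bounded `recv()`, it uses `poll()` against an overall deadline and ignores anything that is not a subscribe message.

```python
import time

import zmq


def wait_for_subscriber(sock: zmq.Socket, timeout_ms: int = 1000) -> bool:
    """Hypothetical helper: return True once the XPUB socket `sock` receives
    a subscribe message, or False after `timeout_ms` milliseconds in total."""
    deadline = time.monotonic() + timeout_ms / 1000.0
    while True:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        # poll() returns 0 if nothing became readable within remaining_ms
        if remaining_ms <= 0 or not sock.poll(remaining_ms, zmq.POLLIN):
            return False
        # First byte 1 = subscribe, 0 = unsubscribe; the rest is the topic
        if sock.recv()[:1] == b"\x01":
            return True
        # Not a subscribe message: keep waiting until the deadline
```

In the publisher loop, the bare `publisher.recv()` would then become `if wait_for_subscriber(publisher, 1000):` followed by the `send_multipart` call.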
Here is sample code in Python (pyzmq) to test this approach. For the publisher I don't use threads; instead I use a while loop and run several publisher instances at the same time. It works as I wanted:
publisher.py:
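```python
import zmq


def main():
    """ main method """
    i = 0
    while True:
        # Prepare context and publisher (a new pair per iteration, so each
        # iteration behaves like an independent publisher)
        context = zmq.Context()
        publisher = context.socket(zmq.XPUB)
        # Connect to the loopback address: 0.0.0.0 is a bind wildcard and is
        # not accepted as a connect target on every platform
        publisher.connect("tcp://127.0.0.1:5650")
        # recv() raises zmq.Again if nothing arrives within 1000 ms
        publisher.setsockopt(zmq.RCVTIMEO, 1000)
        i = i + 1
        try:
            # Wait up to 1000 ms for the subscriber's subscription message
            publisher.recv()
            # Subscription received: the subscriber is attached, so send
            publisher.send_multipart([b'test', str(i).encode('utf-8')])
        except zmq.Again:
            print('%s: no subscriber within 1000 ms' % i, flush=True)
        # Terminate socket and context
        publisher.close()
        context.term()
        if i >= 10000:
            break


if __name__ == "__main__":
    main()
```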
subscriber.py:
```python
import zmq


def main():
    """ main method """
    # Prepare our context and subscriber
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    uri = "tcp://0.0.0.0:5650"
    # The subscriber is the stable endpoint: it binds, publishers connect
    subscriber.bind(uri)
    # Empty prefix: subscribe to every topic
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    print('Subscriber bound to %s' % uri, flush=True)
    # Receive messages
    i = 0
    while True:
        [topic, data] = subscriber.recv_multipart()
        i = i + 1
        print("%s: %s %s" % (i, topic, data), flush=True)


if __name__ == "__main__":
    main()
```
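Since the real use case runs publishers in threads rather than in a loop, here is a sketch of the same XPUB handshake with one thread per publisher. Assumptions not taken from the original: everything runs in one process, the SUB binds to a random loopback port via pyzmq's `bind_to_random_port`, and `publish`/`run_demo` are hypothetical names. All threads share one context, but each thread creates and uses its own socket, because ZeroMQ contexts are thread-safe while sockets are not.

```python
import threading

import zmq


def publish(ctx, endpoint, payload, timeout_ms):
    """Hypothetical publisher thread: its own XPUB socket, one message."""
    sock = ctx.socket(zmq.XPUB)
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
    sock.connect(endpoint)
    try:
        # The XPUB's first message is the SUB's subscription (b"\x01" + topic);
        # once it has arrived the peer is attached and subscribed
        if sock.recv()[:1] == b"\x01":
            sock.send_multipart([b"test", payload])
    except zmq.Again:
        pass  # no subscriber within timeout_ms: give up without blocking
    finally:
        sock.close()  # default linger: a queued message is still flushed


def run_demo(n_threads=20, timeout_ms=1000):
    """Hypothetical driver: start n_threads publishers, count deliveries."""
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    port = sub.bind_to_random_port("tcp://127.0.0.1")
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    endpoint = "tcp://127.0.0.1:%d" % port

    threads = [threading.Thread(target=publish,
                                args=(ctx, endpoint, str(i).encode(), timeout_ms))
               for i in range(n_threads)]
    for t in threads:
        t.start()

    # Keep the SUB inside ZeroMQ calls while the publishers connect, so it can
    # attach them and send its subscription; just joining would stall them
    received = 0
    while received < n_threads and sub.poll(2 * timeout_ms):
        sub.recv_multipart()
        received += 1

    for t in threads:
        t.join()
    sub.close(linger=0)
    ctx.term()
    return received


if __name__ == "__main__":
    print(run_demo())
```

With a subscriber that is actively receiving, every publisher thread should see the subscription and send; without one, each thread gives up after `timeout_ms`.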
My question:
Is the solution really that simple? Am I missing anything (related to the slow joiner problem) that could cause data loss while a subscriber is active?