I have found a lot of questions on similar topics, but none of them helped me solve my problem.
Using :
- Linux Ubuntu 14.04
- python 3.4
- zmq : 4.0.4 // pyZMQ 14.3.1
TL;DR
The receive queue of a ZMQ SUB socket keeps growing indefinitely even after the HWMs are set. This happens when the subscriber is slower than the publisher. What can I do to prevent it?
Background
I work in the human-computer interaction field. We have a huge code base to control the mouse cursor and similar things. I wanted to break it up into several modules that communicate over ZMQ. Latency must be as low as possible, but dropping (losing) messages is not a big problem.
Another interesting aspect is the possibility of adding "spies" between the nodes. PUB/SUB sockets therefore seem to be the best fit.
Something like this :
+----------+                +-----------+                +------------+
|          | PUB            |           | PUB            |            |
|  Input   | +----+------>  |  Filter   | +----+------>  |   Output   |
|          |      |     SUB |           |      |     SUB |            |
+----------+      v         +-----------+      v         +------------+
               +-----+                      +-----+
               |Spy 1|                      |Spy 2|
               +-----+                      +-----+
Problem
Everything works fine, except when we add the spies. If we add a spy doing "heavy stuff", like real-time visualisation with matplotlib, we notice an increasing latency in the plots. For example, in the diagram above, Filter and Output are fast and show no latency, but on Spy 2 the latency can reach 10 min after running for 20 min (!!)
It looks like the queue on the receiver grows indefinitely. We looked into ZMQ's High Water Mark (HWM) options and set them low so that older messages would be dropped, but nothing changed.
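To put rough numbers on this, here is a back-of-the-envelope model. It is only a sketch: it assumes constant rates and a queue that never drops anything, and the 100 Hz processing rate for the spy is a guess (the 200 Hz publishing rate is the one used in the real setup). The function names are made up for the illustration.

```python
def lag_seconds(elapsed_s, pub_hz, sub_hz):
    """Age of the message a slow subscriber is handling after elapsed_s seconds.

    Assumes constant rates and a queue that never drops anything.
    """
    if sub_hz >= pub_hz:
        return 0.0  # the subscriber keeps up, so nothing accumulates
    # Message number sub_hz * elapsed_s was published at sub_hz * elapsed_s / pub_hz.
    return elapsed_s * (1.0 - sub_hz / pub_hz)


def backlog_messages(elapsed_s, pub_hz, sub_hz):
    """Number of messages still waiting in the queue under the same assumptions."""
    return max(0.0, (pub_hz - sub_hz) * elapsed_s)


# 200 Hz is the publishing rate of the real setup; 100 Hz for the spy is a
# guess, chosen because it reproduces the observed lag.
print(lag_seconds(20 * 60, 200, 100) / 60)   # lag in minutes -> 10.0
print(backlog_messages(20 * 60, 200, 100))   # queued messages -> 120000.0
```

Under these assumptions, a spy that handles half of the published messages would be exactly 10 min behind after 20 min, which matches what we see and suggests that nothing is being dropped at all.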
Minimal code
Architecture :
+------------+                +-------------+
|            |  PUB           |             |
|   sender   | -------------> |  receiver   |
|            |             SUB|             |
+------------+                +-------------+
The receiver is deliberately slow (it plays the role of a spy in the first diagram).
Code :
sender.py:
import time
import zmq
ctx = zmq.Context()
sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')
print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10
i = 0
while True:
    mess = "{} {}".format(i, time.time())
    sender.send_string(mess)
    print("Send : {}".format(mess))
    i += 1
receiver.py:
import time
import zmq
ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)
front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)
front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')
print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1
while True:
    mess = front_end.recv_string()
    i, t = mess.split(" ")
    mess = "{} {}".format(i, time.time() - float(t))
    print("received : {}".format(mess))
    time.sleep(1)  # slow
I don't think this is normal behaviour for ZMQ PUB/SUB. I tried setting the HWM on the publisher, on the subscriber, and on both, but nothing changed.
What am I missing?
Edit :
I don't think I was clear when I explained my problem. I made an implementation that moves the mouse cursor. The input was the mouse cursor position, sent over ZMQ at 200 Hz (with a .sleep(1.0 / 200)); some processing was done and the mouse cursor position was updated (I don't have this sleep in my minimal example).
Everything was smooth, even when I launched the spies. The spies nevertheless had a growing latency (because of their slow processing). The latency doesn't appear in the cursor, at the end of the "pipeline".
I think the problem comes from the slow subscriber queuing the messages.
In my example, if we kill the sender and leave the receiver running, messages continue to be displayed until all (?) of the sent messages have been displayed.
The spy plots the position of the cursor to provide some feedback, so such a lag is very inconvenient. I just want to get the last message sent, which is why I tried to lower the HWM.
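To make "just get the last message" concrete, this is the behaviour I am after, written as a workaround on the subscriber side. It is only a sketch, not something the HWM does for me: the helper name recv_latest is made up, and it assumes single-part string messages as in the minimal example. It blocks for one message, then keeps reading without blocking until the queue is empty and returns the newest one.

```python
import zmq


def recv_latest(sock):
    """Block for one message, then drain the queue and return the newest one.

    Hypothetical helper; assumes single-part string messages.
    """
    latest = sock.recv_string()  # wait until at least one message is available
    while True:
        try:
            # Non-blocking read: raises zmq.Again once nothing is left to read.
            latest = sock.recv_string(flags=zmq.NOBLOCK)
        except zmq.Again:
            return latest
```

In receiver.py this would replace the call to front_end.recv_string() with recv_latest(front_end). With an unthrottled sender like the one in the minimal example the drain loop itself may run for a long time, so this probably only makes sense at a bounded rate such as the 200 Hz of the real setup.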