"each subscriber has its own queue"
Yes, it does. This follows from the designed properties of the PUB-side Context()-instance, where the sending-queue management takes place ( more on this a bit later ).
The main conceptual tricks are summarised in the [ ZeroMQ hierarchy in less than a five seconds ] Section.
"This would seem to indicate that each subscriber receives messages from the publisher independent of other subscribers."
Yes, it does. There is no interaction among the respective "private" queues. What matters here is ZMQ_HWM, in its side-effect role of a "Blocker".
In this setup, the minimal ZMQ_HWM guards / blocks any new entry from being inserted into the PUB-side "private" sending-Queue ( no deeper than ZMQ_HWM == 1 ) until that queue has been emptied from the remote side ( by the autonomous, asynchronous, transport-related activity of the SUB-side Context(), once it can (re-)load the SUB-side "private" receiving-Queue, which is again no deeper than ZMQ_HWM == 1 ).
In other words, the PUB.send()-s' payloads are effectively discarded until a remote *_SUB.recv() unloads the "blocking" payload from its "remote" Context()-instance's receiving-Queue ( sized, by design, so that it cannot store more than one payload, as per ZMQ_HWM == 1 ).
In this very manner, the PUB.send()-er fired more than ~ 902601 messages during the ( secretly blocking ) test, while only some 20 of them were received on the SUB-side ( == to_read ).
All those 902581+ messages were simply thrown away right at the PUB-side by the Context(), upon the call to the .send()-method.
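The drop-on-full behaviour described above can be sketched without ZeroMQ at all. The following is a toy model only, assuming a single queue of depth HWM == 1 and a receiver that drains it far less often than the sender fires; HwmQueue, Tally and simulate() are hypothetical names made up for this illustration, they are not part of the ZeroMQ API and do not mirror its internals.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>

// Toy model of ONE "private" queue guarded by a high-water mark.
// Hypothetical type for illustration, not ZeroMQ code.
class HwmQueue {
public:
    explicit HwmQueue(std::size_t hwm) : hwm_(hwm) {}

    // Models PUB.send(): never blocks, drops the message if the queue is full.
    bool try_send(std::uint64_t msg) {
        if (q_.size() >= hwm_) { ++dropped_; return false; }
        q_.push_back(msg);
        return true;
    }

    // Models SUB.recv() on this queue: unloads one message, freeing a slot.
    bool try_recv(std::uint64_t& out) {
        if (q_.empty()) return false;
        out = q_.front();
        q_.pop_front();
        return true;
    }

    std::uint64_t dropped() const { return dropped_; }

private:
    std::size_t hwm_;
    std::deque<std::uint64_t> q_;
    std::uint64_t dropped_ = 0;
};

struct Tally { std::uint64_t sent, delivered, dropped; };

// Fire `total` sends; the receiver unloads the queue only after every
// `recv_every`-th send (an assumed, fixed cadence, just for the model).
Tally simulate(std::uint64_t total, std::uint64_t recv_every) {
    HwmQueue q(1);                       // HWM == 1, as in the test setup
    Tally t{0, 0, 0};
    for (std::uint64_t i = 1; i <= total; ++i) {
        q.try_send(i);
        ++t.sent;
        std::uint64_t got = 0;
        if (i % recv_every == 0 && q.try_recv(got)) ++t.delivered;
    }
    t.dropped = q.dropped();
    return t;
}
```

Calling simulate( 1000, 50 ) in this model delivers 20 messages and drops the remaining 980 on the sending side, which is the same proportion-of-losses effect as seen in the test, only on a smaller scale.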
How does it actually work inside ? A simplified view inside the Context()
Given the mock-up example above, the Context()-managed pool-of-queues grows / contracts as .connect()-ed peers appear and disappear, yet in ZeroMQ API v2.2 both the TX- and the RX-side share the same High Water Mark ceiling. As documented, anything one attempts to .send() above this limit gets thrown away.
TIME _____________________________
v [ ]
v [ ]
v [ ]
v [ ]
v PUB.setsockopt( ZMQ_HWM, 1 );]
v PUB.send()-s [ | ]
v : [ +-----------------QUEUE-length ( a storage depth ) is but one single message
v _________________ : [
v [ ] : [Context()-managed pool-of-QUEUE(s)
v [ ] : [
v [ ] : [ ___________________
v [ ] : [ [ ]
v FAST_SUB.connect()---:------------>[?] [ ]
v FAST_SUB.recv()-s : [?] [ ]
v : : [?] [ ]
v : : [?][?]<---SLOW_SUB.connect() ]
v : : [?][?] SLOW_SUB.recv()-s ]
v : .send(1)----->[1][1] :
| 1 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(2)----->[2][1] :
| 2 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(3)----->[3][1] :
| 3 <-.recv()--------------------[?][?]------------.recv()-> 1
| : [?][?] :
| : .send(4)----->[4][4] :
| 4 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(5)----->[5][4] :
| 5 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(6)----->[6][4] :
| 6 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(7)----->[7][4] :
| 7 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(8)----->[8][4] :
| 8 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(9)----->[9][4] :
| 9 <-.recv()--------------------[?][?]------------.recv()-> 4
| : [?][?] :
| : .send(A)----->[A][A] :
| A <-.recv()--------------------[?][A]
| : [?][A]
| : .send(B)----->[B][A]
| B <-.recv()--------------------[?][A]
v : [ [
v : [
v :
v
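The diagram above can be replayed with a small model of the pool-of-queues. This is a sketch under simplifying assumptions: the sender-side and receiver-side queues of each peer are merged into one queue of depth HWM == 1, and PubModel, Trace and replay_diagram() are hypothetical names used for this illustration only, not ZeroMQ internals.

```cpp
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <vector>

// Toy model of a Context()-managed pool of per-peer queues.
// Hypothetical type for illustration, not ZeroMQ code.
class PubModel {
public:
    explicit PubModel(std::size_t hwm) : hwm_(hwm) {}

    // The pool grows / contracts as peers appear and disappear.
    void connect(const std::string& peer)    { queues_[peer]; }
    void disconnect(const std::string& peer) { queues_.erase(peer); }

    // Each peer's queue is decided independently: store or silently drop.
    void send(int msg) {
        for (auto& kv : queues_) {
            if (kv.second.size() < hwm_) kv.second.push_back(msg);
        }
    }

    // Unload one message from the given peer's own queue, if any.
    bool recv(const std::string& peer, int& out) {
        auto it = queues_.find(peer);
        if (it == queues_.end() || it->second.empty()) return false;
        out = it->second.front();
        it->second.pop_front();
        return true;
    }

private:
    std::size_t hwm_;
    std::map<std::string, std::deque<int>> queues_;
};

struct Trace { std::vector<int> fast, slow; };

// Replays the diagram: FAST_SUB receives after every send,
// SLOW_SUB receives only after send(3) and after send(9).
Trace replay_diagram() {
    PubModel pub(1);
    pub.connect("FAST_SUB");
    pub.connect("SLOW_SUB");
    Trace t;
    for (int msg = 1; msg <= 11; ++msg) {   // 10 and 11 stand for A and B
        pub.send(msg);
        int got = 0;
        if (pub.recv("FAST_SUB", got)) t.fast.push_back(got);
        if ((msg == 3 || msg == 9) && pub.recv("SLOW_SUB", got)) t.slow.push_back(got);
    }
    return t;
}
```

In this model the fast subscriber sees every message 1 .. B, while the slow one sees just 1 and 4 ( with A still waiting in its queue ), as drawn in the diagram. The point of the design is that the two queues never interact: what the slow peer misses depends only on when its own queue had a free slot.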
"Messages on the fast subscriber starting here line up with messages on the slow subscriber"
No, this does not happen. There is no "line-up", just a coincidence of durations: the fast-SUB has not yet finished its 20x .recv()-s by the time the slow(-ed)-SUB finally gets going after its blocking sleep( 3 ).
The initial "gap" is just the impact of the sleep( 3 ) phase, during which the slower SUB does not attempt to receive anything.
main(){
|
| async(launch::async,fast|_fast____________|
| async(launch::async,slow| .setsockopt |_slow____________|
| ... | .setsockopt | .setsockopt |
| ... | .connect | .setsockopt |
| thread | ~~~~~~? | .connect |
| |_pub___________________| ~~~~~~? | ~~~~~~? |
| | .setsockopt | ~~~~~~? | ~~~~~~? |
| | .bind | ~~~~~~? | ~~~~~~? |
| | ~~~~~~? | ~~~~~~? | ~~~~~~? |
| | ~~~~~~=RTO | ~~~~~~? | ~~~~~~? |
| | .send()-s 1,2,..99| ~~~~~~? | ~~~~~~? |
| | .send()-s 23456,..| ~~~~~~=RTO | ~~~~~~=RTO |
| | .send()-s 25988,..| 25988 --> v[ 0]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 52522,..| 52522 --> v[ 1]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 79197,..| 79197 --> v[ 2]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 106365,..| 106365 --> v[ 3]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 132793,..| 132793 --> v[ 4]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 159236,..| 159236 --> v[ 5]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 184486,..| 184486 --> v[ 6]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 209208,..| 209208 --> v[ 7]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 234483,..| 234483 --> v[ 8]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 256122,..| 256122 --> v[ 9]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 281188,..| 281188 --> v[10]| : slow still sleep( 3 )-s before going to .recv() its first message
| | .send()-s 305855,..| 305855 --> v[11]| 305855 --> v[ 0]|// Messages on the fast subscriber starting here line up with messages on the slow subscriber
| | .send()-s 454312,..| 454312 --> v[12]| 454312 --> v[ 1]|
| | .send()-s 477807,..| 477807 --> v[13]| 477807 --> v[ 2]|
| | .send()-s 502594,..| 502594 --> v[14]| 502594 --> v[ 3]|
| | .send()-s 528551,..| 528551 --> v[15]| 528551 --> v[ 4]|
| | .send()-s 554519,..| 554519 --> v[16]| 554519 --> v[ 5]|
| | .send()-s 581419,..| 581419 --> v[17]| 581419 --> v[ 6]|
| | .send()-s 606411,..| 606411 --> v[18]| 606411 --> v[ 7]|
| | .send()-s 629298,..| 629298 --> v[19]| 629298 --> v[ 8]|
| | .send()-s 651159,..| | 651159 --> v[ 9]|
| | .send()-s 675031,..| return v | 675031 --> v[10]|
| | .send()-s 701533,..|_________________| 701533 --> v[11]|
| | .send()-s 727817,..| | 727817 --> v[12]|
| | .send()-s 754154,..| | 754154 --> v[13]|
| | .send()-s 778654,..| | 778654 --> v[14]|
| | .send()-s 804137,..| | 804137 --> v[15]|
| | .send()-s 830677,..| | 830677 --> v[16]|
| | .send()-s 854959,..| | 854959 --> v[17]|
| | .send()-s 878841,..| | 878841 --> v[18]|
| | .send()-s 902601,..| | 902601 --> v[19]|
| | .send()-s 912345,..| | |
| | .send()-s 923456,..| | return v |
| | .send()-s 934567,..| |_________________|
| | .send()-s 945678,..|
| | .send()-s 956789,..|
| | .send()-s 967890,..|
| | .send()-s 978901,..|
| | .send()-s 989012,..|
| | .send()-s 990123,..|
| | .send()-s ad inf,..|
While the PUB-side code imperatively calls .send()-s as fast as it can, its local Context()-instance has not reserved space for more than one such message; all the others got silently dropped whenever that single en-queued position was occupied.
Whenever the HWM == 1 queue drained back down to zero, the internal mechanics allowed the next .send() to pass the actual content of the message ( the payload ) down to the queue storage, and all the following .send()-s again started to be silently dropped due to the HWM-bound logic.