I am using ZeroMQ 2.2 (an older version, I know) in C++ to create a publisher with several connected subscribers that read messages at different speeds. From my understanding of the docs, as well as Pieter Hintjens' answer here, each subscriber has its own queue, and the publisher has a queue per connected subscriber. This would seem to indicate that each subscriber receives messages from the publisher independently of the other subscribers.

However, in the code snippet below the fast and slow subscribers receive similar messages or the exact same messages (this happens even when I increase the sleep time at Point A and change the ZMQ_HWM at point B).

Can anyone shed some light on why this is happening?

#include <zmq.hpp>
#include <unistd.h>
#include <cstdlib>   // exit
#include <cstring>   // memcpy
#include <iostream>
#include <vector>
#include <future>
#include <thread>    // std::thread
using socket_t = zmq::socket_t;
using context_t = zmq::context_t;
using msg_t = zmq::message_t;
using namespace std;

vector<int> slow_consumer(int64_t hwm, int to_read)
{
    vector<int> v;
    context_t context{1};
    socket_t socket(context, ZMQ_SUB);
    socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    socket.connect("tcp://localhost:5554");
    msg_t msg;
    sleep(3);  // 3 seconds
    for (int i = 0; i < to_read; i++)
    {
        socket.recv(&msg);
        usleep(10000);  // 10 milliseconds __________________________POINT A
        v.emplace_back(*reinterpret_cast<int*>(msg.data()));
    }
    return v;
}
vector<int> fast_consumer(int64_t hwm, int to_read)
{
    vector<int> v;
    context_t context{1};
    socket_t socket(context, ZMQ_SUB);
    socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    socket.connect("tcp://localhost:5554");
    msg_t msg;
    for (int i = 0; i < to_read; i++)
    {
        socket.recv(&msg);
        v.emplace_back(*reinterpret_cast<int*>(msg.data()));
    }
    return v;
}
void publisher(int64_t hwm)
{
    context_t context{1};
    socket_t socket(context, ZMQ_PUB);
    socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
    socket.bind("tcp://*:5554");
    int count = 0;
    while (true) {
        msg_t msg(sizeof(count));
        memcpy(msg.data(), &count, sizeof(count));
        socket.send(msg);
        count++;
    }
}

int main() 
{
    int64_t hwm = 1;  // __________________________________________POINT B
    int to_read = 20;
    auto fast = async(launch::async, fast_consumer, hwm, to_read);
    auto slow = async(launch::async, slow_consumer, hwm, to_read);
    hwm = 1;  // Don't queue anything on the publisher
    thread pub(publisher, hwm);
    auto slow_v = slow.get();
    auto fast_v = fast.get();

    cout << "fast    slow" << endl;
    for (size_t i = 0; i < fast_v.size(); i++)
    {
        cout << fast_v[i] << "   " << slow_v[i] << endl;
    }
    exit(0);
}

Compiled with GCC 6.3: g++ -o mixed mixed_speed_consumers.cpp -g -lzmq -lpthread

Sample Output:

fast    slow
 25988   305855
 52522   454312
 79197   477807
106365   502594
132793   528551
159236   554519
184486   581419
209208   606411
234483   629298
256122   651159
281188   675031
305855   701533  // Messages on the fast subscriber starting here line up with messages on the slow subscriber
454312   727817
477807   754154
502594   778654
528551   804137
554519   830677
581419   854959
606411   878841
629298   902601
user3666197
TheGaldozer

1 Answer

each subscriber has its own queue

Yes, it does ...

This follows from the design of the PUB-side Context()-instance, where the sending-queue management takes place ( more on this a bit later ).

One may enjoy a short read about the main conceptual tricks in [ ZeroMQ hierarchy in less than a five seconds ] Section.

This would seem to indicate that each subscriber receives messages from the publisher independent of other subscribers.

Yes, it does ...

There is no interaction among the respective "private" queues. What matters here is ZMQ_HWM, in its side-effect role as the "Blocker".

In this setup, the minimal ZMQ_HWM blocks any new entry from being inserted into the PUB-side "private" sending queue ( no deeper than ZMQ_HWM == 1 ) until that queue has been emptied remotely. The emptying is done by the "remote" SUB-side Context(), which works autonomously and asynchronously and pulls the message across the transport whenever it can (re-)load its own SUB-side "private" receiving queue ( again no deeper than ZMQ_HWM == 1 ).

In other words, the payloads of the PUB.send()-s are effectively discarded until a remote SUB.recv() unloads the "blocking" payload from its "remote" Context()-instance's receiving queue ( which, with ZMQ_HWM == 1, cannot store more than a single payload ).

In this very manner, the PUB-side fired more than ~ 902601 .send()-s during the ( secretly blocking ) test, while each SUB-side received just 20 of them ( == to_read ).

All the other 902581+ messages ( 902601 - 20 ) were simply thrown away right at the PUB-side by the Context(), upon the call to the .send()-method.


How does it actually work inside? A simplified view of the Context()

Given the mock-up example above, the Context()-managed pool of queues grows and shrinks as .connect()-ed peers appear and disappear. In ZeroMQ API v2.2, the TX-side and the RX-side share the same High Water Mark ceiling ( ZMQ_HWM ). As documented, anything one attempts to .send() above this limit gets thrown away.

TIME                   _____________________________
v                     [                             ]
v                     [                             ]
v                     [                             ]
v                     [                             ]
v                     PUB.setsockopt(  ZMQ_HWM, 1 );]
v                     PUB.send()-s     [        |   ]
v                        :             [        +-----------------QUEUE-length ( a storage depth ) is but one single message
v    _________________   :             [             
v   [                 ]  :             [Context()-managed pool-of-QUEUE(s)
v   [                 ]  :             [
v   [                 ]  :             [          ___________________
v   [                 ]  :             [         [                   ]
v   FAST_SUB.connect()---:------------>[?]       [                   ]
v   FAST_SUB.recv()-s    :             [?]       [                   ]
v           :            :             [?]       [                   ]
v           :            :             [?][?]<---SLOW_SUB.connect()  ]
v           :            :             [?][?]    SLOW_SUB.recv()-s   ]
v           :            .send(1)----->[1][1]            :
|       1 <-.recv()--------------------[?][1]            :
|           :                          [?][1]            :
|           :            .send(2)----->[2][1]            :
|       2 <-.recv()--------------------[?][1]            :
|           :                          [?][1]            :
|           :            .send(3)----->[3][1]            :
|       3 <-.recv()--------------------[?][?]------------.recv()-> 1
|           :                          [?][?]            :
|           :            .send(4)----->[4][4]            :
|       4 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(5)----->[5][4]            :
|       5 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(6)----->[6][4]            :
|       6 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(7)----->[7][4]            :
|       7 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(8)----->[8][4]            :
|       8 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(9)----->[9][4]            :
|       9 <-.recv()--------------------[?][?]------------.recv()-> 4
|           :                          [?][?]            :
|           :            .send(A)----->[A][A]            :
|       A <-.recv()--------------------[?][A]
|           :                          [?][A]
|           :            .send(B)----->[B][A]
|       B <-.recv()--------------------[?][A]
v           :                          [  [
v           :                          [
v           :
v
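
The timeline above can be replayed with a small toy model. This is only a sketch under stated assumptions, not libzmq code: `SingleSlotQueue`, `Replay` and `replay` are hypothetical names, each subscriber is given one single-slot queue ( HWM == 1 ), and the slow subscriber is assumed to read once every `slow_period` sends, whereas the diagram shows irregular gaps.

```cpp
#include <cassert>
#include <vector>

// Toy model only: one single-slot outgoing queue per subscriber (HWM == 1).
// It mimics the drop-when-full rule; it is not how libzmq is implemented.
struct SingleSlotQueue {
    bool full = false;
    int value = 0;
    int dropped = 0;

    // Called for every publisher send(): store if empty, else drop silently.
    void offer(int msg) {
        if (full) { ++dropped; return; }
        value = msg;
        full = true;
    }

    // Called for a subscriber recv(): returns false if nothing is queued.
    bool take(int& out) {
        if (!full) return false;
        out = value;
        full = false;
        return true;
    }
};

struct Replay {
    std::vector<int> fast;
    std::vector<int> slow;
    int dropped_for_slow = 0;
};

// Sends 1..n_msgs. The fast subscriber reads after every send,
// the slow subscriber only after every slow_period-th send.
Replay replay(int n_msgs, int slow_period) {
    assert(slow_period > 0);
    SingleSlotQueue q_fast, q_slow;   // independent "private" queues
    Replay r;
    for (int msg = 1; msg <= n_msgs; ++msg) {
        q_fast.offer(msg);
        q_slow.offer(msg);
        int got = 0;
        if (q_fast.take(got)) r.fast.push_back(got);
        if (msg % slow_period == 0 && q_slow.take(got)) r.slow.push_back(got);
    }
    r.dropped_for_slow = q_slow.dropped;
    return r;
}
```

In this model, `replay(9, 3)` leaves the fast subscriber with 1 through 9 and the slow subscriber with 1, 4, 7, while six messages are dropped on the slow subscriber's queue. The two queues never touch each other; the slow one simply stays occupied longer.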

"Messages on the fast subscriber starting here line up with messages on the slow subscriber"

No, this does not happen. There is no "line-up", just a coincidence of durations: the fast SUB had not yet finished its 20 .recv()-s when the slow(-ed) SUB finally got going after its blocking sleep( 3 ).

The initial "gap" is just the impact of the sleep( 3 ) phase, during which the slower SUB does not attempt to receive anything.

main(){
|  
| async(launch::async,fast|_fast____________|
| async(launch::async,slow|     .setsockopt |_slow____________|
| ...                     |     .setsockopt |     .setsockopt |
| ...                     |     .connect    |     .setsockopt |
| thread                  |      ~~~~~~?    |     .connect    |
| |_pub___________________|      ~~~~~~?    |      ~~~~~~?    |
| |    .setsockopt        |      ~~~~~~?    |      ~~~~~~?    |
| |    .bind              |      ~~~~~~?    |      ~~~~~~?    |
| |     ~~~~~~?           |      ~~~~~~?    |      ~~~~~~?    |
| |     ~~~~~~=RTO        |      ~~~~~~?    |      ~~~~~~?    |
| |    .send()-s  1,2,..99|      ~~~~~~?    |      ~~~~~~?    |
| |    .send()-s  23456,..|      ~~~~~~=RTO |      ~~~~~~=RTO |
| |    .send()-s  25988,..|  25988 --> v[ 0]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s  52522,..|  52522 --> v[ 1]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s  79197,..|  79197 --> v[ 2]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s 106365,..| 106365 --> v[ 3]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s 132793,..| 132793 --> v[ 4]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s 159236,..| 159236 --> v[ 5]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s 184486,..| 184486 --> v[ 6]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s 209208,..| 209208 --> v[ 7]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s 234483,..| 234483 --> v[ 8]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s 256122,..| 256122 --> v[ 9]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s 281188,..| 281188 --> v[10]|     :  slow still sleep( 3 )-s before going to .recv() its first message
| |    .send()-s 305855,..| 305855 --> v[11]| 305855 --> v[ 0]|// Messages on the fast subscriber starting here line up with messages on the slow subscriber
| |    .send()-s 454312,..| 454312 --> v[12]| 454312 --> v[ 1]|
| |    .send()-s 477807,..| 477807 --> v[13]| 477807 --> v[ 2]|
| |    .send()-s 502594,..| 502594 --> v[14]| 502594 --> v[ 3]|
| |    .send()-s 528551,..| 528551 --> v[15]| 528551 --> v[ 4]|
| |    .send()-s 554519,..| 554519 --> v[16]| 554519 --> v[ 5]|
| |    .send()-s 581419,..| 581419 --> v[17]| 581419 --> v[ 6]|
| |    .send()-s 606411,..| 606411 --> v[18]| 606411 --> v[ 7]|
| |    .send()-s 629298,..| 629298 --> v[19]| 629298 --> v[ 8]|
| |    .send()-s 651159,..|                 | 651159 --> v[ 9]|
| |    .send()-s 675031,..|     return v    | 675031 --> v[10]|
| |    .send()-s 701533,..|_________________| 701533 --> v[11]|
| |    .send()-s 727817,..|                 | 727817 --> v[12]|
| |    .send()-s 754154,..|                 | 754154 --> v[13]|
| |    .send()-s 778654,..|                 | 778654 --> v[14]|
| |    .send()-s 804137,..|                 | 804137 --> v[15]|
| |    .send()-s 830677,..|                 | 830677 --> v[16]|
| |    .send()-s 854959,..|                 | 854959 --> v[17]|
| |    .send()-s 878841,..|                 | 878841 --> v[18]|
| |    .send()-s 902601,..|                 | 902601 --> v[19]|
| |    .send()-s 912345,..|                 |                 |
| |    .send()-s 923456,..|                 |     return v    |
| |    .send()-s 934567,..|                 |_________________|
| |    .send()-s 945678,..|
| |    .send()-s 956789,..|
| |    .send()-s 967890,..|
| |    .send()-s 978901,..|
| |    .send()-s 989012,..|
| |    .send()-s 990123,..|
| |    .send()-s ad inf,..|                    
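
The overlap shown in the timeline can be checked directly against the sample output from the question. The sketch below is only an illustration: `overlap_offset` is a hypothetical helper, and the two vectors are copied from the posted output, not produced by running the program.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Returns the smallest offset k such that fast[k + i] == slow[i] for every i
// for which both indices are valid, or -1 if no such offset exists.
int overlap_offset(const std::vector<int>& fast, const std::vector<int>& slow) {
    for (std::size_t k = 0; k < fast.size(); ++k) {
        bool match = true;
        for (std::size_t i = 0; k + i < fast.size() && i < slow.size(); ++i) {
            if (fast[k + i] != slow[i]) { match = false; break; }
        }
        if (match) return static_cast<int>(k);
    }
    return -1;
}

// Values copied from the "Sample Output" in the question.
const std::vector<int> fast_v = {
     25988,  52522,  79197, 106365, 132793, 159236, 184486, 209208, 234483, 256122,
    281188, 305855, 454312, 477807, 502594, 528551, 554519, 581419, 606411, 629298};
const std::vector<int> slow_v = {
    305855, 454312, 477807, 502594, 528551, 554519, 581419, 606411, 629298, 651159,
    675031, 701533, 727817, 754154, 778654, 804137, 830677, 854959, 878841, 902601};
```

Here `overlap_offset(fast_v, slow_v)` evaluates to 11: the first 11 entries of the fast subscriber fall into the slow subscriber's sleep( 3 ) phase, and the remaining 9 entries coincide with the slow subscriber's first 9.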

While the PUB-side code calls .send() as fast as it can, its local Context()-instance reserved space for just one such message; all the others got silently dropped whenever that single queue slot was occupied.

Whenever the queue depth, capped at HWM == 1, got back down to zero, the internal mechanics allowed the next .send() to pass its payload down to the queue storage, and all the following .send()-s again got silently dropped by the HWM-bound logic.

user3666197
  • Thank you for the thorough explanation! I'm still not understanding why the fast and slow subscribers would see the same messages when they have independent queues. The diagram matches the expected behavior, but not the actual behavior – TheGaldozer May 17 '18 at 17:18
  • The picture would be pretty tall if you insisted on posting the scheme in real scale, all the way to 902601 messages sent. The behaviour of three un-coordinated ( no setup / .connect() barrier before sending ), non-queued ( HWM = 1 ), asynchronous process flows simply cannot end in anything other than what you reported. If you add a call to sleep( 5 ) into each of the launched receivers ( after they have asked to .connect() ), they will receive the same ID number at the beginning of v[]. Similarly, if you permit an HWM above ~ 20 ~ 30 ~ 40, you will receive the same ID numbers in both v[]-vectors. – user3666197 May 17 '18 at 19:29
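
The effect of a larger HWM described in the last comment can be explored with a toy model. This is a sketch under loud assumptions, not libzmq code: `BoundedQueue` and `run_subscriber` are hypothetical names, the subscriber is assumed to be connected before the first send ( the "barrier" mentioned above ), the publisher sends 1, 2, 3, ... and the subscriber reads once every `period` sends.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Toy model only: a per-subscriber queue that silently drops new messages
// once it already holds hwm of them.
struct BoundedQueue {
    std::size_t hwm;
    std::deque<int> q;

    explicit BoundedQueue(std::size_t h) : hwm(h) {}

    void offer(int msg) {
        if (q.size() < hwm) q.push_back(msg);   // otherwise dropped
    }

    bool take(int& out) {
        if (q.empty()) return false;
        out = q.front();
        q.pop_front();
        return true;
    }
};

// Simulates one subscriber with its own queue. Because the queues are
// independent, each subscriber can be simulated separately.
std::vector<int> run_subscriber(std::size_t hwm, int period, std::size_t to_read) {
    assert(hwm > 0 && period > 0);
    BoundedQueue queue(hwm);
    std::vector<int> v;
    for (int msg = 1; v.size() < to_read; ++msg) {
        queue.offer(msg);
        int got = 0;
        if (msg % period == 0 && queue.take(got)) v.push_back(got);
    }
    return v;
}
```

In this model, `run_subscriber(20, 1, 20)` and `run_subscriber(20, 3, 20)` return the same vector, because a queue of depth 20 retains the first 20 messages regardless of reading speed. With `hwm == 1` the results differ: a reader with `period == 3` sees 1, 4, 7, ... while a reader with `period == 1` sees 1, 2, 3, ...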