0

I'm working on a program in C++ that needs to be able to send / receive JSON-payloads from an arbitrary number of other clients.

At first, I tried to use the PubNub service, but I found that I can't receive and publish messages at the same time (even using two different contexts on distinct threads). I need to be able to do that. I also found that PubNub has too much latency for my liking.

I came across the ZeroMQ library which has a PUB/SUB model that would suit my needs. But all examples I came across explain how to implement it in a way that one process is a Publisher OR a Subscriber, and not both at the same time.

Ideally, I would like to program a server that would relay all messages coming from anyone, to anyone subscribed to a specific channel specified in the message. Anyone should be able to receive and publish messages to anyone else on the network, provided they are subscribed to the correct channel.


UPDATE 1:

Note: I do not need guaranteed delivery, because payload N+1 takes precedence over payload N. I want a fire-and-forget means of communication (UDP-like).

As requested: The PubNub limit of 32 kB per JSON payload was perfect for me; I do not need more. In fact, my payloads are around 4 kB on average. All client instances will run on the same local network, so latency should ideally be less than 5 ms. As for the number of clients, there won't be more than 4 clients subscribed to the same channel/topic at a time.


UPDATE 2 :

I cannot predict how many channels/topics will exist ahead of time, but it will be on the order of dozens (most of the time) or hundreds (at peak). Not thousands.


Questions:

Q1: - Can I implement such a behavior using ZeroMQ ?
Q2: - Is there any working sample demonstrating that (preferably in C++) ?
Q3: - If not, any suggestions for a library in C++ ?


pubsub architecture

user3666197
  • Would you kindly state a **quantitative threshold for a node-to-node process latency in `[ns]`**, after which some solution "has too much latency for your liking" **+** a working **estimate of upper bounds for an arbitrary number of other clients in `[thousands]`** + a set of working estimates on **{ min | Avg | MAX }-sizes of `JSON`-payloads in `[kB]`** -- as noted above in your text? Thank you. – user3666197 May 17 '16 at 03:42
  • Thanks @HippolyteBarraud for your kind updates. What may be the resulting amount of clients inside the same collision domain, if taking the said max 4 per channel / topic multiplied by a theoretical ( unspecified yet ) maximum count of possible topics? Thanks for quantitative details the architecture is strongly dependent on. – user3666197 May 17 '16 at 12:19
  • @user3666197 : Sorry for my lack of precision. The question should be complete now. – Hippolyte Barraud May 17 '16 at 12:30

3 Answers

1

ZeroMQ: is capable of serving this task well within the scales given above
nanomsg: is capable of serving this task too, though the available ports/bindings for the clients need to be cross-checked

Design review:

  • client instances are not persistent, may freely appear on their own, may freely disappear on their own or on error
  • client instance decides on its own what it is about to PUB-lish as a message payload
  • client instance decides on its own what it is about to SUB-scribe to, as a TOPIC-filter on the actual incoming stream of messages
  • client instance exchanges ( sends ), on its own, plain, non-multipart, JSON-formatted messages it has prepared / produced
  • client instance collects ( receives ) messages which it assumes to be in the same, non-multipart, JSON-formatted shape and which it attempts to process locally after a receive is complete
  • maximum # of client-instances is not exceeding a low number of hundreds
  • maximum size of any JSON-formatted payload is less than 32 kB, about a 4 kB on average
  • maximum latency acceptable on an E2E process-to-process delivery across a common LAN-collision domain is under 5,000 [usec]
  • server instance is a central-role and a persistent entity
  • server instance provides a known transport-class URL-target for all late-joiners' .connect()-s
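
As a rough sanity check of the size and latency figures in the list above, the sketch below estimates how long a payload occupies the wire. The link speed is an assumption (the question only says "same local network"), the function names are made up for illustration, and protocol headers, switch delays and software overhead are ignored.

```cpp
#include <cstddef>

// Time needed to clock `bytes` onto a link running at `bits_per_second`,
// in microseconds. Headers, switching and software overhead are ignored.
double wire_time_us(std::size_t bytes, double bits_per_second) {
    return static_cast<double>(bytes) * 8.0 * 1e6 / bits_per_second;
}

// A relayed message crosses the wire twice: client -> server -> client.
double relay_wire_time_us(std::size_t bytes, double bits_per_second) {
    return 2.0 * wire_time_us(bytes, bits_per_second);
}
```

Assuming a 1 Gbit/s LAN, `relay_wire_time_us(32000, 1e9)` gives about 512 µs for the largest payload and `relay_wire_time_us(4000, 1e9)` about 64 µs for an average one, so most of the 5,000 µs budget would remain for the JSON handling and the relay loop itself.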

Proposal:

The server may combine both the PUB and SUB behaviours to meet the given goals: a fast, non-blocking, code-driven event-loop .poll()-s the SUB-side and re-transmits each payload .recv()-ed there to its PUB-side, that is, to the currently .connect()-ed audience ( live client instances ):

set s_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and s_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

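for performance reasons, that are not so tough here, one may also segregate workload-streams' processing by mapping each one onto disjunct sub-sets of the multiple created I/O-threads ( ZMQ_AFFINITY is a bitmap: the lowest bit selects I/O-thread #1, the next bit I/O-thread #2, and 0 means no affinity; the context must be created with at least two I/O-threads ):

map s_SUB_recv.setsockopt( ZMQ_AFFINITY, 1 ); // bitmap 0b01: I/O-thread #1
and s_PUB_send.setsockopt( ZMQ_AFFINITY, 2 ); // bitmap 0b10: I/O-thread #2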

set s_PUB_send.bind( "tcp://*:8899" ); // all interfaces; [SUB]s .connect()
+
set s_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // forever *every*-TOPIC
set s_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set s_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // retain just the last msg, across *all* TOPICs
set s_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
set s_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );

and s_SUB_recv.bind( "tcp://*:8888" ); // all interfaces; [PUB]s .connect()


Similarly,
client instance may deploy a reverse-facing tandem of both a PUB-endpoint and SUB-endpoint, ready to .connect() to a known transport-target-URL.

The client-specific subscription decides locally what gets filtered from the incoming stream of messages. Prior to the ZeroMQ v.3.1 API, all messages were delivered to each client instance over the transport class and filtered there. Since API v.3.1+, the topic-filter operates on the PUB-side, which in the desired modus operandi eliminates wasted volumes of data over the network, but at the same time increases the PUB-side processing overhead ( ref.: the remarks above on mapping work onto multiple I/O-threads for a performance boost ).
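
To make the topic-filter behaviour concrete, here is a minimal sketch in plain C++ (no libzmq calls) that models how a subscription is matched: a message passes when its leading bytes equal the filter, and an empty filter passes everything. The envelope layout, a channel name followed by a space and then the JSON text, is an assumption for illustration, as are all function names.

```cpp
#include <string>

// Hypothetical envelope: "<channel> <json>", with no spaces in the channel.
std::string make_envelope(const std::string& channel, const std::string& json) {
    return channel + ' ' + json;
}

// Models prefix matching: true when `message` starts with `filter`.
// An empty filter matches every message.
bool passes_filter(const std::string& message, const std::string& filter) {
    return message.compare(0, filter.size(), filter) == 0;
}

// Filter for exactly one channel: the trailing delimiter keeps "chan1"
// from also matching "chan10".
std::string channel_filter(const std::string& channel) {
    return channel + ' ';
}
```

With this layout, subscribing to the bare prefix `"chan1"` would also let through messages published on `"chan10"`, while `channel_filter("chan1")` would not. With dozens to hundreds of channel names, including the delimiter in the filter is one way to avoid such accidental matches.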

set c_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and c_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

unless the payload-assembly/processing overhead grows close to the permitted End-to-End latency threshold, there shall be no need to separate / segregate the ZeroMQ low-level I/O-threads here ( if done anyway, ZMQ_AFFINITY is a bitmap, so two disjunct I/O-threads are 1 and 2, while 0 means no affinity ):
map c_SUB_recv.setsockopt( ZMQ_AFFINITY, 1 );
and c_PUB_send.setsockopt( ZMQ_AFFINITY, 2 );

set c_PUB_send.connect( "tcp://server:8888" ); // reverse .bind on [SUB]
+
set c_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // modified on-the-fly
set c_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set c_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // take just last
set c_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
set c_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
and c_SUB_recv.connect( "tcp://server:8899" );
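
The "keep just the last message" setting used above can be pictured with the following plain C++ model (no libzmq calls, all class names hypothetical). `LatestOnly` mimics a queue that holds a single message regardless of topic, which is how I understand ZMQ_CONFLATE to behave. `LatestPerChannel` is an application-level alternative, assumed here for the case where a client follows several channels and wants the newest payload of each.

```cpp
#include <map>
#include <string>
#include <utility>

// Single-slot mailbox: a newer message overwrites an unread older one.
class LatestOnly {
public:
    void push(std::string msg) { slot_ = std::move(msg); full_ = true; }
    // Moves the stored message into `out`; returns false when empty.
    bool take(std::string& out) {
        if (!full_) return false;
        out = std::move(slot_);
        full_ = false;
        return true;
    }
private:
    std::string slot_;
    bool full_ = false;
};

// Keeps the newest unread message separately for each channel.
class LatestPerChannel {
public:
    void push(const std::string& channel, std::string msg) {
        latest_[channel] = std::move(msg);
    }
    bool take(const std::string& channel, std::string& out) {
        auto it = latest_.find(channel);
        if (it == latest_.end()) return false;
        out = std::move(it->second);
        latest_.erase(it);
        return true;
    }
private:
    std::map<std::string, std::string> latest_;
};
```

If messages for channels "a" and "b" arrive between two reads, the single-slot model keeps only whichever came last, while the per-channel model keeps one of each. That difference matters mostly for a socket subscribed to everything, such as the relay's SUB-side.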


Discussion:

For hobby projects, not much more is needed from the messaging infrastructure. For more serious domains, however, both the server and client instances ought to have some further formal communication-pattern behaviours added:
- r/KBD for a remote keyboard, with CLI-alike ad-hoc inspection utilities
- KEEP_ALIVE transponders for allowing system-wide state- / perf-monitoring
- SIG_EXIT handlers for allowing system-wide / instance-specific SIG_EXITs
- distributed syslog service to safely collect / store a non-blocking replica of log-records ( be it during a debug phase, a performance-tuning phase or production-grade records-of-evidence collecting )
- Identity Management tools for audit-trails et al
- WhiteList/BlackList for adding robustness to the infrastructure, to make it more immune to DoS-attacks / poisoning erroneous NIC traffic-bursts et al
- Adaptive Node re-Discovery for smarter / ad-hoc infrastructure design and status monitoring, or when multi-role / ( N + M )-shaded active hot-standby role-handover/takeover scenarios et al come onto the stage

Summary

A1: Yes, fully within ZeroMQ capabilities
A2: Yes, C++ code-samples in the ZeroMQ book / Guides available
A3: Ref.: A1, plus you may like the in-depth remarks in Martin SUSTRIK's post on "Differences between nanomsg and ZeroMQ"

Hope you will enjoy the powers of distributed processing, be it supported by ZeroMQ or nanomsg or both.

Only one's own imagination is the limit.

If interested in further details, one might love the book referred to in the "The Best Next Step" section of this post

Community
user3666197
  • Thanks for your in-depth answer ! You answered my question far beyond my expectations. After testing both libraries, I am leaning toward nanomsg for its simplicity and memory-friendliness. I will have no trouble adapting your answer for my need. – Hippolyte Barraud May 17 '16 at 21:19
0

Q1: - Can I implement such a behavior using ZeroMQ ?

Definitely yes; but probably not using the PUB/SUB sockets.

The way to do it using PUB/SUB is this: for every node in the system, you make one PUB socket and one SUB socket, and connect the single SUB socket to the PUB sockets of all other nodes and set your subscription filter accordingly. This is limited in its usefulness because (I think) you need to set the same filter for all your connections. Note that you definitely should NOT create more than one context in each node.

If your total number of nodes is low (e.g. 10-20 or fewer), you can make one PUB socket and N-1 SUB sockets per node (still all in one context), and connect each SUB socket to one of the PUB sockets of the other nodes.

If you have clear notions of client and server nodes, you can use the newer CLIENT/SERVER sockets (available in 4.2 or 4.1, I believe.) This will be more elegant and probably easier to manage, but you'll have to implement the content filtering ("channels") yourself; which can be pretty easy or a little involved, depending on what exactly you want to do.
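
A minimal sketch of what "implementing the channels yourself" could look like on the server side, in plain C++ without any socket calls. The class, the numeric client identifiers and the choice not to echo a message back to its sender are all assumptions made for illustration.

```cpp
#include <cstddef>
#include <map>
#include <set>
#include <string>
#include <vector>

using ClientId = unsigned;  // hypothetical handle for a connected client

// Tracks which clients follow which channel and who should get a message.
class ChannelTable {
public:
    void subscribe(ClientId c, const std::string& channel) {
        members_[channel].insert(c);
    }
    // Forget a client everywhere, e.g. after it disappears or errors out.
    void drop_client(ClientId c) {
        for (auto it = members_.begin(); it != members_.end();) {
            it->second.erase(c);
            if (it->second.empty()) it = members_.erase(it);
            else ++it;
        }
    }
    // Everyone on `channel` except the sender, in ascending id order.
    std::vector<ClientId> recipients(const std::string& channel,
                                     ClientId sender) const {
        std::vector<ClientId> out;
        auto it = members_.find(channel);
        if (it == members_.end()) return out;
        for (ClientId c : it->second)
            if (c != sender) out.push_back(c);
        return out;
    }
    std::size_t channel_count() const { return members_.size(); }
private:
    std::map<std::string, std::set<ClientId>> members_;
};
```

The server would call `recipients(channel, sender)` for every incoming message and send the payload to each returned client. With at most 4 subscribers per channel and a few hundred channels, an ordered map of small sets like this should be more than sufficient.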

Q2: - Is there any working sample demonstrating that (preferably in C++) ?

Not that I know of.

Q3: - If not, any suggestions for a library in C++ ?

I'd still suggest ZeroMQ, for its relatively light weight, simple and elegant interface, comprehensive functionality, and ability to work in and with many languages. There are a lot of socket combos to choose from. If worse comes to worst, you can always use PAIR sockets everywhere.

user3666197
yzt
  • With all due respect, what is a rationale for an assumption above not to use more than one **`Context()`** instance in a `ZeroMQ` application? Do you have any citation / source for this opinion? – user3666197 May 17 '16 at 03:31
  • Yes, I know that this solution always exists as a fallback. But the idea was to implement it only as a last resort, preferably using something simpler if possible. I would prefer to manage lots of sockets only on the server side, leaving the clients with two sockets at most. – Hippolyte Barraud May 17 '16 at 11:01
0

nanomsg with BUS protocol, see http://nanomsg.org/documentation-zeromq.html

user3666197
Severin Pappadeux
  • Yes, Martin Sustrik has spun off another marvelous messaging library in **`nanomsg`**; nevertheless, `NN_BUS` does not per se provide, AFAIK, the service components the OP asked for above ( topic-filtering, node (re)-discovery updates et al ), so the design / architecture still heavily depends on the latency thresholds + capacity estimates asked about above, which are yet to be clarified. – user3666197 May 17 '16 at 04:32
  • I took a look at **`nanomsg`**'s BUS protocol. While at first it seems not to provide exactly what I need, the author talks about a _"non-trivial, broker based, topology"_ where there is a man in the middle transferring all messages to everybody else ([as explained here](http://250bpm.com/blog:17)). @user3666197 : Do you think it will do, considering the estimates I provided above ? – Hippolyte Barraud May 17 '16 at 10:36
  • @user3666197 that's true, quite some code would have to be written on top of **nanomsg**. But having the BUS protocol should simplify things a lot in the OP's scenario, I think. It would be a pain-in-the-.. to do that on top of **0mq** – Severin Pappadeux May 17 '16 at 16:55
  • @HippolyteBarraud yes, **nanomsg** with BUS does not exactly fit the bill, you'll need more code. But out of the three messaging libs I have dealt with (**MPI**, **0mq**, **nanomsg**), the last one seems to have at least a decent foundation for what you need. Maybe other messaging libs are better suited for the task at hand (**RabbitMQ**? I have no idea about it) – Severin Pappadeux May 17 '16 at 17:00
  • With full respect to your insights, my experience made me believe no **single-archetype design meets a real-world, production-grade ecosystem's requirements** ( even, or might be **the more** if you are being sold this fiction from naive PMO Project Managers & ad-hoc CXO Project Sponsors ). Countless examples apply. Btw, I appreciate a lot Martin's fresh & lovely blog posts on 250bpm. – user3666197 May 17 '16 at 17:04
  • No, @SeverinPappadeus, any Broker-based ( persistence motivated mastodont-in-porcelain ) solution is not the way forward. Funny to read an MPI in a context of a smart & lightweight messaging. MPI paved the way in scientific cluster's environments with rather naive-(mass)-distributed code-designs, not a resource of a 1st choice in an unstructured ad-hoc smart messaging. – user3666197 May 17 '16 at 17:09
  • @user3666197 `Funny to read an MPI in a context of a smart & lightweight messaging.` ;))). No, I wouldn't advise using **MPI** for the OP's task – Severin Pappadeux May 17 '16 at 17:32