I started using ZeroMQ with Python, following the Publisher/Subscriber reference. However, I can't find any documentation on how to handle the messages in the queue. I want to treat the last received message differently from the rest of the elements in the queue.

Example

publisher.py

import zmq
import random
import time

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    messagedata = random.randrange(1, 215)
    message = "%s %d" % (topic, messagedata)
    print(message)
    socket.send_string(message)  # encodes the str to bytes before sending
    time.sleep(.2)

subscriber.py

import zmq

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Connecting...")
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt_string(zmq.SUBSCRIBE, topic)

while True:
    if isLastMessage(): # probably based on socket.recv()
         analysis_function() # time consuming function
    else:
         simple_function()  # something simple like print and save in memory

I just want to know how to create the isLastMessage() function described in the subscriber.py file, whether through something built into ZeroMQ or through a workaround.

silgon

3 Answers

Welcome to the world of non-blocking messaging / signalling

This is a cardinal feature of any serious distributed-system design.

If you define the "last" message as the one with no other message behind it in the pipe, then a Poller() instance may help your main event loop. It lets you control how long to wait before considering the pipe "empty", so that you do not exhaust your IO resources with zero-wait spinning loops.
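A minimal sketch of that idea, under a few assumptions: the names read_burst, run_subscriber and quiet_ms are made up for illustration, and "last" simply means "nothing else arrived within quiet_ms milliseconds". With the publisher from the question sending every 0.2 s, a burst holds more than one message only when messages have queued up, for example while the time-consuming analysis was running.

```python
def read_burst(socket, poller, quiet_ms=50):
    """Return all messages received until the pipe stays quiet for quiet_ms."""
    # Block until the first message arrives, so the loop never spins idle.
    burst = [socket.recv_string()]
    # poll() returns (socket, event) pairs; an empty result means a timeout.
    while dict(poller.poll(quiet_ms)).get(socket):
        burst.append(socket.recv_string())
    return burst


def run_subscriber(simple_function, analysis_function, port="5556", topic="1"):
    """Hypothetical wiring for subscriber.py; the two callbacks are the asker's."""
    import zmq  # imported here so read_burst can be tried without pyzmq

    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:%s" % port)
    socket.setsockopt_string(zmq.SUBSCRIBE, topic)

    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)

    while True:
        *earlier, last = read_burst(socket, poller)
        for message in earlier:
            simple_function(message)   # not the last one: cheap handling
        analysis_function(last)        # nothing newer was waiting
```

The choice of quiet_ms is a trade-off: a larger value groups more messages into one burst but delays the handling of the last one.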

Explicit signalling is always better ( if you can design the remote end behaviour )

The receiver side has zero knowledge of the context in which the "last" message was received, which is why explicit signalling is best broadcast from the sender side. There is, however, a reversed feature: an option that instructs ZeroMQ archetypes to internally throw away all messages that are not the "last" one, reducing receiver-side processing to just the "last" message available.

aQuoteStreamMESSAGE.setsockopt( zmq.CONFLATE, 1 )

If you would like to read more on ZeroMQ patterns and anti-patterns, do not miss Pieter HINTJENS' fabulous book "Code Connected, Volume 1" ( also available as a pdf ). You may also like a broader view on using a principally non-blocking ZeroMQ approach.

user3666197
  • I'm really starting with all this interprocess communication stuff. I've been trying to play around even with shared memory and semaphores lately. Anyway, I'll check out the `CONFLATE` flag, as well as some flags that I still don't know. – silgon Aug 11 '17 at 10:00
  • Definitely worth reading the PDF book before diving into any code. Zero-sharing, Zero-blocking, (almost)-Zero-latency are maxims you will value a lot in later weeks, once you understand why. For {default- *(not always safe )* | hi-perf- | secure- }-configuration details, it is best to read the native API ( best in ver 3.x, not the most recent one ). There you start to realise many additional powers not mentioned in the book or elsewhere. **But definitely go for the book**. Worth your time, sweat, blood and tears, if you are into serious distributed computing. G/L! – user3666197 Aug 11 '17 at 11:52
  • I just checked: `CONFLATE` keeps only the last message. However, in my case I want to save the information from the past messages, to be processed together with the last message. Otherwise I would lose information. Upvoted in any case for your interesting answer and for the reference to your other answer. I'll check out the book =) – silgon Aug 11 '17 at 17:07
  • Right on-spot, the `CONFLATE` option was named and labeled explicitly as *"a reversed feature to this"*, as it makes sense for some other use-case scenarios, as being an almost anti-pattern to your one. Reasons are obvious and such settings help save a lot in stability-bothered tight real-time control-loops, where "over-sampled" inputs do not create any better results, but can devastate computing-resources work-flows stability within a given R/T-control theory envelopes. – user3666197 Aug 11 '17 at 17:13

If isLastMessage() is meant to identify the last message within the stream of messages produced by publisher.py, then this is impossible, since there is no last message. publisher.py produces an infinite number of messages!

However, if publisher.py knows its last "real" message, i.e. there is no while True:, it could send an "I am done" message afterwards. Identifying that in subscriber.py is trivial.
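As a rough sketch of the subscriber side, assuming the publisher sends one extra message whose payload is an end marker: END_MARKER and consume_until_done are hypothetical names, and the marker value must be one the real data never uses. Because the marker only arrives after the last real message, the loop holds each message back until it sees what follows.

```python
END_MARKER = "DONE"  # hypothetical payload; pick a value the real data never uses


def consume_until_done(recv, simple_function, analysis_function):
    """Handle messages from recv() until the end marker arrives.

    recv is any callable returning "<topic> <payload>" strings,
    for example the recv_string method of a SUB socket.
    """
    previous = None
    while True:
        _topic, _, payload = recv().partition(" ")
        if payload == END_MARKER:
            break
        if previous is not None:
            simple_function(previous)  # a newer message exists, so not the last
        previous = payload
    if previous is not None:
        analysis_function(previous)    # the message right before the marker
```

On the publisher side this would amount to sending "%s %s" % (topic, END_MARKER) once after the final data message. Note that PUB/SUB gives no delivery guarantee, so a subscriber that connects late or falls behind could miss the marker.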

Ralf Stubner

Sorry, I will keep the question for reference. I just found the answer: the documentation describes a NOBLOCK flag that you can pass to the receiver, so that the recv command doesn't block. A simple workaround, extracted from part of another answer, is the following:

while True:
    try:
        # check for a message; this will not block
        message = socket.recv(flags=zmq.NOBLOCK)

        # a message has been received
        print("Message received:", message)

    except zmq.Again:
        print("No message received yet")

As for the real implementation, you can only tell that a message is the last one once a recv call with the NOBLOCK flag has entered the exception block. This translates to something like the following:

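# subscribe() is not shown here; it is assumed to wrap in_socket.recv()
msg = subscribe(in_socket)  # blocking: wait for the first message
is_last = False
while True:
    if is_last:
        # the queue was empty last time, so block until a new message arrives
        msg = subscribe(in_socket)
        is_last = False
    else:
        try:
            old_msg = msg
            msg = subscribe(in_socket, flags=zmq.NOBLOCK)
            # a new message was received, so the old one was not the last
            process_not_last(old_msg)
        except zmq.Again:
            process_last(msg)
            is_last = True  # it is probably the last message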
silgon