I'm using ZeroMQ in Python and C++ in many configurations and I wonder what the most elegant way is to abort a recv() or poll() from another thread (e.g. for controlled program termination, but also if you want to stop listening without having to kill the socket).

In contrast to this question, I don't just want to avoid an infinite wait; I want to return immediately from recv() or poll().

I know I can just provide a timeout and abort recv() like this:

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while _running:
    if not poller.poll(timeout=100):
        # maybe handle an unwanted timeout here
        continue

    handle_message(socket.recv())

This will poll the socket endlessly until _running is set to False from another thread - after a maximum of 100 ms I'm done.

But this is not nice: I effectively have a busy loop, and this way it's hard to handle real timeouts, which might be the result of unwanted behavior. I also have to wait for the timeout to expire, which is not critical in most cases, but you know what I mean.

Of course I can poll an extra socket for abortion:

abort_socket = context.socket(zmq.SUB)
abort_socket.setsockopt(zmq.SUBSCRIBE, b"")
abort_socket.connect(<abort-publisher-endpoint>)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(abort_socket, zmq.POLLIN)

while _running:
    # poll() returns a list of (socket, event) pairs; a dict allows membership tests
    poll_result = dict(poller.poll(timeout=1000))
    if socket in poll_result:
        handle_message(socket.recv())
    elif abort_socket in poll_result:
        break
    else:
        # handle real timeout here
        pass

But this approach also has disadvantages:

  • it's a bit verbose: wherever I trigger the abort, I have to create a publisher and use it to abort the receiver
  • the abort_socket can only be used from one thread, so I would have to ensure that

So my question is: how is this done the nice way?

Can I somehow just use something like Python's threading.Event (or something similar in other languages) instead of the abort socket, and pass it to the poller like this?

def listener_thread_fn(event):
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    # wished-for: register the event as if it were a socket
    poller.register(event, zmq.POLLIN)

    while _running:
        poll_result = dict(poller.poll(timeout=1000))
        if socket in poll_result:
            handle_message(socket.recv())
        elif event in poll_result:
            break
        else:
            # handle real timeout here
            pass

Then you would just have to create a threading.Event() up front, pass it to listener_thread_fn, and call event.set() from any thread to abort.

frans
  • possible duplicate of [zeromq: how to prevent infinite wait?](http://stackoverflow.com/questions/7538988/zeromq-how-to-prevent-infinite-wait) - check out the last answer in particular which mentions ZMQ_LINGER, that should get you where you're going. – Jason Jun 26 '15 at 17:59
  • Doesn't look like a duplicate to me. – o11c Jun 26 '15 at 19:54
  • That's really not what I was asking. I want to abort a blocking `recv()` or `poll()`, which doesn't mean there are any lingering messages around. I don't want to `disconnect()` the socket from another thread because it's discouraged to access a zeromq socket from a thread other than the one that created it. Both using another socket to send an abort signal to `poll()` and the timeout approach would accomplish this, but neither seems very elegant to me. – frans Jun 26 '15 at 20:10
  • @Jason: the accepted answer in the question you linked demonstrates the timeout method, which is not a nice way to abort because you have to wait until the timeout expires. – frans Jun 26 '15 at 20:15
  • I don't know it for Python, but the doc states that [zmq_poll](http://api.zeromq.org/4-0:zmq-poll) returns with *EINTR* on the delivery of a signal. – Florian Jun 29 '15 at 10:39
  • That's true for Python, too, as you can see in Peque's answer. I don't like signals for controlling program flow, since you never know who else has installed a signal handler (which results in unpredictable behavior). – frans Jul 20 '15 at 14:06

1 Answer

With Python and pyzmq, an error is raised when a signal interrupts recv() or poll(), so you can simply catch the exception when it occurs. An example with recv():

while True:
    try:
        request = server.recv()
    except zmq.ZMQError as e:
        if e.errno == errno.EINTR:
            print('Clean exit now!')
            break
        raise

You can easily modify that code to use poll() instead (it's the same procedure). Also, note that you'll need to:

import errno
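
For the poll() variant, a minimal sketch could look like the one below. The function name `poll_until_interrupted` and the `handle_message` callback are placeholders of mine, not part of pyzmq. The sketch also catches KeyboardInterrupt, on the assumption that recent pyzmq versions retry EINTR internally, so that an interrupting signal such as Ctrl+C surfaces as the exception raised by Python's signal handler rather than as a ZMQError.

```python
import errno

import zmq


def poll_until_interrupted(poller, socket, handle_message):
    """Dispatch messages from `socket` until a signal interrupts poll().

    `poller` is expected to be a zmq.Poller with `socket` registered for
    zmq.POLLIN; `handle_message` is a hypothetical callback.
    """
    while True:
        try:
            # No timeout: block until there is activity or a signal arrives.
            events = dict(poller.poll())
        except KeyboardInterrupt:
            # Assumption: newer pyzmq retries EINTR itself, so an interrupt
            # shows up as the exception raised by the Python signal handler.
            break
        except zmq.ZMQError as e:
            if e.errno == errno.EINTR:
                # Older behaviour: the interrupted call is reported as EINTR.
                break
            raise
        if socket in events:
            handle_message(socket.recv())
```

Any other ZMQError is re-raised unchanged, as in the recv() example above.
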
Peque
  • The problem is that you can't send a signal to a thread - only to the whole process. In case you don't want to interrupt the whole process but only one poll it's difficult to make sure that nothing else happens when you send a signal. (also - when it comes to platform independence I would have a bad feeling). – frans Jul 20 '15 at 14:03
  • @frans: Sorry, I misunderstood your problem. I think if you removed the few references you have in this question about threads (the title and most paragraphs and first two code chunks could be unchanged), this question would still be very useful and will already have an answer. Then create a new question where you state your problem more precisely: i.e. note that you are using threads in the title, show a code chunk that uses threads and explain what you want to achieve (i.e.: from this process that has 3 threads, I would like to interrupt just one and make it break from the ZMQ polling). – Peque Jul 20 '15 at 17:23
  • @frans: if you don't like this, then I can delete my answer from here, but I would ask you to edit this question so that the fact that you need threads is more clear and to show what is each thread doing and how are you expecting to stop any of them. Will the threads be removed after the polling is aborted? etc. I think it's better to create a new one, as this one is a good question that might affect other users that don't use threads in their code. :-) – Peque Jul 20 '15 at 17:25
  • I think you can leave your answer, since it at least shows one way to abort a `poll()` or `recv()` without the timeout, as requested in the title (this is why I +1ed your answer). But I'd like to be able to keep the process running, so I will just be more precise. – frans Jul 23 '15 at 07:03
  • @frans: I think my answer no longer makes sense in here now that the question has changed. I will probably delete it soon. If you want to create a new question to ask how to "Abort zeromq recv() or poll() instantly" (without threads), I'll be happy to answer in there if you share the link. ;-) – Peque Jul 23 '15 at 07:41
  • Since you can send signals from another thread to the current process, your answer still describes one way to do it. I think you should leave the answer, at least for completeness. – frans Jul 23 '15 at 08:01
  • @frans: ok. If you don't want to use signals, I would recommend you use ZeroMQ patterns (i.e. add one socket to the poll). Notice, though, that this way you'll "interrupt" the `poll` or `recv`, but you won't interrupt it if it's processing the message received (you'll need to wait until you get back to `poll`/`recv` again). If you are using threads within a process, take advantage of ZeroMQ's `inproc` transport (if you are not already doing so). It is amazingly fast. :-) – Peque Jul 23 '15 at 08:08
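
The extra-socket approach recommended in the last comment can be wrapped so that it behaves roughly like a `threading.Event`. This is only a sketch under assumptions: `AbortSignal`, `listen`, and the endpoint names are hypothetical, and both receiving sockets are created before the listener thread starts and are touched only by that thread afterwards.

```python
import threading

import zmq


class AbortSignal:
    """Event-like helper (hypothetical): set() may be called from any thread."""

    def __init__(self, context, endpoint="inproc://abort"):
        self._context = context
        self._endpoint = endpoint
        # Receiving end; bound up front so that set() can always connect.
        # After hand-over it must be used by the listener thread only.
        self.receiver = context.socket(zmq.PULL)
        self.receiver.bind(endpoint)

    def set(self):
        # A short-lived socket per call keeps every socket confined to the
        # thread that calls set(), so no socket is shared between threads.
        sender = self._context.socket(zmq.PUSH)
        sender.connect(self._endpoint)
        sender.send(b"")
        sender.close()


def listen(socket, abort, handle_message):
    """Handle messages from `socket` until abort.set() is called."""
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    poller.register(abort.receiver, zmq.POLLIN)
    try:
        while True:
            # No timeout: returns only on a message or on an abort.
            events = dict(poller.poll())
            if abort.receiver in events:
                abort.receiver.recv()  # drain the wake-up message
                break
            if socket in events:
                handle_message(socket.recv())
    finally:
        socket.close()
        abort.receiver.close()
```

Calling `abort.set()` from any thread wakes the `poll()` at once, with no timeout involved. As noted in the comment above, a message that is already being processed is not interrupted; the abort takes effect when the loop returns to `poll()`.
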