3

Graceful exit in ZeroMQ in multithreaded environment

Specs : ubuntu 16.04 with c++11,libzmq : 4.2.3

Sample code

// Set to 1 by s_signal_handler once SIGINT/SIGTERM has been delivered.
// volatile sig_atomic_t is the only object type the language guarantees can be
// written safely from an asynchronous signal handler; a plain int gives no
// such guarantee and the store may be cached or torn.
static volatile sig_atomic_t s_interrupted = 0;

// Signal handler: records that an interrupt was requested.
// signal_value is the number of the delivered signal (unused here).
static void s_signal_handler (int signal_value)
{
    (void) signal_value;
    s_interrupted = 1;
    //some code which will tell main thread to exit
}

// Registers s_signal_handler for SIGINT and SIGTERM, with no extra flags and
// an empty mask of signals blocked while the handler runs.
static void s_catch_signals (void)
{
    struct sigaction handler_cfg;
    sigemptyset (&handler_cfg.sa_mask);
    handler_cfg.sa_flags = 0;
    handler_cfg.sa_handler = s_signal_handler;

    const int handled_signals[] = { SIGINT, SIGTERM };
    for (const int signum : handled_signals)
    {
        sigaction (signum, &handler_cfg, nullptr);
    }
}

// Actor body run by zactor_new(): connects two DEALER sockets and drains
// whatever arrives on either of them.
//
// pipe is the actor's back-channel to the thread that created it; the second
// (unnamed) argument is the user pointer handed to zactor_new and is unused.
//
// NOTE(review): the loop below has no exit path -- that is the open problem
// this sample illustrates. CZMQ's zactor contract also expects the actor to
// poll `pipe` and return when it receives "$TERM"; confirm and add as needed.
static void Thread(zsock_t *pipe, void *)
{
    zmq::context_t context(1);
    zmq::socket_t requester1(context,ZMQ_DEALER);
    zmq::socket_t requester2(context,ZMQ_DEALER);
    requester1.connect(address1);
    requester2.connect(address2);

    // zactor_new() blocks in the creating thread until the actor signals that
    // it has finished initialising; without this the caller never returns.
    zsock_signal (pipe, 0);

    zmq_pollitem_t items []=
        {{requester1,0,ZMQ_POLLIN,0},
        {requester2,0,ZMQ_POLLIN,0}};

    while(true)
    {
        zmq::message_t message;
        // Timeout of -1: block until at least one socket is readable.
        zmq::poll (items, 2, -1);

        if (items [0].revents & ZMQ_POLLIN)
        {
            requester1.recv(&message);
        }
        if (items [1].revents & ZMQ_POLLIN)
        {
            requester2.recv(&message);
        }
    }
}

int main()
{
    .
    //some code
    .

    zactor_t *actor = zactor_new (Threaded, nullptr);
    s_catch_signals();
    .
    //continue
    .
    //wait till thread finishes to exit

    return 0;
}

Now, when the interrupt occurs, the signal handler is called from the main thread. From within the signal handler I somehow need to tell the worker thread (the poller) to exit. Any ideas how to achieve this?

Lightness Races in Orbit
  • 378,754
  • 76
  • 643
  • 1,055

2 Answers

1

According to the ZMQ documentation, there are two "idiomatic" ways of dealing with this:

After testing it, it seems that zmq::poll does not throw an exception on SIGINT. Therefore, the solution seems to be to use a socket dedicated to closing. The solution looks like this:

#include <signal.h>
#include <cerrno>
#include <iostream>
#include <thread>
#include <zmq.hpp>
zmq::context_t* ctx;
static void s_signal_handler (int signal_value)
{
    std::cout << "Signal received" << std::endl;
    zmq::socket_t stop_socket(*ctx, ZMQ_PAIR);
    stop_socket.connect("inproc://stop_address");
    zmq::message_t msg("0", 1);
    stop_socket.send(msg);
    std::cout << "end sighandler" << std::endl;
}

// Installs s_signal_handler as the disposition for both SIGINT and SIGTERM.
// No sa_flags are set and no additional signals are masked during handling.
static void s_catch_signals (void)
{
    struct sigaction sa;
    sa.sa_flags = 0;
    sa.sa_handler = s_signal_handler;
    sigemptyset (&sa.sa_mask);

    const int to_catch[] = { SIGINT, SIGTERM };
    for (const int sig : to_catch)
    {
        sigaction (sig, &sa, nullptr);
    }
}

void thread(void)
{
    std::cout << "Thread Begin" << std::endl;
    zmq::context_t context (1);
    ctx = &context;
    zmq::socket_t requester1(context,ZMQ_DEALER);
    zmq::socket_t requester2(context,ZMQ_DEALER);
    zmq::socket_t stop_socket(context, ZMQ_PAIR);
    requester1.connect("tcp://127.0.0.1:36483");
    requester2.connect("tcp://127.0.0.1:36483");
    stop_socket.bind("inproc://stop_address");

    zmq_pollitem_t items []=
    {
        {requester1,0,ZMQ_POLLIN,0},
        {requester2,0,ZMQ_POLLIN,0},
        {stop_socket,0,ZMQ_POLLIN,0}
    };

    while ( true )
    {
        //  Blocking read will throw on a signal

        int rc = 0;
        std::cout << "Polling" << std::endl;
        rc = zmq::poll (items, 3, -1);

        zmq::message_t message;
        if(rc > 0)
        {
            if (items [0].revents & ZMQ_POLLIN)
            {
                requester1.recv(&message);
            }
            if (items [1].revents & ZMQ_POLLIN)
            {
                requester2.recv(&message);
            }
            if(items [2].revents & ZMQ_POLLIN)
            {
                std::cout << "message stop received " << std::endl;
                break;
            }
        }
    }
    requester1.setsockopt(ZMQ_LINGER, 0);
    requester2.setsockopt(ZMQ_LINGER, 0);
    stop_socket.setsockopt(ZMQ_LINGER, 0);
    requester1.close();
    requester2.close();
    stop_socket.close();
    std::cout << "Thread end" << std::endl;
}

int main(void)
{
    std::cout << "Begin" << std::endl;
    s_catch_signals ();
    zmq::context_t context (1);
    zmq::socket_t router(context,ZMQ_ROUTER);
    router.bind("tcp://127.0.0.1:36483");

    std::thread t(&thread);
    t.join();
    std::cout << "end join" << std::endl;

}

Note that if you do not want to share the context with the signal handler, you could use "ipc://..." instead.

Clonk
  • 2,025
  • 1
  • 11
  • 26
  • "Note that if you do not want to share the context to the signal handler you could use "ipc://..."." I didn't understand this. – Nagaraju Sherigar Apr 04 '19 at 11:28
  • 1
    The string used as an address in ZMQ begins with the protocol, followed by the address. For example, `tcp://127.0.0.1:1234` means the TCP protocol to IP address 127.0.0.1 on port 1234. Alternatively, you can use `ipc://name` for the inter-process protocol or `inproc://anystring` for the inter-thread protocol. When using inproc, both sockets need to have the same context. When using ipc, you don't need to use the same context. So if you don't want your signal handler to have access to the `zmq::context_t`, you can replace `inproc://` with `ipc://`. – Clonk Apr 04 '19 at 11:39
  • Here are some links to the documentation for the different transport layers of ZMQ: [INPROC](http://api.zeromq.org/2-1:zmq-inproc), [IPC](http://api.zeromq.org/2-1:zmq-ipc), [TCP](http://api.zeromq.org/2-1:zmq-tcp), [PGM](http://api.zeromq.org/2-1:zmq-pgm) – Clonk Apr 04 '19 at 11:46
0

If you wish to preserve the feel of ZMQ's Actor model in handling signals, you could use the signalfd interface on Linux: signalfd manpage. That way you could use zmq poll to wait for the signal to be delivered, instead of having a signal handler.

It has the added advantage that when handling a signal delivered through a file descriptor, you can call any function you like, because you're handling it synchronously, not asynchronously.

bazza
  • 7,580
  • 15
  • 22