
For a custom server I intend to use the function int zmq::poll(zmq_pollitem_t* items, int nitems, long timeout = -1), which I think is a wrapper around the Unix poll function that accepts zmq::socket_t objects next to file descriptors. The function works as I expect until I press ctrl+c or run kill my_server_pid in the terminal. I would expect the poll to return -1 or to throw a zmq::error_t (which derives from std::exception) carrying the errno and the strerror message, indicating that there was an interrupt. My server should then handle the signal gracefully, save some data and shut down.

Below I have a fragment of code that demonstrates the problem. First I show a bit of my environment and how I compile it:

    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -v
    Using built-in specs.
    COLLECT_GCC=g++
    COLLECT_LTO_WRAPPER=/usr/lib/gcc/x86_64-linux-gnu/4.6/lto-wrapper
    Target: x86_64-linux-gnu
    Configured with: ../src/configure -v --with-pkgversion='Ubuntu/Linaro      4.6.3-1ubuntu5' --with-bugurl=file:///usr/share/doc/gcc-4.6/README.Bugs --enable-languages=c,c++,fortran,objc,obj-c++ --prefix=/usr --program-suffix=-4.6 --enable-shared --enable-linker-build-id --with-system-zlib --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --with-gxx-include-dir=/usr/include/c++/4.6 --libdir=/usr/lib --enable-nls --with-sysroot=/ --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --enable-gnu-unique-object --enable-plugin --enable-objc-gc --disable-werror --with-arch-32=i686 --with-tune=generic --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu
    Thread model: posix
    gcc version 4.6.3 (Ubuntu/Linaro 4.6.3-1ubuntu5) 
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ pkg-config --modversion libzmq
    2.2.0
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -pthread -std=c++0x -Wall -Wextra -pedantic -o poll polling.cpp $(pkg-config --cflags --libs libzmq )
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$

and now the code of polling.cpp :

#include <zmq.hpp>
#include <thread>
#include <cstdlib>
#include <cstring>   // memcpy
#include <string>
#include <iostream>
#include <signal.h>

const char* bind_addres = "tcp://*:2345";
const char* connect_addres = "tcp://localhost:2345";

inline void
send_str( zmq::socket_t& sock, const std::string& s) throw (zmq::error_t) {
    zmq::message_t msg ( s.size() );
    memcpy( msg.data(), s.c_str(), msg.size() );
    sock.send( msg );
}

inline void recv_str( zmq::socket_t& sock, std::string& s) throw( zmq::error_t) {
    zmq::message_t msg;
    sock.recv(&msg);
    s = std::string( static_cast<const char*>(msg.data()), msg.size());
}
static int interrupted = 0;

static void handle_signal ( int signal ) 
{
    interrupted = signal;
    std::cerr << "Interrupted by signal: " << signal << std::endl;
}

void catch_signals (void) 
{
    struct sigaction action;
    action.sa_handler = handle_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);

    sigaction(SIGINT,  &action, NULL);
    sigaction(SIGTERM, &action, NULL);
}

int get_interrupted(void)
{
    std::cout << "interrupted = " << interrupted << std::endl;
    return interrupted;
}

void req_thread ( zmq::context_t* c ){


    zmq::socket_t sock( *c, ZMQ_REQ);
    sock.connect(connect_addres);

    zmq::pollitem_t items[]{
        { sock, 0, ZMQ_POLLIN, 0}
    };

    while (true){
        try {
            // zmq 3.x.x takes ms instead of us so change to eg 1000 or be patient.
            int rc = zmq::poll(items, 1, 1000000);
            if (rc > 0){
                if ( items[0].revents & ZMQ_POLLIN){
                    std::string s;
                    recv_str(sock, s);
                    std::cout << s << std::endl;
                }
            }
            else if ( rc == 0){ //timeout
                send_str( sock, "Hello");
            }
            else{
                std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl;
            }
        }
        catch( zmq::error_t& e ){
            std::cout << __func__ << " " << __LINE__ << e.what() << std::endl;
        }
    }
}

void rep_thread ( zmq::context_t* c ){

    zmq::socket_t sock( *c, ZMQ_REP);
    sock.bind(bind_addres);

    zmq::pollitem_t items[]{
        { sock, 0, ZMQ_POLLIN, 0}
    };

    while (true){
        try{
            int rc = zmq::poll(items, 1 , -1);
            if (rc > 0){
                if ( items[0].revents & ZMQ_POLLIN){
                    std::string s;
                    recv_str(sock, s);
                    s+=" world!";
                    send_str(sock, s);
                }
            }
            else{
                std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl;
            }
        }
        catch( zmq::error_t& e ){
            std::cout << __func__ << " " << __LINE__ << e.what() << std::endl;
        }
    }
}

int main(){

    zmq::context_t context(1);
    catch_signals();

    std::thread t1 ( rep_thread, &context);
    std::thread t2 ( req_thread, &context);

    t1.join();
    t2.join();

    return 0;
}

Finally, here is some example output that demonstrates my issue: zmq::poll() does not seem to be affected by pressing ctrl+c in the terminal:

    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./poll 
    Hello world!
    Hello world!
    Hello world!
    ^CInterrupted by signal: 2
    Hello world!
    Hello world!
    ^Z
    [1]+  Stopped                 ./poll
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ kill -9 %1

    [1]+  Stopped                 ./poll
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ 

The terminal output shows that neither a zmq::error_t is thrown nor does zmq::poll() return -1.

How it should work is shown in the next example, simple_poll.cpp, where a zmq::error_t is thrown:

#include<zmq.hpp>
#include<signal.h>
#include<iostream>


static int interrupted = 0;

static void handle_signal ( int signal ) 
{
    interrupted = signal;
    /*just to show that the signal handler works*/
    std::cerr << "Interrupted by signal: " << signal << std::endl;
}

void catch_signals (void) 
{
    struct sigaction action;
    action.sa_handler = handle_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);

    sigaction(SIGINT,  &action, NULL);
    sigaction(SIGTERM, &action, NULL);
}

using namespace std;

int main(){

    zmq::context_t context(1);
    catch_signals();
    zmq::socket_t sock( context, ZMQ_REP );
    /*listen on port 2346 on all available interfaces.*/
    sock.bind("tcp://*:2346");

    zmq::pollitem_t items[] = {
        {sock, 0 , ZMQ_POLLIN, 0}
    };

    try {
        /*wait for a event*/
        zmq::poll( items, 1, -1);
        /*for zmq users read message and respond*/
    }
    catch (zmq::error_t& e){
        cout << "error occured: " <<e.what() << endl;
        cout << "We were interrupted by: " << interrupted << endl;
    }
    return 0;
}

This yields the following results on ctrl+c in the terminal, showing that the zmq::error_t is caught and the signal has been handled.

    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./simple_poll
    ^CInterrupted by signal: 2
    error occured: Interrupted system call
    We were interrupted by: 2
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$
  • Was a solution found to the problem? I experience the same troubles in my code (ZMQ+multithreading+interrupts). I think I understand what is wrong, but maybe someone already knows the proper way to code it. – Alexander Apr 28 '15 at 10:18
  • Yes, I solved it. The problem is the combination of asynchronous signal delivery and a race in calling poll() or another blocking function. For example, the code fragment {while(running) { recv(....); }} will not be interrupted if a signal is delivered after the check for "running" but before the recv() call. I can post my solution if needed. – Alexander Apr 28 '15 at 15:41
  • For a solution to your problem please check: https://stackoverflow.com/questions/55508391/zeromq-handling-interrupt-in-multithreaded-application The main issue arises when ZMQ is working in a separate thread. You will get the exception/signal interruption when the ZMQ socket is working in the main thread. – Gelldur Dec 16 '22 at 07:01
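
One common way to close the race described in the comments above is the self-pipe trick: the handler writes a byte to a pipe, and the polling thread watches the read end together with its other descriptors. The following is a minimal sketch using plain POSIX poll() so that it builds without ZeroMQ; it is not necessarily the solution the commenter used. The names wake_fds, wake_on_signal, install_wakeup and wait_for_shutdown are hypothetical. With zmq::poll the read end could presumably be added as an extra poll item through the fd field of zmq_pollitem_t, but that is an assumption that has not been tested against libzmq 2.2.

```cpp
#include <cerrno>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h>

// Self-pipe: the handler writes to wake_fds[1], pollers watch wake_fds[0].
// (Hypothetical names, not part of the original program.)
static int wake_fds[2] = {-1, -1};

extern "C" void wake_on_signal(int) {
    const int saved_errno = errno;
    const char byte = 1;
    // write() is async-signal-safe; the result is deliberately ignored.
    ssize_t ignored = write(wake_fds[1], &byte, 1);
    (void)ignored;
    errno = saved_errno;
}

// Creates the pipe and installs the handler for one signal.
bool install_wakeup(int signo) {
    if (pipe(wake_fds) != 0) return false;
    // Non-blocking write end, so the handler can never block on a full pipe.
    fcntl(wake_fds[1], F_SETFL, fcntl(wake_fds[1], F_GETFL) | O_NONBLOCK);
    struct sigaction action;
    action.sa_handler = wake_on_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);
    return sigaction(signo, &action, nullptr) == 0;
}

// Returns true when a shutdown was requested, false on timeout.
bool wait_for_shutdown(int timeout_ms) {
    struct pollfd item;
    item.fd = wake_fds[0];
    item.events = POLLIN;
    item.revents = 0;
    for (;;) {
        const int rc = poll(&item, 1, timeout_ms);
        if (rc < 0 && errno == EINTR) continue;  // retry: the pipe is readable now
        return rc > 0 && (item.revents & POLLIN) != 0;
    }
}
```

Because the byte stays in the pipe until someone reads it, the wake-up is not lost when the signal arrives before the thread enters poll(), and it does not matter which thread ran the handler. The sketch never drains the pipe, so every later call to wait_for_shutdown() also reports the request.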

1 Answer


You've got a signal handler - but you don't do anything in it. In your handler, interrupt the polling loop (rather than true, check for some condition that you set in your signal handler.)

Let's say for argument's sake you are using c++11, try something like..

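#include <atomic>

// Global which indicates that we are running..
// (direct initialization: copy-initializing a std::atomic does not compile in C++11)
std::atomic<bool> running(true);


// In your handler - reset this flag
static void handle_signal ( int signal ) {
  running = false;
}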

Now your loops become:

while (running)
{
:
}
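
A fuller sketch of this flag-based approach follows. It keeps the question's catch_signals() helper but replaces zmq::poll with plain POSIX poll() so that it builds without ZeroMQ; worker_loop and start_worker are hypothetical names. It assumes the loop polls with a finite timeout, since the flag is only looked at between two poll calls.

```cpp
#include <atomic>
#include <poll.h>
#include <signal.h>
#include <thread>

// Cleared by the signal handler, checked by every polling loop.
// A std::atomic<bool> is only safe inside a handler when it is lock-free.
std::atomic<bool> running(true);

extern "C" void handle_signal(int) {
    running = false;  // no I/O here: iostream calls are not async-signal-safe
}

// Installs the handler for SIGINT and SIGTERM, as in the question.
void catch_signals() {
    struct sigaction action;
    action.sa_handler = handle_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);
    sigaction(SIGINT, &action, nullptr);
    sigaction(SIGTERM, &action, nullptr);
}

// Stand-in for req_thread/rep_thread: poll() without descriptors simply
// waits for the timeout. Returns the number of timeouts that were seen
// before the flag was cleared.
int worker_loop(int timeout_ms) {
    int timeouts = 0;
    while (running) {
        const int rc = poll(nullptr, 0, timeout_ms);
        if (rc == 0) ++timeouts;  // timed out: the loop re-checks the flag
        // rc < 0 (for example EINTR) also falls through to the flag check
    }
    return timeouts;
}

// Runs the loop on its own thread, as the question's program does.
std::thread start_worker(int timeout_ms, int& timeouts) {
    return std::thread([timeout_ms, &timeouts] {
        timeouts = worker_loop(timeout_ms);
    });
}
```

The worker leaves its loop at most one timeout after the signal arrives, whichever thread ran the handler. The cost is shutdown latency of up to timeout_ms, and a loop that polls with an infinite timeout (as rep_thread does with -1) would never notice the flag.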
  • Would you mind elaborating on how to interrupt the polling loop, since this is the first time ever I'm catching signals. With the code above I can check, on the timeout that occurs in the function req_thread, whether we were interrupted by calling get_interrupted(), which should return SIGINT or SIGTERM. But how can I "signal" to the zmq_poll function that the interrupt has occurred? – hetepeperfan Feb 18 '13 at 11:55
  • I don't know if they provide an API function to interrupt the poll. In the worst case though, because it has a timeout of 1s, it will return when the poll times out, so before you do any other operations check the flag again and, if it is not set, break from the loop. – Nim Feb 18 '13 at 12:41
  • Well, I'm quite sure the zmq::poll() function should throw a zmq::error_t when the poll fails because we were signalled. The C++ API of libzmq should call zmq_poll() from the C API, check whether the result is >= 0, and throw a zmq::error_t if the result is smaller than 0. zmq::error_t's constructor reads errno and makes zmq::error_t::what(void) return the strerror of errno. However, I'm printing some output to std::cout and it does not appear in the terminal. This indicates that neither a zmq::error_t is thrown nor -1 returned, as Unix poll() would do. Thanks. Multi thread issue??? – hetepeperfan Feb 18 '13 at 13:10
  • Sure the api may return(or throw), however this all depends on whether there is a signal handler installed by the api. Yours overrides that, as a result the api has no chance of knowing that it has been interrupted. If you install your own handler (like you are doing) then you need to explicitly stop the poll as I described above.. – Nim Feb 18 '13 at 13:21
  • I'll post an example of how it should work below; it will be single threaded. – hetepeperfan Feb 18 '13 at 14:01
  • Edited the original question: I added a simple example where the zmq::error_t is thrown and handled appropriately, as shown in the output. – hetepeperfan Feb 18 '13 at 14:15
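
The "multi thread issue" suspected in the comments can be reproduced without ZeroMQ. A process-directed signal (ctrl+c, kill pid) runs its handler in only one thread, and only a blocking call in that thread fails with EINTR. The sketch below forces this case by blocking SIGUSR1 in the worker, so the handler has to run in the calling thread while the worker sits in poll(). In the question's program no thread blocks the signal, so which thread is picked is up to the kernel; that libzmq 2.2 behaves the same way internally is an assumption. The names note_signal, PollResult and poll_in_worker_while_signalled are hypothetical.

```cpp
#include <atomic>
#include <cerrno>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <thread>
#include <unistd.h>

static volatile sig_atomic_t seen_signal = 0;

extern "C" void note_signal(int signo) { seen_signal = signo; }

struct PollResult {
    int rc;   // return value of poll() in the worker
    int err;  // errno if poll() failed, 0 otherwise
};

// Sends SIGUSR1 to the process while a worker thread is inside poll().
// The worker blocks SIGUSR1, so another thread must run the handler.
PollResult poll_in_worker_while_signalled() {
    struct sigaction action;
    action.sa_handler = note_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);
    sigaction(SIGUSR1, &action, nullptr);

    PollResult result = {0, 0};
    std::atomic<bool> ready(false);
    std::thread worker([&result, &ready] {
        sigset_t mask;
        sigemptyset(&mask);
        sigaddset(&mask, SIGUSR1);
        pthread_sigmask(SIG_BLOCK, &mask, nullptr);
        ready = true;
        result.rc = poll(nullptr, 0, 300);  // waits 300 ms unless interrupted
        result.err = (result.rc < 0) ? errno : 0;
    });

    while (!ready) std::this_thread::yield();
    poll(nullptr, 0, 50);     // give the worker time to enter poll()
    kill(getpid(), SIGUSR1);  // process-directed, like ctrl+c or kill <pid>
    worker.join();
    return result;
}
```

Here the handler runs (seen_signal is set), yet the worker's poll() ends with a plain timeout rather than -1 and EINTR, which matches the output in the question: "Interrupted by signal: 2" is printed while the polling threads carry on.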