2

I am working on ROS environment, and trying to read CANBUS on a parallel thread. I initialized canbus in main thread because I want to make sure that CAN cables are connected. By initializing, I mean setsockopt(), ioctl(), bind() to configure the socket.

void readCanbus(int soktId) {
    while(true)
        int nbytes = read(soktId, ...);
}

int main() {
    int soktId;
    someSocketSetupFn(soktId);

    std::thread t(readCanbus, soktId);

    t.join();
}

Problem: If there is no incoming CAN messages, read() is blocked. Ctrl+C doesn't terminate the C++11 Program.

How can I make the read() terminate and so the whole program?

Terminate thread c++11 blocked on read This post proposed a solution for POSIX. I am working on ubuntu16.04.

Deduplicator
  • 44,692
  • 7
  • 66
  • 118
Jai
  • 377
  • 2
  • 14
  • 2
    If you set the socket to non-blocking mode, and block inside `select()` (or `poll()`, or similar) instead, you can use the self-pipe trick to cause select()/poll to wake up when your signal-handler sends a byte to its pipe, and then your thread can exit. – Jeremy Friesner Dec 19 '19 at 17:13
  • If the socket is set to non blocking, you can also determine if there is no incoming messages by using ioctl -- ioctl(socketfd, FIONREAD, &status); and if the status is > 0 there is data, otherwise there is none. – Frank Mancini Dec 19 '19 at 17:37
  • This is why we don't use blocking I/O.... – Lightness Races in Orbit Dec 19 '19 at 17:54
  • Good idea @Jeremy but please write answers in the appropriate place as this is not a chat room thanks – Lightness Races in Orbit Dec 19 '19 at 17:54
  • @JeremyFriesner. Thanks for your suggestion. If you don't mind can you share some code snippets.. – Jai Dec 19 '19 at 18:11
  • @FrankMancini That doesn't help you. There is no guarantee that a subsequent blocking read operation will not block. If you think there is such a guarantee, please show me the standard in which that guarantee can be found. ("I can't think of any way it can fail" is *not* a guarantee and there are documented cases where precisely this assumptions failed due to weird edge cases.) – David Schwartz Dec 19 '19 at 20:31
  • If this is a native TCP socket, shut it down for input. That will cause the read to return zero, indicating end of stream, which if the thread is written correctly should cause it to close the socket and exit. – user207421 Dec 20 '19 at 03:20

2 Answers2

2

If you want to mimic the interrupt behavior on the other threads, those threads have to allow interruption, and your signal handling thread has to deliver the signal to them. Consider the following snippet:

static volatile std::atomic<bool> quit;
static volatile std::deque<std::thread> all;
static volatile pthread_t main_thread;

void sigint_handler (int) {
    if (pthread_self() == main_thread) {
        write(2, "\rQuitting.\n", 11);
        quit = true;
        for (auto &t : all) pthread_kill(t.native_handle(), SIGINT);
    } else if (!quit) pthread_kill(main_thread, SIGINT);
}

Quitting is communicated by setting a global variable. A thread is to wakeup and check that variable. Waking up the thread is a matter of visiting the thread and sending it a signal.

In the event a worker thread intercepts SIGINT before the main thread does, then it sends it to the main thread to initiate a proper shutdown sequence.

In order to allow being interrupted, a thread may call siginterrupt().

void readCanbus(int s) {
    siginterrupt(SIGINT, 1);
    while(!quit) {
        char buf[256];
        int nbytes = read(s, buf, sizeof(buf));
    }
    write(2, "Quit.\n", 6);
}

We define two ways of installing a signal handler, one using signal, the other using sigaction, but using a flag that provides semantics similar to signal.

template <decltype(signal)>
void sighandler(int sig, sighandler_t handler) {
    signal(sig, handler);
}

template <decltype(sigaction)>
void sighandler(int sig, sighandler_t handler) {
    struct sigaction sa = {};
    sa.sa_handler = handler;
    sa.sa_flags = SA_RESTART;
    sigaction(sig, &sa, NULL);
}

The main thread installs the signal handler, initializes main_thread, and populates the container with the worker threads that need to be shutdown later.

int main () {
    int sp[2];
    main_thread = pthread_self();
    socketpair(AF_UNIX, SOCK_STREAM, 0, sp);
    all.push_back(std::thread(readCanbus, sp[0]));
    sighandler<sigaction>(SIGINT, sigint_handler);
    for (auto &t : all) t.join();
}
jxh
  • 69,070
  • 8
  • 110
  • 193
  • Signal handlers are not allowed to have side-effects. – user207421 Dec 20 '19 at 03:21
  • @user207421: That is a recommendation, I know of no such requirement. – jxh Dec 20 '19 at 03:57
  • It's a recommendation or requirement *for a reason:* that there is nothing to enforce read-consistency between wheat the signal handler did and what the main application sees. – user207421 Dec 20 '19 at 06:40
  • @user207421: There is no reason to forbid changing global state from an asynchronous callback. Care needs to be taken to not change state in a way that would violate consistency assumptions being made from the interrupted code. – jxh Dec 20 '19 at 07:31
  • @jxh, thanks for your answer. Can `sigaction()` be used instead of `signal()`? – Jai Dec 20 '19 at 14:46
  • @Jai Yes, `sigaction` could be used. – jxh Dec 20 '19 at 15:33
  • @jxh. I was wondering in case of `sigaction()`, what should be added inside `readcanbus()`? `siginterrupt(SIGINT, 1)` would still be there or something else? – Jai Dec 21 '19 at 07:49
  • @Jai No, there is nothing else the thread should do. But with `sigaction`, you have to set the `SA_RESTART` flag to enable BSD-like signal handler semantics. I have updated the answer. – jxh Dec 21 '19 at 21:47
1

Below is a small example of how to use the self-pipe trick to cause an I/O thread to gracefully exit when a CTRL-C is received. Note that for simplicity's sake, the I/O event loop in the example is done in the main() thread rather than in a separate thread, but the technique works regardless of which thread the event-loop is in -- the signal-handler callback writes a byte to one end of the pipe(), which causes the thread select()-ing on the other end of the pipe to return from select with the pipe's fd in the ready-to-read state. Once it detects that (via FD_ISSET()), the I/O event loop knows that it is time to exit.

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/socket.h>

int _signalHandlerFD;

static void MySignalHandler(int sig)
{
   if (sig == SIGINT)
   {
      printf("Control-C/SIGINT detected!  Signaling main thread to shut down\n");
      char junk = 'x';
      if (write(_signalHandlerFD, &junk, sizeof(junk)) != sizeof(junk)) perror("send");
   }
}

/** Sets the given socket to blocking-mode (the default) or non-blocking mode
  * In order to make sure a given socket never blocks inside recv() or send(),
  * call SetSocketBlockingEnabled(fd, false)
  */
bool SetSocketBlockingEnabled(int fd, bool blocking)
{
   if (fd < 0) return false;

#ifdef _WIN32
   unsigned long mode = blocking ? 0 : 1;
   return (ioctlsocket(fd, FIONBIO, &mode) == 0) ? true : false;
#else
   int flags = fcntl(fd, F_GETFL, 0);
   if (flags == -1) return false;
   flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
   return (fcntl(fd, F_SETFL, flags) == 0) ? true : false;
#endif
}

int main(int, char **)
{
   // Create a pipe that our signal-handler can use
   // to signal our I/O thread
   int pipefds[2];
   if (pipe(pipefds) != 0)
   {
      perror("pipe");
      exit(10);
   }
   _signalHandlerFD = pipefds[1];  // the "write" end of the pipe

   // Install our signal handler
   if (signal(SIGINT, MySignalHandler) != 0)
   {
      perror("signal");
      exit(10);
   }

   // Now we do our I/O event loop (a real program might
   // do this in a separate thread, but it can work anywhere
   // so for simplicity I'm doing it here)
   const int timeToQuitFD = pipefds[0];
   while(1)
   {
      fd_set readFDs;
      FD_ZERO(&readFDs);
      FD_SET(timeToQuitFD, &readFDs);

      int maxFD = timeToQuitFD;

      // If you have other sockets you want to read from,
      // call FD_SET(theSocket, &readFDS) on them here, and
      // update maxFD be to the maximum socket-fd value across
      // of all of the sockets you want select() to watch

      // select() will not return until at least one socket
      // specified by readFDs is ready-to-be-read-from.
      if (select(maxFD+1, &readFDs, NULL, NULL, NULL) >= 0)
      {
         if (FD_ISSET(timeToQuitFD, &readFDs))
         {
            printf("Signal handler told the main thread it's time to quit!\n");
            break;
         }

         // also call FD_ISSET() on any other sockets here, and
         // read() from them iff it returns true
      }
      else if (errno != EINTR)
      {
         perror("select()");
         break;
      }
   }
   printf("main thread exiting, bye!\n");

   return 0;
}
Jeremy Friesner
  • 70,199
  • 15
  • 131
  • 234