1

I have a multithreaded program whose 2 threads communicate with each other via a message queue. The first thread (sender) periodically sends a message, while the second thread (receiver) processes the information.

The sender has code similar to this:

// Create queue
// (ftok() derives the System V IPC key from the path "/tmp" and the project id 'B';
// IPC_CREAT creates the queue with mode 0664 if it does not exist yet)
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664 | IPC_CREAT);

// Create message and send
// NOTE(review): struct request_msg and MSG_LENGTH are defined outside this snippet;
// presumably the struct has the usual { long mtype; char mtext[MSG_LENGTH]; } layout.
struct request_msg req_msg;
req_msg.mtype = 1;
snprintf(req_msg.mtext, MSG_LENGTH, "Send this information");
// The size argument covers the text plus its terminating NUL, not the mtype field.
// None of the return values (ftok, msgget, msgsnd) are checked here.
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);

On the receiving thread, I do this:

// Subscribe to queue
// (same path and project id as the sender, so ftok() is expected to yield the
// same key; no IPC_CREAT here, so the queue must already exist)
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664);

struct request_msg req_msg;

// `running` is only re-evaluated after msgrcv() has returned.
while(running)
{
    // msgtyp 0 accepts a message of any type; msgflg 0 (no IPC_NOWAIT) makes the
    // call block until a message is available. The return value is not checked.
    msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
    // Do sth with the message
}

As you can see, the receiver sits within a while loop that is controlled by a global variable named "running". Error handlers do set the boolean to false, if an error is encountered within the process. This works in most cases, but if an error occurs before being able to send a message to the queue, the receiver will not exit the while loop because it waits for a message before continuing and thus, checking the running variable. That means it will hang there forever, as the sender will not send anything for the rest of the runtime.

I would like to avoid this, but I do not know how to let msgrcv know that it cannot expect any more messages. I was unable to learn how msgrcv behaves if I kill the queue, assuming this is the easiest version. Maybe timeouts or sending some kind of termination message (possibly using the mtype member of the message struct) are also possible.

Please, let me know what the most robust solution to this problem is. Thanks!

EDIT: based on suggestions I have reworked the code to make the signal handlers action atomic.

#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h> // gettimeofday, setitimer, struct itimerval
#include <time.h>     // localtime_r, strftime


#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0

/* Payload handed from the main thread to the worker thread. */
struct message 
{
    uint64_t iteration;   // value of loop_count when the message was written
    char req_time[28];    // timestamp text, "YYYY-MM-DD HH:MM:SS.uuuuuu" plus NUL
};

/*
 * Flags written by the signal handlers. volatile sig_atomic_t is the object
 * type the C standard allows an asynchronous handler to assign to.
 */
static volatile sig_atomic_t running = true;
static volatile sig_atomic_t work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;   // zero-initialized: an empty req_time means "nothing written yet"

/*
 * mutexmsg protects msg; data_updated_cv announces changes to it.
 * Both need an explicit initializer: relying on zero-filled static storage
 * is not guaranteed to produce a valid mutex or condition variable.
 */
pthread_mutex_t mutexmsg = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t data_updated_cv = PTHREAD_COND_INITIALIZER;


/* Handler for SIGINT/SIGTERM: requests shutdown by clearing the run flag. */
static void
termination_handler(int signum)
{
    (void)signum;   // required by the handler signature, value not needed

    running = false;
}


/* Handler for SIGALRM: records that a timer tick is waiting to be processed. */
static void 
alarm_handler(int signum)
{
    (void)signum;   // required by the handler signature, value not needed

    work = true;
}


/*
 * Publishes a fresh timestamped message for the worker thread.
 *
 * Re-arms the one-shot timer, prints the loop counter and the time elapsed
 * since the previous call, then stores the counter and a formatted timestamp
 * in `msg` under `mutexmsg` and signals `data_updated_cv`.
 * If the timer cannot be re-armed, SIGTERM is raised and nothing is published.
 */
static void
write_msg(void)
{
    // Reset the alarm interval
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        raise(SIGTERM);
        return;
    }

    struct timeval current_time;
    gettimeofday(&current_time, NULL);
    // uint64_t is not `unsigned long` on every platform, so convert explicitly
    printf("\nLoop count: %llu\n", (unsigned long long)loop_count);
    printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
                           (current_time.tv_usec - previous_time.tv_usec));
    previous_time = current_time;

    // format timeval struct
    // localtime_r() fills a caller-owned struct; plain localtime() returns one
    // shared static buffer that the worker thread overwrites concurrently.
    char tmbuf[64] = "";
    time_t nowtime = current_time.tv_sec;
    struct tm nowtm;
    if(localtime_r(&nowtime, &nowtm) != NULL)
    {
        strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
    }

    // write values
    pthread_mutex_lock(&mutexmsg);
    msg.iteration = loop_count;
    snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, (long)current_time.tv_usec);
    pthread_cond_signal(&data_updated_cv);
    pthread_mutex_unlock(&mutexmsg);

    loop_count++;
}


/*
 * Worker thread entry point: prints every message published in `msg`.
 *
 * Sleeps on `data_updated_cv` until `msg` holds an entry that has not been
 * printed yet or `running` has been cleared. Whoever clears `running` must
 * signal `data_updated_cv` afterwards so a sleeping worker notices it.
 * `args` is unused. The thread terminates through pthread_exit(NULL).
 */
static void* 
process_msg(void *args)
{
    (void)args;

    bool seen_any = false;    // true once the first message has been printed
    uint64_t last_seen = 0;   // msg.iteration of the most recently printed message

    while(1)
    {
        pthread_mutex_lock(&mutexmsg);

        printf("Waiting for condition\n");
        // Wait on a predicate rather than on a bare pthread_cond_wait(): an
        // update signalled while this thread was busy is not lost, and
        // spurious wake-ups are ignored. An empty req_time means that
        // nothing has been published yet.
        while(running &&
              (msg.req_time[0] == '\0' || (seen_any && msg.iteration == last_seen)))
        {
            pthread_cond_wait(&data_updated_cv, &mutexmsg);
        }
        printf("Condition fulfilled\n");

        if(!running)
        {
            // never leave with the mutex held, a later locker would block forever
            pthread_mutex_unlock(&mutexmsg);
            break;
        }

        seen_any = true;
        last_seen = msg.iteration;

        struct timeval process_time;
        gettimeofday(&process_time, NULL);

        // localtime_r() avoids sharing localtime()'s static buffer with the main thread
        char tmbuf[64] = "";
        char buf[64];
        time_t nowtime = process_time.tv_sec;
        struct tm nowtm;
        if(localtime_r(&nowtime, &nowtm) != NULL)
        {
            strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
        }
        snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, (long)process_time.tv_usec);

        // something that takes longer than the interval time
        // sleep(1);

        printf("[%s] Req time: %s loop cnt: %llu\n", buf, msg.req_time,
               (unsigned long long)msg.iteration);
        pthread_mutex_unlock(&mutexmsg);

    }

    pthread_exit(NULL);
}



int
main(int argc, char* argv[])
{
    pthread_t thread_id;
    pthread_attr_t attr;

    // for portability, set thread explicitly as joinable
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
    {
        perror("pthread_create");
        exit(1);
    }

    pthread_attr_destroy(&attr);

    // signal handling setup
    struct sigaction t;
    t.sa_handler = termination_handler;
    sigemptyset(&t.sa_mask);
    t.sa_flags = 0;
    sigaction(SIGINT, &t, NULL);
    sigaction(SIGTERM, &t, NULL);

    struct sigaction a;
    a.sa_handler = alarm_handler;
    sigemptyset(&a.sa_mask);
    a.sa_flags = 0;
    sigaction(SIGALRM, &a, NULL);
    
    // Set the alarm interval
    alarm_interval.it_interval.tv_sec = 0;
    alarm_interval.it_interval.tv_usec = 0;
    alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
    alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;

    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    gettimeofday(&previous_time, NULL);

    while(1)
    {
        // suspending main thread until a signal is caught
        pause();

        if(!running)
        {
            // signal the worker thread to stop execution
            pthread_mutex_lock(&mutexmsg);
            pthread_cond_signal(&data_updated_cv);
            pthread_mutex_unlock(&mutexmsg);

            break;
        }

        if(work)
        {
            write_msg();
            work = false;
        }
    }

    // suspend thread until the worker thread joins back in
    pthread_join(thread_id, NULL);

    // reset the timer
    alarm_interval.it_value.tv_sec = 0;
    alarm_interval.it_value.tv_usec = 0;
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    printf("EXIT\n");
    pthread_exit(NULL);
    
}
DocDriven
  • 3,726
  • 6
  • 24
  • 53
  • You can tell `msgrcv()` to return immediately with an error if there are no messages pending. See the man page. – Shawn Nov 26 '20 at 22:44
  • And the newer POSIX message queues support timeouts in waiting for a message. – Shawn Nov 26 '20 at 22:45
  • @Shawn Thanks for the input. I do not want it to return immediately, if there are no messages. Reason being that the sender write to the message queue in one second intervals and the receiver has to be faster than that. So waiting until the next message arrives is desired behavior. I will definitely look into POSIX message queues, though. – DocDriven Nov 26 '20 at 23:05

4 Answers4

1

You have not justified the use of a message queue other than as a synchronization primitive. You could be passing the message via a variable and an atomic flag to indicate message readiness. This answer then describes how to implement thread suspension and resuming using a condition variable. That’s how it’d be typically done between threads, although of course is not the only way.

I do not know how to let msgrcv know that it cannot expect any more messages

No need for that. Just send a message that tells the thread to finish! The running variable doesn’t belong: you are trying to communicate with the other thread, so do it the way you chose to: message it!

Kuba hasn't forgotten Monica
  • 95,931
  • 16
  • 151
  • 313
  • Thanks for your reply. Could you please elaborate a bit more on the last part that states that the global running variable should be replaced by a termination message? If I understand correctly, a special error message should be sent to the queue in case of an error in either thread. The while loop within the receiver should then be always true, and I break out of it in case I receive the error message. Is that correct? – DocDriven Nov 27 '20 at 10:06
  • Also, another answer states to use the POSIX implementation of message queues instead which provide a timed receive function. Is this a more elegant way than using pthread in combination with an atomic flag? – DocDriven Nov 27 '20 at 10:08
1

I have spent the last day reading a lot about threading and mutexes and tried to get my example program to work. It does, but unfortunately it gets stuck when I try to shut it down via Ctrl+C. The reason is (again) that the worker thread waits for a signal from the main thread, which will not send one anymore.

@Rachid K. and @Unslander Monica: if you want to take a look again, is this more state of the art code for doing this? Also, I think I have to use pthread_cond_timedwait instead of pthread_cond_wait to avoid the termination deadlock. Could you tell me how to handle that exactly?

Note that the program does simply periodically (interval 1 s) hand a timestamp and a loop counter to the processing thread, that prints out the data. The output also shows when the print got called.

Thanks again!

#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/time.h> // gettimeofday, setitimer, struct itimerval
#include <time.h>     // localtime, strftime


#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0


// Written by signal_handler and polled by the worker loop: volatile
// sig_atomic_t is the type a signal handler may safely assign to.
static volatile sig_atomic_t running = true;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;

/* Payload handed from write_msg to the worker thread (zero-initialized). */
struct message 
{
    uint64_t iteration;   // value of loop_count when the message was written
    char req_time[28];    // timestamp text, "YYYY-MM-DD HH:MM:SS.uuuuuu" plus NUL
} msg;

// mutexmsg protects msg; data_updated_cv announces changes to it.
// Both need an explicit initializer before their first use.
pthread_mutex_t mutexmsg = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t data_updated_cv = PTHREAD_COND_INITIALIZER;


/* Clears the run flag when `signum` is SIGINT or SIGTERM; ignores anything else. */
static void
signal_handler(int signum)
{
    switch(signum)
    {
    case SIGINT:
    case SIGTERM:
        running = false;
        break;
    default:
        break;
    }
}


/*
 * Publishes a fresh timestamped message for the worker thread.
 *
 * Does nothing once `running` is false. Otherwise re-arms the one-shot
 * timer, prints the loop counter and the time elapsed since the previous
 * call, stores the counter and a formatted timestamp in `msg` under
 * `mutexmsg` and signals `data_updated_cv`. If the timer cannot be re-armed,
 * SIGTERM is raised and nothing is published. `signum` is not used.
 *
 * NOTE: printf(), pthread_mutex_lock() and pthread_cond_signal() are not
 * async-signal-safe, so this function should be called from normal thread
 * context rather than being installed as a signal handler.
 */
static void
write_msg(int signum)
{
    (void)signum;

    if(!running)
    {
        return;
    }

    // Reset the alarm interval
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        raise(SIGTERM);
        return;
    }

    struct timeval current_time;
    gettimeofday(&current_time, NULL);
    // uint64_t is not `unsigned long` on every platform, so convert explicitly
    printf("\nLoop count: %llu\n", (unsigned long long)loop_count);
    printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
                           (current_time.tv_usec - previous_time.tv_usec));
    previous_time = current_time;

    // format timeval struct
    // localtime_r() fills a caller-owned struct; plain localtime() returns one
    // shared static buffer that the worker thread overwrites concurrently.
    char tmbuf[64] = "";
    time_t nowtime = current_time.tv_sec;
    struct tm nowtm;
    if(localtime_r(&nowtime, &nowtm) != NULL)
    {
        strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
    }

    // write values
    pthread_mutex_lock(&mutexmsg);
    msg.iteration = loop_count;
    snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, (long)current_time.tv_usec);
    pthread_cond_signal(&data_updated_cv);
    pthread_mutex_unlock(&mutexmsg);

    loop_count++;
}


/*
 * Worker thread entry point: prints every message published in `msg`.
 *
 * Sleeps on `data_updated_cv` until `msg` holds an entry that has not been
 * printed yet or `running` has been cleared. Whoever clears `running` must
 * signal `data_updated_cv` afterwards so a sleeping worker notices it.
 * `args` is unused. The thread terminates through pthread_exit(NULL).
 */
static void* 
process_msg(void *args)
{
    (void)args;

    bool seen_any = false;    // true once the first message has been printed
    uint64_t last_seen = 0;   // msg.iteration of the most recently printed message

    for(;;)
    {
        pthread_mutex_lock(&mutexmsg);

        printf("Waiting for condition\n");
        // Wait on a predicate rather than on a bare pthread_cond_wait(): an
        // update signalled while this thread was busy is not lost, and
        // spurious wake-ups are ignored. An empty req_time means that
        // nothing has been published yet.
        while(running &&
              (msg.req_time[0] == '\0' || (seen_any && msg.iteration == last_seen)))
        {
            pthread_cond_wait(&data_updated_cv, &mutexmsg);
        }
        printf("Condition fulfilled\n");

        // a wake-up caused by shutdown must not print the previous message again
        if(!running)
        {
            pthread_mutex_unlock(&mutexmsg);
            break;
        }

        seen_any = true;
        last_seen = msg.iteration;

        struct timeval process_time;
        gettimeofday(&process_time, NULL);

        // localtime_r() avoids sharing localtime()'s static buffer with write_msg()
        char tmbuf[64] = "";
        char buf[64];
        time_t nowtime = process_time.tv_sec;
        struct tm nowtm;
        if(localtime_r(&nowtime, &nowtm) != NULL)
        {
            strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
        }
        snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, (long)process_time.tv_usec);

        printf("[%s] Message req time: %s loop cnt: %llu\n", buf, msg.req_time,
               (unsigned long long)msg.iteration);
        pthread_mutex_unlock(&mutexmsg);

    }

    pthread_exit(NULL);
}


int
main(int argc, char* argv[])
{
    pthread_t thread_id;
    pthread_attr_t attr;

    // for portability, set thread explicitly as joinable
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
    {
        perror("pthread_create");
        exit(1);
    }

    pthread_attr_destroy(&attr);

    // signal handling setup
    struct sigaction s;
    s.sa_handler = signal_handler;
    sigemptyset(&s.sa_mask);
    s.sa_flags = 0;
    sigaction(SIGINT, &s, NULL);
    sigaction(SIGTERM, &s, NULL);

    struct sigaction a;
    a.sa_handler = write_msg;
    sigemptyset(&a.sa_mask);
    a.sa_flags = 0;
    sigaction(SIGALRM, &a, NULL);
    
    // Set the alarm interval
    alarm_interval.it_interval.tv_sec = 0;
    alarm_interval.it_interval.tv_usec = 0;
    alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
    alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;

    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    gettimeofday(&previous_time, NULL);

    // suspend thread until the worker thread joins back in
    pthread_join(thread_id, NULL);

    // reset the timer
    alarm_interval.it_value.tv_sec = 0;
    alarm_interval.it_value.tv_usec = 0;
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    pthread_exit(NULL);
    return 0;
}
DocDriven
  • 3,726
  • 6
  • 24
  • 53
  • 1
    You get stuck because the thread may be waiting on pthread_cond_wait() when the signal handler of CTRL-C is triggered. Just add a : "pthread_mutex_lock(&mutexmsg); pthread_cond_signal(&data_updated_cv); pthread_mutex_unlock(&mutexmsg);" in the signal handler of SIGINT right after the setting of "running" to false. – Rachid K. Nov 29 '20 at 14:25
  • 1
    To make it more secure and avoid some compiler optimization put the global "running" into registers inside the thread, you should define it as: static volatile bool running = true; – Rachid K. Nov 29 '20 at 14:27
  • 1
    You may also need to check "running" right after pthread_cond_wait() to see if the wake up is the timer or a SIGINT/SIGTERM. – Rachid K. Nov 29 '20 at 14:32
  • @Rachid K. That makes sense. But isn't it dangerous to put complex multi-line code like that into a signal handler? I got told that only atomic operations like setting variables is acceptable there, in case another signal is triggered which stops the signal handler from completing its task. – DocDriven Nov 29 '20 at 14:39
  • 1
    You are doing the same in the SIGALRM handler. Normally, it is preferable to centralize the events in an event loop (in the main program for example) which will signal the thread. For example, if the main program does: while (1) { pause(); // Make action corresponding to signal }, each time pause() returns, it means that a signal was received and then pthread_cond_signal() could be called or running be set to false from there. – Rachid K. Nov 29 '20 at 14:50
  • @RachidK. Sorry, I see the hypocrisy of my statement now. I have reworked the code based on your suggestions into the original question. It seems to work now, thanks to the hints you gave me. After testing a bit, it seems to work. Nevertheless, I would like to hear your opinion on this now. The only thing that gets it to crash is in case the worker thread exceeds the interval time. Is this something to check for every iteration? – DocDriven Nov 29 '20 at 22:55
  • And with crash I mean it gets stuck, not actually crashing, as it waits again for a signal that will never come. – DocDriven Nov 29 '20 at 23:05
  • I updated my answer with some enhancements in your new proposal... – Rachid K. Nov 30 '20 at 08:56
1

On the receiving thread, I do this:

...

while(running)
{
    msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);

Hopefully, in reality you do more than that.

Because you're not checking any error status in the code you've posted. And that's flat-out wrong for a blocking function call that is likely specified to never be restarted on receipt of a signal (as is true on Linux and Solaris). Per Linux `signal(7)`:

The following interfaces are never restarted after being interrupted by a signal handler, regardless of the use of SA_RESTART; they always fail with the error EINTR when interrupted by a signal handler:

  • ...
  • System V IPC interfaces: msgrcv(2), msgsnd(2), semop(2), and semtimedop(2).

and Solaris sigaction():

SA_RESTART

If set and the signal is caught, functions that are interrupted by the execution of this signal's handler are transparently restarted by the system, namely fcntl(2), ioctl(2), wait(3C), waitid(2), and the following functions on slow devices like terminals: getmsg() and getpmsg() (see getmsg(2)); putmsg() and putpmsg() (see putmsg(2)); pread(), read(), and readv() (see read(2)); pwrite(), write(), and writev() (see write(2)); recv(), recvfrom(), and recvmsg() (see recv(3SOCKET)); and send(), sendto(), and sendmsg() (see send(3SOCKET)). Otherwise, the function returns an EINTR error.

So your code needs to look more like this in order to handle both errors and signal interrupts:

// must start non-zero, otherwise the loop below is never entered
volatile sig_atomic_t running = 1;

...

while(running)
{
    errno = 0;
    ssize_t result = msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
    if ( result == ( ssize_t ) -1 )
    {
        // if the call failed or no longer running
        // break the loop
        if ( ( errno != EINTR ) || !running )
        {
            break;
        }

        // the call was interrupted by a signal
        continue;
    }

    ...
}

And that opens up the opportunity to use an alarm() and a SIGALRM signal handler to set running to 0 for use as a timeout:

// must start non-zero, otherwise the loop below is never entered
volatile sig_atomic_t running = 1;

// SIGALRM handler: the timeout expired, ask the receive loop to stop
void handler( int sig )
{
    ( void ) sig;
    running = 0;
}
...

struct sigaction sa;
memset( &sa, 0, sizeof( sa ) );
sa.sa_handler = handler;

sigaction( SIGALRM, &sa, NULL );
while(running)
{
    // 10-sec timeout
    alarm( 10 );

    errno = 0;
    ssize_t result = msgrcv( msqid, &req_msg, sizeof(req_msg.mtext), 0, 0 );
    
    // save errno as alarm() can munge it
    int saved_errno = errno;

    // clear alarm if it hasn't fired yet
    alarm( 0 );

    if ( result == ( ssize_t ) -1 )
    {
        // if the call failed or no longer running
        // break the loop
        if ( ( saved_errno != EINTR ) || !running )
        {
            break;
        }

        // the call was interrupted by a signal
        continue;
    }

    ...
}

That can almost certainly be improved upon - the logic is rather complex to catch all the corner cases and there's likely a simpler way to do it.

Andrew Henle
  • 32,625
  • 3
  • 24
  • 56
0

Answer to the new proposal in the question

  • The cyclic timer should be rearmed in the event loop of the main thread for better visibility (subjective proposal);
  • When the secondary thread goes out of its loop, it must release the mutex otherwise the main thread will enter into a deadlock (waiting for the mutex which was locked by the terminated secondary thread).

So, here is the last proposal with the above fixes/enhancements:

#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <time.h>     // localtime, strftime


#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0

/* Payload handed from the main thread to the worker thread. */
struct message 
{
    uint64_t iteration;   // value of loop_count when the message was written
    char req_time[28];    // timestamp text, "YYYY-MM-DD HH:MM:SS.uuuuuu" plus NUL
};

/*
 * Flags written by the signal handlers. volatile sig_atomic_t is the object
 * type the C standard allows an asynchronous handler to assign to.
 */
static volatile sig_atomic_t running = true;
static volatile sig_atomic_t work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;   // zero-initialized: an empty req_time means "nothing written yet"

/*
 * mutexmsg protects msg; data_updated_cv announces changes to it.
 * Both need an explicit initializer: relying on zero-filled static storage
 * is not guaranteed to produce a valid mutex or condition variable.
 */
pthread_mutex_t mutexmsg = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t data_updated_cv = PTHREAD_COND_INITIALIZER;


/* Handler for SIGINT/SIGTERM: requests shutdown by clearing the run flag. */
static void
termination_handler(int signum)
{
    (void)signum;   // required by the handler signature, value not needed

    running = false;
}


/* Handler for SIGALRM: records that a timer tick is waiting to be processed. */
static void 
alarm_handler(int signum)
{
    (void)signum;   // required by the handler signature, value not needed

    work = true;
}


/*
 * Publishes a fresh timestamped message for the worker thread.
 *
 * Prints the loop counter and the time elapsed since the previous call, then
 * stores the counter and a formatted timestamp in `msg` under `mutexmsg` and
 * signals `data_updated_cv`.
 */
static void
write_msg(void)
{

    struct timeval current_time;
    gettimeofday(&current_time, NULL);
    // uint64_t is not `unsigned long` on every platform, so convert explicitly
    printf("\nLoop count: %llu\n", (unsigned long long)loop_count);
    printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
                           (current_time.tv_usec - previous_time.tv_usec));
    previous_time = current_time;

    // format timeval struct
    // localtime_r() fills a caller-owned struct; plain localtime() returns one
    // shared static buffer that the worker thread overwrites concurrently.
    char tmbuf[64] = "";
    time_t nowtime = current_time.tv_sec;
    struct tm nowtm;
    if(localtime_r(&nowtime, &nowtm) != NULL)
    {
        strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
    }

    // write values
    pthread_mutex_lock(&mutexmsg);
    msg.iteration = loop_count;
    snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, (long)current_time.tv_usec);
    pthread_cond_signal(&data_updated_cv);
    pthread_mutex_unlock(&mutexmsg);

    loop_count++;
}


/*
 * Worker thread entry point: prints every message published in `msg`.
 *
 * Sleeps on `data_updated_cv` until `msg` holds an entry that has not been
 * printed yet or `running` has been cleared. Whoever clears `running` must
 * signal `data_updated_cv` afterwards so a sleeping worker notices it.
 * `args` is unused. The thread terminates through pthread_exit(NULL).
 */
static void* 
process_msg(void *args)
{
    (void)args;

    bool seen_any = false;    // true once the first message has been printed
    uint64_t last_seen = 0;   // msg.iteration of the most recently printed message

    while(1)
    {
        pthread_mutex_lock(&mutexmsg);

        printf("Waiting for condition\n");
        // Wait on a predicate rather than on a bare pthread_cond_wait(): an
        // update signalled while this thread was busy is not lost, and
        // spurious wake-ups are ignored. An empty req_time means that
        // nothing has been published yet.
        while(running &&
              (msg.req_time[0] == '\0' || (seen_any && msg.iteration == last_seen)))
        {
            pthread_cond_wait(&data_updated_cv, &mutexmsg);
        }
        printf("Condition fulfilled\n");

        if(!running)
        {
            pthread_mutex_unlock(&mutexmsg); // <----- To avoid deadlock
            break;
        }

        seen_any = true;
        last_seen = msg.iteration;

        struct timeval process_time;
        gettimeofday(&process_time, NULL);

        // localtime_r() avoids sharing localtime()'s static buffer with the main thread
        char tmbuf[64] = "";
        char buf[64];
        time_t nowtime = process_time.tv_sec;
        struct tm nowtm;
        if(localtime_r(&nowtime, &nowtm) != NULL)
        {
            strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", &nowtm);
        }
        snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, (long)process_time.tv_usec);

        // something that takes longer than the interval time
        //sleep(2);

        printf("[%s] Req time: %s loop cnt: %llu\n", buf, msg.req_time,
               (unsigned long long)msg.iteration);
        pthread_mutex_unlock(&mutexmsg);

    }

    printf("Thread exiting...\n");
    pthread_exit(NULL);
}



int
main(int argc, char* argv[])
{
    pthread_t thread_id;
    pthread_attr_t attr;

    // for portability, set thread explicitly as joinable
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
    {
        perror("pthread_create");
        exit(1);
    }

    pthread_attr_destroy(&attr);

    // signal handling setup
    struct sigaction t;
    t.sa_handler = termination_handler;
    sigemptyset(&t.sa_mask);
    t.sa_flags = 0;
    sigaction(SIGINT, &t, NULL);
    sigaction(SIGTERM, &t, NULL);

    struct sigaction a;
    a.sa_handler = alarm_handler;
    sigemptyset(&a.sa_mask);
    a.sa_flags = 0;
    sigaction(SIGALRM, &a, NULL);
    
    // Set the alarm interval
    alarm_interval.it_interval.tv_sec = 0;
    alarm_interval.it_interval.tv_usec = 0;
    alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
    alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;

    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    gettimeofday(&previous_time, NULL);

    while(1)
    {
        // Reset the alarm interval <-------- Rearm the timer in the main loop
        if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
        {
          perror("setitimer");
          raise(SIGTERM);
          break;
        }

        // suspending main thread until a signal is caught
        pause();

        if(!running)
        {
            // signal the worker thread to stop execution
            pthread_mutex_lock(&mutexmsg);
            pthread_cond_signal(&data_updated_cv);
            pthread_mutex_unlock(&mutexmsg);

            break;
        }

        if(work)
        {
            write_msg();
            work = false;
        }
    }

    // suspend thread until the worker thread joins back in
    pthread_join(thread_id, NULL);

    // reset the timer
    alarm_interval.it_value.tv_sec = 0;
    alarm_interval.it_value.tv_usec = 0;
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    printf("EXIT\n");
    pthread_exit(NULL);
    
}

==================================================================

Answer to the original question

It is possible to use a condition variable to wait for a signal from the senders. This makes the receiver wake up and check for messages in the message queue by passing IPC_NOWAIT in the flags parameter of msgrcv(). To end the communication, an "End of communication" message can be posted. It is also possible to use pthread_cond_timedwait() to wake up periodically and check an "end of communication" or "end of receiver" condition (e.g. by checking your global "running" variable).

Receiver side:

// Mutex initialization
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// Condition variable initialization
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
[...]
while (1) {
  // Lock the mutex
  pthread_mutex_lock(&mutex);

  // Check for messages (non blocking thanks to IPC_NOWAIT)
  rc = msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, IPC_NOWAIT);
  if (rc == -1) {
    if (errno == ENOMSG) {

      // message queue empty

      // Wait for message notification
      pthread_cond_wait(&cond, &mutex); // <--- pthread_cond_timedwait() can be used to wake up and check for the end of communication or senders...

      // Woken up, but nothing has been received yet: check for the end of
      // communication here (e.g. "running" variable), then poll the queue again
      pthread_mutex_unlock(&mutex);
      continue;

    } else {
      // Error: req_msg holds no valid message, so do not fall through to the handling code
      pthread_mutex_unlock(&mutex);
      break;
    }
  }

  // Handle the message, end of communication (e.g. "running" variable)...

  // Release the lock (so that the sender can post something in the queue)
  pthread_mutex_unlock(&mutex);
}

Sender side:

// Prepare the message
[...]
// Take the lock
// (the same mutex the receiver holds while it polls the queue and waits)
pthread_mutex_lock(&mutex);

// Send the message
// NOTE(review): the return value of msgsnd() is not checked here
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);

// Wake up the receiver
pthread_cond_signal(&cond);

// Release the lock
pthread_mutex_unlock(&mutex);

N.B.: SYSV message queues are obsolete. It is preferable to use the brand new Posix services.

Rachid K.
  • 4,490
  • 3
  • 11
  • 30
  • Thanks for your reply. I have looked into the POSIX implementation and found that there is a function called `mq_timedreceive` which does something very similar to your proposed solution with `pthread_cond_timedwait` if I understand this correctly. Which of these two solutions is preferable? – DocDriven Nov 27 '20 at 09:57
  • All the solutions are OK. It depends on your needs (the way you want to stop/detect the termination of the threads). Try to find the one which produces as little source code as possible :-) – Rachid K. Nov 27 '20 at 10:18
  • The timer-based solutions generally trigger some polling (cyclic wake-up to make several checks). This may also be less reactive, as you need to wait for the timer to elapse before doing some actions. Signaling-based solutions are more reactive. As soon as you want to notify some event to the receiver, the signal will wake it up. – Rachid K. Nov 27 '20 at 10:22