3

I'm wondering if anyone has previously implemented a mailbox class for interthread communication using the POSIX library. For reference, I'm looking similar to mailboxes used in SystemVerilog: http://www.asic-world.com/systemverilog/sema_mail_events2.html

EDIT:

My attempt at a mailbox using STL queues, pthread conditions, and mutexes. It tries to copy the behavior of the SystemVerilog mailbox described in the link:

#include <cerrno>
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <queue>

#include <fcntl.h>
#include <pthread.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

using namespace std;

class Mailbox{

    private:
        pthread_cond_t  msg_available;  // Message in the mailbox?
        pthread_mutex_t queue_mutex;    // Mutex for queue control

        queue<messageT> messages;       // Messages

    public:
        // Constructor
        Mailbox(void){
            msg_available = PTHREAD_COND_INITIALIZER;
            queue_mutex = PTHREAD_MUTEX_INITIALIZER;
        }
        // Destructor
        ~Mailbox(void){
            // Nothing to do here
        }

        // Put a single message into the mailbox
        void put(messageT msg){

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            // Push message into mailbox
            messages.push(msg);

            // Signal there is a message in the mailbox
            if(pthread_cond_signal(&msg_available)){                    
                fprintf(stderr, "cond error");
                exit(EXIT_FAILURE);
            }

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }
        }

        // Try to put a single message into the mailbox
        int try_put(messageT msg){

            // Try to lock down queue
            if(pthread_mutex_trylock(queue_mutex) == 0){

                // Push message into mailbox
                messages.push(msg);

                // Signal there is a message in the mailbox
                if(pthread_cond_signal(&msg_available)){                    
                    fprintf(stderr, "cond error");
                    exit(EXIT_FAILURE);
                }

                // Unlock queue
                if(pthread_mutex_unlock(queue_mutex)){
                    fprintf(stderr, "Queue Mutex Unlock Error\n");
                    exit(EXIT_FAILURE);
                }

                return 1;
            }
            // Otherwise, say mailbox is unavailable
            else
                return 0;
        }

        //  Get single message from a mailbox
        void get(mesageT *msg){

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            // Wait for a message to come in
            while(messages.empty()){
                // Release hold of the lock until another thread
                // signals that a message has been placed
                if(pthread_cond_wait(&msg_available,&queue_mutex)){                 
                    fprintf(stderr, "cond_wait error");
                    exit(EXIT_FAILURE);
                }
            }

            // Pop of least recent message
            *msg = messages.front();
            messages.pop();

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }

        }

        //  Try to get single message from a mailbox
        int try_get(mesageT *msg){

            int mailbox_ready = 1;  // Mailbox ready

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            // Indicate if mailbox is empty
            if(messages.empty())
                mailbox_ready = 0
            // Otherwise, grab the message
            else {
                // Pop of least recent message
                *msg = messages.front();
                messages.pop();
            }

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }

            return mailbox_ready;
        }

        //  Peek at single message from a mailbox
        void peek(mesageT *msg){

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            // Wait for a message to come in
            while(messages.empty()){
                // Release hold of the lock until another thread
                // signals that a message has been placed
                if(pthread_cond_wait(&msg_available,&queue_mutex)){                 
                    fprintf(stderr, "cond_wait error");
                    exit(EXIT_FAILURE);
                }
            }

            // Peek at most recent message
            *msg = messages.front();

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }

        }

        //  Try to peek at single message from a mailbox
        int try_peek(mesageT *msg){

            int mailbox_ready = 1;  // Mailbox ready

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            if(messages.empty())    // Indicate if mailbox is empty
                mailbox_ready = 0
            else                    // Otherwise, grab the message
                *msg = messages.front();

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }

            return mailbox_ready;
        }
}
Deanie
  • 2,316
  • 2
  • 19
  • 35
sj755
  • 3,944
  • 14
  • 59
  • 79

2 Answers2

3

A simple, semaphore protected, queue should suffice.

If you want to be able to put different kind of data in the "mailbox", then use a common base structure, that can easily be extended and contains a single integer saying what kind of structure it is, and then typecast it to the correct structure (depending on the embedded type) when you need to use it.

Some programmer dude
  • 400,186
  • 35
  • 402
  • 621
1

If the threads are all in the same process you should use pthread_mutex and pthread_condition_variable, not semaphore. Unix semaphores allow inter-process synchronization, but they are less efficient within a process, and also have semantics that are much harder to reason about than mutexes and condition variables.

Here are a number of implementations with mutexes and condition variables in C or C++:

Community
  • 1
  • 1
Wandering Logic
  • 3,323
  • 1
  • 20
  • 25
  • 1
    ?? A semaphore and a mutex, (unbounded), or two semaphores and a mutex, (bounded), are the classic means of implementing mailboxes. You think that a semaphore count that reflects the queue item count, (and perhaps free space count), is hard to reason? – Martin James Jun 01 '13 at 19:39
  • 1
    No I don't think a semaphore count that reflects the queue count is particularly hard to reason about. I think that because semaphores have more states and fewer inviants than condition variables it is harder for automated tools (and thus programmers) to reason about programmer intent, and thus correctness. Neither Helgrind nor Intel Inspector are able to do accurate deadlock detection or race detection in the presence of semaphores. They are accurate in the presence of mutexes and condition variables. _There is a reason that pthreads and the C++11 cmtes chose not to include semaphores._ – Wandering Logic Jun 01 '13 at 20:00
  • @WanderingLogic Based on your comments, I'm going to try and implement a mailbox using mutexes and conditional variable to implement a mailbox internally containing an STL queue. I'll update my question with my mailbox class later on to get thoughts and opinions on it. – sj755 Jun 01 '13 at 20:21
  • @WanderingLogic I've added my own implementation of a mailbox. Tell me what you think. What I'm wondering about using conditions is whether or not all conditional signals generated by `pthread_cond_signal` are delivered separately when calling `pthread_cond_wait` – sj755 Jun 01 '13 at 21:16
  • 1
    the `pthread_cond_wait` calls need to be wrapped in a loop: `while (messages.empty()) pthread_cond_wait(&msg_available,&queue_mutex);` `pthread_cond_signal` is telling the waiting thread that it should retest `messages.empty()`. If there is no thread waiting, then the `pthread_cond_signals` (correctly) get dropped on the floor. – Wandering Logic Jun 01 '13 at 21:45
  • 1
    also: you have two copies of `get()` and `try_get()` (the ones towards the top are a little better than the ones lower down.) And `put()` needs to call `pthread_cond_signal()`. And `try_put()` is supposed to be testing for the condition that the queue is full (which can't reasonbly happen in this case), not that the mutex is lockable (the mutex will always be acquired after at most a short wait). – Wandering Logic Jun 01 '13 at 21:52
  • @WanderingLogic I appreciate the help, I've updated my code. The duplicate functions were the result me copying the `get` and `try_get` function and removing `messages.pop()` to make them peek functions. I decided to change the `try_put` function to fail if it can't get hold of the mutex immediately. For what I'm doing, I'm not actually going to use the function. I suppose this should work, but it may need some testing. – sj755 Jun 01 '13 at 22:07