
I am fairly new to C++ and very new to using mutexes. I am trying to implement the thread-safe queue by @ChewOnThis_Trident from this answer.

Essentially, I have several threads adding messages to a queue, and I need to preserve the order in which the messages arrive. However, the messages require some conditional modifications before being added. In the real code, listeners on separate threads call unique "handleMessage" functions that modify the message before adding it to the queue. A separate thread checks whether messages are in the queue and handles them in order. In the full code, I know the listeners receive the messages in the correct order, but they fail to add them to the queue in that order.

I think the problem is that some time elapses between when a message is received and when it is enqueued, because of the modification step, so another thread can enqueue a later message first and the messages fall out of order.

For practical reasons in the real code, I can't do these modifications inside "SafeQueue::enqueue".

In my example, two threads can add to the queue and one thread reads from it. The "message" in this case is a random int. "UsesQ" handles adding to the queue and the message modification (e.g. it makes every int odd).

I think another mutex is needed when "UsesQ::addQ" is called, but it would need to be shared across all the threads, and I'm not sure how to implement that.
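A minimal sketch of what such a shared mutex might look like, under some assumptions: `orderMutex` and `runProducers` are hypothetical names, each producer gets its own `UsesQ` to mimic separate listeners, and `std::thread` is swapped in for raw pthreads to keep the sketch short. The lock is held across both the modification and the enqueue. Note that this only fixes the order in which threads acquire `orderMutex`; to match arrival order in the real code, the lock would presumably have to be taken at the point where the message is received.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Same shape as SafeQueue in the question, trimmed to the essentials.
class SafeQueue {
public:
    void enqueue(int i) {
        std::lock_guard<std::mutex> lock(m);
        q.push(i);
        cv.notify_one();
    }
    int dequeue() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return !q.empty(); });  // predicate form of wait
        int val = q.front();
        q.pop();
        return val;
    }
private:
    std::queue<int> q;
    std::mutex m;
    std::condition_variable cv;
};

class UsesQ {
public:
    // orderMutex (hypothetical name) is owned by the caller and shared by
    // every UsesQ instance that feeds the same queue.
    UsesQ(SafeQueue& q, std::mutex& orderMutex) : Q(q), orderMutex(orderMutex) {}

    void addQ(int i) {
        // Held across modify + enqueue, so no other producer can slip in between.
        std::lock_guard<std::mutex> lock(orderMutex);
        if (i % 2 == 0) {
            ++i;  // the conditional modification from the question
        }
        Q.enqueue(i);
    }
private:
    SafeQueue& Q;
    std::mutex& orderMutex;
};

// Hypothetical driver: two producers share one mutex, the calling thread consumes.
// Returns how many dequeued values were odd.
int runProducers(int perThread) {
    SafeQueue q;
    std::mutex orderMutex;
    UsesQ a(q, orderMutex);
    UsesQ b(q, orderMutex);

    auto produce = [perThread](UsesQ& u) {
        for (int n = 0; n < perThread; ++n) u.addQ(n);
    };
    std::thread t1([&] { produce(a); });
    std::thread t2([&] { produce(b); });

    int odd = 0;
    for (int n = 0; n < 2 * perThread; ++n) {
        if (q.dequeue() % 2 != 0) ++odd;
    }
    t1.join();
    t2.join();
    return odd;
}
```

Because the consumer only ever takes the queue's internal mutex, and producers always take `orderMutex` before it, the two locks are acquired in a fixed order and should not deadlock against each other.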

In the example, I am struggling to think of a way to test whether the order is correct.
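One possible way to test the order, sketched under assumptions: stamp each message with a sequence number at the moment it is "received", carry that number through the queue, and have the consumer count any gap or reversal. `Stamped`, `StampedQueue`, `nextSeq`, and `countOutOfOrder` are hypothetical names, not part of the original code, and `std::thread` is used instead of pthreads for brevity.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <thread>

struct Stamped {        // hypothetical message type
    std::uint64_t seq;  // arrival number, assigned at "receipt"
    int value;          // the payload (a plain int, as in the question)
};

// The question's SafeQueue, holding Stamped instead of int.
class StampedQueue {
public:
    void enqueue(Stamped s) {
        std::lock_guard<std::mutex> lock(m);
        q.push(s);
        cv.notify_one();
    }
    Stamped dequeue() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return !q.empty(); });
        Stamped s = q.front();
        q.pop();
        return s;
    }
private:
    std::queue<Stamped> q;
    std::mutex m;
    std::condition_variable cv;
};

// Returns the number of messages the consumer saw out of sequence.
int countOutOfOrder(int perThread) {
    StampedQueue q;
    std::mutex orderMutex;       // shared by all producers
    std::uint64_t nextSeq = 0;   // only touched while orderMutex is held

    auto producer = [&] {
        for (int n = 0; n < perThread; ++n) {
            std::lock_guard<std::mutex> lock(orderMutex);
            Stamped s{nextSeq++, n};             // simulated receipt + stamp
            if (s.value % 2 == 0) ++s.value;     // conditional modification
            q.enqueue(s);                        // still under orderMutex
        }
    };

    int outOfOrder = 0;  // written by the consumer, read only after join
    auto consumer = [&] {
        std::uint64_t expected = 0;
        for (int n = 0; n < 2 * perThread; ++n) {
            Stamped s = q.dequeue();
            if (s.seq != expected) ++outOfOrder;
            expected = s.seq + 1;
        }
    };

    std::thread t1(producer), t2(producer), t3(consumer);
    t1.join();
    t2.join();
    t3.join();
    return outOfOrder;
}
```

With the stamp, the modification, and the enqueue all inside one critical section, the count should come back as 0. Stamping before the lock is taken (which is closer to the situation described above) would be expected to produce a nonzero count on at least some runs, which makes the check usable as a regression test.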

Here is the example:

#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdio.h>      
#include <stdlib.h>  
#include <iostream>   
#include <assert.h>
#include <pthread.h>
#include <unistd.h>


class SafeQueue
{// A threadsafe-queue.
public:
  SafeQueue(void)
    : q()
    , m()
    , cv()
  {}

  ~SafeQueue(void)
  {}


  // Add an element to the queue.
  void enqueue(int i)
  {  
    std::lock_guard<std::mutex> lock(m);
    q.push(i);
    cv.notify_one();
  }

  // Get the "front" element.
  // If the queue is empty, wait until an element is available.
  int dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // Release the lock for the duration of the wait and reacquire it afterwards.
      cv.wait(lock);
    }
    int val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<int> q;
  mutable std::mutex m;
  std::condition_variable cv;
 
};


class UsesQ
{
    private:
    int readVal;
    int lastReadVal = 1;

    public:
    SafeQueue & Q;
    UsesQ(SafeQueue & Q): Q(Q){};
    ~UsesQ(){};
    void addQ(int i)
    {
      if(i% 2 == 0)
      {
        i++;//some conditional modification to the initial "message"
      }
      Q.enqueue(i);
    }
    void removeQ()
    {
      readVal = Q.dequeue();
    }
};

void* run_add(void* Ptr)
{
    UsesQ * UsesQPtr = (UsesQ *)Ptr;
    for(;;)
    {
        int i = rand(); // simulate an incoming "message"
        UsesQPtr->addQ(i);
    }
    pthread_exit (NULL);
    return NULL;
}

void* run_remove(void* Ptr)
{
    UsesQ * UsesQPtr = (UsesQ *)Ptr;
    for(;;)
    {
        UsesQPtr->removeQ();
    }
    pthread_exit (NULL);
    return NULL;
}

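int main()
{
  SafeQueue Q;

  // UsesQ takes a plain reference, so std::ref (and <functional>) is not needed.
  UsesQ * UsesQPtr = new UsesQ(Q);

  // Two producer threads and one consumer thread share the same UsesQ.
  pthread_t thread1;
  pthread_create(&thread1, NULL, run_add, UsesQPtr);

  pthread_t thread2;
  pthread_create(&thread2, NULL, run_add, UsesQPtr);

  pthread_t thread3;
  pthread_create(&thread3, NULL, run_remove, UsesQPtr);

  // Keep the main thread alive while the workers run.
  while(1)
  {
    usleep(1);
    printf(".\n");
  }
}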

Compiled with the -pthread flag:

g++ main.cpp -pthread

Thank you for any help.

Nick Merrill
  • Dare I ask why you're using raw pthreads in the first place? Your platform supports `std::condition_variable` and `std::mutex`, so if `std::thread` isn't available I'd be pretty surprised, and curious what platform+toolchain you're using. Regardless, FYI, there are `wait` overloads that let you pass a wait-predicate as an argument, thereby eliminating the while-loop in your `dequeue` operation. – WhozCraig Nov 23 '20 at 00:11
  • I could use std::thread; I am just new and more familiar with pthread. And yes, the whole loop is equivalent to cv.wait( lock, [&]{ return !q.empty(); } ); – Nick Merrill Nov 23 '20 at 00:28
