
I have a queue class, the data of which is stored in a vector:

std::vector<boost::shared_ptr<rxImage> > queue;

A separate thread adds to the queue using this loop:

while(runRxThread){
  this->rxImage();
}

Where rxImage() is defined by:

zmq::message_t img;
imageSocket->recv(&img);

//addToQueue is a push back:
//queue.push_back( boost::shared_ptr<rxImage> (new rxImage(data, imgSize)) );
localQueue->addToQueue((unsigned char*) img.data());

The images are received fine within this thread (I've tested with 10,000 or so and it seems to be fine).

The runRxThread variable is set through some setter functions within the class that the thread function is defined in.

When I run a process in the main thread such as:

startRx(); //start the thread

/*process to stimulate the sending of network data from another program*/

stopRX(); //stop the thread from accessing the queue

queue.clear();

There is a segfault caused by the clear(). I have checked that it is definitely this line and not the internal plumbing of the objects.

It appears to be a thread safety issue but I don't know how to fix it and, more importantly, I don't know why. My understanding is that two threads can write to the same memory, but not at the same time. Surely by setting my runRxThread variables, I ensure that this does not happen.

I would very much like a solution that does not involve mutexes or semaphores - I really don't think they should be necessary for a problem like this.

Thanks!

EDIT: runRxThread is volatile and the thread loop is now:

while(1){
    if(runRxThread == 1){
      this->rxImage();
    }
}

EDIT2: "use a mutex on shared objects"

OK, this is clearly a thread safety issue, I need to make my shared variables threadsafe. But...

1) rxImage(); does not terminate unless there is data being sent

2) The segfault happens within rxImage();

3) If I lock the queue with a mutex, surely the program will hang in rxImage until there is data, because the mutex will not be released

4) There will be no data sent, so the program will hang forever.

Is my understanding here incorrect?

EDIT3:

I have changed rxImage() to be non-blocking:

zmq::message_t img;
imageSocket->recv(&img,ZMQ_NOBLOCK);
if((int)img.size() > 0){
    cout<<"in the thread conditional"<<endl;     
    localQueue->addToQueue((unsigned char*) img.data());
    cout<<"leaving thread conditional"<<endl;   
}

The problem earlier was apparently that localQueue was being written to while I was clearing the queue. Now the queue can only be written in this function when there is data to write to it. I can guarantee that when I call clear() there is no data to write, so ((int)img.size() > 0) returns false and the thread does not access the queue. Why is there still a segfault? Surely this proves that this thread does not cause the segfault?

Here is a terminal output:

in the thread
pushing back1 of size: 16000000
Added image to queue. queue size: 650
leaving thread conditional

image server stopped
stopping image server
clearing vector
Segmentation fault

It can be seen that the thread is finished with the vector, then the image server is stopped, then the vector is cleared. Precisely in that order with no unpredicted behaviour. But there is still a segfault.

user2290362
  • Re. Edit: volatile is not threadsafe in C++. You need synchronization/atomic memory accesses – sehe Dec 17 '14 at 22:27
  • Yes, you need to cause imageSocket->recv(&img); to abort somehow. Usually communication APIs have a Stop() or Abort() function that is thread-safe and which you can call from stopRX(). That would cause recv to throw an exception or return an error code. When you get that error, return from rxImage(). – Scott Langham Dec 17 '14 at 23:16
  • Thank you, I have added a non-blocking version of the function to stop it hanging, please see edit three. – user2290362 Dec 17 '14 at 23:42

6 Answers


Your data race is here:

while(runRxThread){
  this->rxImage();
}

You don't check runRxThread for the duration of the loop body (not to mention that unless runRxThread is marked volatile, it might not even be read from main memory, but "assumed" unchanged in a register).

(NOTE: even with volatile the race is still there. I was merely pointing out that the compiler assumes a single-threaded abstract machine unless explicit atomic memory ordering modes are employed.)

You need mutual exclusion.
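
A minimal sketch of that mutual exclusion, assuming C++11 and a simplified stand-in for the OP's rxImage type (ImageQueue, RxImage and their members are hypothetical names, and std::shared_ptr replaces boost::shared_ptr). Every access to the vector, from the receiving thread and from the main thread, takes the same std::mutex, and the run flag is a std::atomic<bool> instead of a volatile.

```cpp
#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-in for the OP's rxImage: it copies the bytes it is given.
struct RxImage {
    RxImage(const unsigned char* data, std::size_t size) : bytes(data, data + size) {}
    std::vector<unsigned char> bytes;
};

class ImageQueue {
public:
    // Called by the receiving thread.
    void addToQueue(const unsigned char* data, std::size_t size) {
        auto img = std::make_shared<RxImage>(data, size);  // copy outside the lock
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(img);
    }

    // Called by the main thread; cannot overlap with addToQueue().
    void clear() {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.clear();
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

private:
    mutable std::mutex mutex_;  // guards queue_
    std::vector<std::shared_ptr<RxImage>> queue_;
};

// Written by the main thread, read by the receiving thread on every loop
// iteration. std::atomic makes that well defined; volatile does not.
std::atomic<bool> runRxThread(false);
```

The flag alone still does not make clear() safe: the receiving thread may be in the middle of addToQueue() when the flag changes, which is why the vector itself needs the lock.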

sehe
  • I don't believe volatile (in a C++11 conforming compiler) will help. Some mention of why is here: http://stackoverflow.com/questions/12878344/volatile-in-c11 – Scott Langham Dec 17 '14 at 22:20
  • @ScottLangham I never said it would help. I mentioned that, because it isn't volatile, there might not even be a read! – sehe Dec 17 '14 at 22:20
  • I have added the volatile flag and fixed the loop (see edit of post). This does not solve the problem. Why is the data race definitely there? Surely that thread should stop accessing the queue... runRxThread should act like a queue lock – user2290362 Dec 17 '14 at 22:23
  • @sehe Now you should see why it is a bad idea even to mention volatile when talking about thread-safe code. – Roman L Dec 17 '14 at 22:23
  • @RomanL not really. I could hardly claim a race condition where there might not even be one :) So I point out both the race condition and the fact that the compiler assumes a single threaded abstract machine – sehe Dec 17 '14 at 22:25
  • Could you please show an example of where to put the mutual exclusion? Which variable is being raced? The runRxThread or the queue? – user2290362 Dec 17 '14 at 22:28
  • @user2290362: as I said, any mutable data accessed from multiple threads is a race. You need to protect both runRxThread and the queue. It is easier with runRxThread as you can get away with an atomic. – Roman L Dec 17 '14 at 22:34
  • @user2290362 both are raced. Just mutually exclude all access to the queue. Here's a link to "random older answer" containing a thread/worker queue: http://stackoverflow.com/a/25928221/85371 – sehe Dec 17 '14 at 22:34
  • @ParkYoung-Bae I think I see far too little code by the OP to do this. Had there been a SSCCE, there would have been a fixed version (or two) 20 minutes ago. – sehe Dec 17 '14 at 22:36
  • I have added more information in an edit which maybe helps show my confusion as to how to implement this – user2290362 Dec 17 '14 at 23:13
  • I have fixed the problem - see my answer. As I stated, there is no data race. – user2290362 Dec 18 '14 at 16:09
  • @user2290362 you mean, your question was invalid and had to do with UB that was caused by other issues? I'm not very convinced the rest is all in order (remember, it's perfectly possible to have multiple sources of UB in the same program). But you never show any tangible evidence, so I'm happy to assume that "the threads are synchronised over the network" actually means something that you just failed to formulate. Cheers – sehe Dec 18 '14 at 22:09

When accessing mutable shared data from two threads, you need to protect against data races. However simple your problem might look, you cannot guarantee the correctness of your code if it has a data race. A typical solution is to use a mutex or similar to ensure that only one thread accesses the shared state at a time. You wouldn't have to do this manually if the queue you are using were thread-safe (std::vector obviously is not).

Here is an example of a thread-safe queue, but it does not seem to have the clear() operation: http://www.boost.org/doc/libs/1_53_0/doc/html/boost/lockfree/queue.html. In fact, it is lock-free, so it does not use a mutex, but this does not mean it is simpler than a thread-safe queue with a mutex. It is actually the other way around - it is hard to write correct lock-free code.
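
For comparison, a mutex-based thread-safe queue that does offer clear() can be quite short. This is a sketch assuming C++11; ThreadSafeQueue and its members are hypothetical names, not a Boost API.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Every member takes the same mutex, so no two operations ever overlap.
template <typename T>
class ThreadSafeQueue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(std::move(value));
    }

    // Non-blocking: returns false if the queue was empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) {
            return false;
        }
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

    void clear() {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.clear();
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    mutable std::mutex mutex_;  // guards items_
    std::deque<T> items_;
};
```

Because clear() takes the same lock as push() and try_pop(), it can never run while another thread is inside either of them.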

Roman L
  • lockfree is a whole different business than just threadsafe. Lock-free is not generically applicable (except for high-throughput, low-latency work, when you can afford the server load) – sehe Dec 17 '14 at 22:13
  • Did I say something that contradicts this statement? – Roman L Dec 17 '14 at 22:14
  • Yes. By pointing - exclusively - at a resource about lockfree queues to an obvious novice who will _not_ make this discernation – sehe Dec 17 '14 at 22:15
  • @sehe Lock-free implies thread-safe so you don't have to make the discernation. The only reason I pointed at a lockfree queue is because it comes up first when googling for a thread-safe queue and boost (which is a shame). – Roman L Dec 17 '14 at 22:19

The problem is that even if you set runRxThread to false, the thread may still be doing stuff within this->rxImage() and could be accessing the vector. You need to wait for it to finish doing that and check the loop condition again before allowing the main thread to clear the vector. It wouldn't be good to 'clear' the vector while a thread is still accessing it.

So, you need to wait for this->rxImage() to complete before allowing the main thread to clear the vector.

One solution would be to get StopRx() to wait for your "thread which adds to the queue" to complete by calling thread.join() after setting runRxThread false (assuming you're using a std::thread).

I'd suggest you also change runRxThread to type std::atomic<bool> to ensure both threads always have a consistent view of its value.
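
A sketch of that start/stop pattern, assuming std::thread and std::atomic<bool>. Receiver and pollOnce() are hypothetical names: pollOnce() stands in for a receive that returns promptly (such as a non-blocking recv), because join() can only finish once the loop gets a chance to re-check the flag.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

class Receiver {
public:
    void startRx() {
        running_ = true;
        worker_ = std::thread([this] {
            // The flag is re-read on every iteration.
            while (running_.load()) {
                pollOnce();
            }
        });
    }

    // After this returns, the worker thread has exited, so the caller may
    // safely touch (e.g. clear) anything the worker used.
    void stopRx() {
        running_ = false;
        if (worker_.joinable()) {
            worker_.join();
        }
    }

    ~Receiver() { stopRx(); }

    int iterations() const { return iterations_.load(); }

private:
    // Hypothetical placeholder for a receive that does not block for long.
    void pollOnce() {
        ++iterations_;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    std::atomic<bool> running_{false};
    std::atomic<int> iterations_{0};
    std::thread worker_;
};
```

Calling startRx() twice without stopRx() in between would assign to a joinable std::thread, which calls std::terminate, so a real class would guard against that.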

Scott Langham
  • rxImage() won't complete unless there is data to be received, and the data comes from another program which is controlled by this program, so there is definitely no data to be received and the function will never terminate. Is there a way around this? – user2290362 Dec 17 '14 at 22:42
  • Usually your connection API (imageSocket in your case) will have a function that can stop or abort any current communication operations. StopRx() should also call that function. Aborting would then, hopefully, cause imageSocket->recv(&img); to either return an error code or throw an exception. You need to check for that error and exit rxImage() when it occurs. – Scott Langham Dec 17 '14 at 22:45

My understanding is that two threads can write to the same memory, but not at the same time.

Unless you add explicit synchronisation to your code (e.g. using mutexes, semaphores, or atomic operations) you cannot meaningfully say whether two events happen "at the same time" or not. Without synchronisation you cannot even say that one happens before the other.
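
As an illustration of what synchronisation buys you, here is a sketch, assuming C++11 atomics, of a release store paired with an acquire load (publishAndConsume is a hypothetical name). The acquire load that sees true synchronises with the release store, so the ordinary write to payload happens before the consumer's read of it; with a plain bool flag instead, the program would have a data race and no such ordering.

```cpp
#include <atomic>
#include <thread>
#include <vector>

int publishAndConsume() {
    std::vector<int> payload;        // plain, non-atomic shared data
    std::atomic<bool> ready(false);  // the synchronisation point
    int observed = 0;

    std::thread producer([&] {
        payload.push_back(42);                         // (1) write the data
        ready.store(true, std::memory_order_release);  // (2) publish it
    });

    std::thread consumer([&] {
        while (!ready.load(std::memory_order_acquire)) {  // (3) wait for (2)
            std::this_thread::yield();
        }
        observed = payload.front();  // (4) (1) happens before this read
    });

    producer.join();
    consumer.join();
    return observed;
}
```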

I would very much like a solution that does not involve mutexes or semaphores - I really don't think they should be necessary for a problem like this.

You're wrong. You either need something like a mutex or something much more complicated like a lock-free queue using atomic operations.

Since you are not an expert in this area, just use a mutex to protect all shared data that you access from multiple threads (unless all accesses are only reads and there are no writes).

Jonathan Wakely
  • Well yes, if you just lock the mutex and wait it will block forever, so don't do that. You need to lock the mutex, check if there is data, then unlock, to give the other thread a chance to produce data. And instead of just spinning in a loop checking, you can use a condition variable to make the consumer wait until the producer notifies it there is data available. Find a tutorial, these are extremely common idioms, you should not be trying to write multithreaded code by guessing. – Jonathan Wakely Dec 17 '14 at 23:21
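
The lock, check, wait idiom from the comment above can be sketched with std::condition_variable, assuming C++11 (BlockingQueue and its members are hypothetical names). The wait releases the mutex while it sleeps, which addresses the concern in the question's EDIT2 that holding a mutex during a blocking receive would hang forever, and shutdown() lets a waiting consumer return instead of blocking indefinitely.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

class BlockingQueue {
public:
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(value);
        }
        cv_.notify_one();  // wake one waiting consumer
    }

    // Tell waiting consumers to stop waiting once the queue is empty.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until an item is available or shutdown() has been called.
    // The mutex is released while waiting, so push() is never held up.
    bool pop(int& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return stopped_ || !items_.empty(); });
        if (items_.empty()) {
            return false;  // woke because of shutdown
        }
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<int> items_;
    bool stopped_ = false;
};
```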

I am the OP, I have fixed the problem.

The issue is clearly not a thread contention issue, as other users suggested. This is proven in edit 3 of the original question. The terminal output simulates where a mutex would have been locked and released, and proves that mutexes are unnecessary in this case, as the threads are synchronised over the network. I accept that this is a very unusual case.

I traced the problem back to the destructor of the image class being queued: a variable is deleted there, and this causes the segfault.
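
The destructor was not shown, so the exact cause can only be guessed. One common way for a destructor to segfault is deleting memory the object does not own, for example a pointer such as img.data() that still belongs to the zmq::message_t. Under that assumption, here is a sketch of an image class (ImageBuffer is a hypothetical name) that copies the bytes into a std::vector, so there is no manual delete at all.

```cpp
#include <cstddef>
#include <vector>

// The class owns its storage, so the implicitly generated destructor and
// copy operations manage it correctly and free it exactly once.
class ImageBuffer {
public:
    ImageBuffer(const unsigned char* data, std::size_t size)
        : bytes_(data, data + size) {}  // deep copy; the caller keeps ownership of data

    const unsigned char* data() const { return bytes_.data(); }
    std::size_t size() const { return bytes_.size(); }

private:
    std::vector<unsigned char> bytes_;
};
```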

user2290362

When data is shared between threads, mutex locks are a good option. You can use the locks in your class's push and pop methods.

When you are pushing, popping or otherwise accessing the data, the mutex lock ensures thread safety.

Let me demonstrate this -

Suppose you are accessing the queue while doing some operation, and assume the queue size is 5. If you protect that operation with the same mutex that push, pop and the other queue operations use, then none of those other operations can run until the operation has finished.

For example, consider this simple sketch:

void CQUEUE::Push(const Element& element)
{
    // m_mutex is a std::mutex member shared by Push, Pop and every other queue operation
    std::lock_guard<std::mutex> lock(m_mutex);
    m_items.push_back(element);

    // any other operation that locks the same m_mutex waits until lock goes out of scope
}

Please note that mutex locks can cause deadlocks if they are not used properly, so use them with great care.
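
One common way to avoid such a deadlock, when an operation needs two mutexes at once, is to acquire them together with std::lock instead of one after the other. A sketch assuming C++11; LockedVector and moveAll are hypothetical names.

```cpp
#include <mutex>
#include <vector>

struct LockedVector {
    std::mutex m;  // guards items
    std::vector<int> items;
};

// Moves every item from one vector to the other while holding both locks.
void moveAll(LockedVector& from, LockedVector& to) {
    if (&from == &to) {
        return;  // locking the same std::mutex twice would be undefined behaviour
    }
    std::lock(from.m, to.m);  // deadlock-avoiding acquisition of both mutexes
    std::lock_guard<std::mutex> lockFrom(from.m, std::adopt_lock);
    std::lock_guard<std::mutex> lockTo(to.m, std::adopt_lock);
    to.items.insert(to.items.end(), from.items.begin(), from.items.end());
    from.items.clear();
}
```

If one thread locked a then b while another locked b then a, each could end up holding one mutex while waiting forever for the other; std::lock avoids that regardless of the order in which callers pass the arguments.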

Naseef Chowdhury