I am currently learning the basics of thread pooling. Here are some code blocks that I have written, based on examples found on the web:

SyncQueue.h

#ifndef SYNC_QUEUE_H
#define SYNC_QUEUE_H

#include <list>
#include <mutex>
#include <iostream>

template<typename T>
class SyncQueue {
public:
  SyncQueue();
  ~SyncQueue();
  SyncQueue(const SyncQueue&) = delete;
  SyncQueue& operator=(const SyncQueue &) = delete;
  void append(const T& data);
  T& get();
  unsigned long size();
  bool empty();
private:
  std::list<T> queue;
  std::mutex myMutex;
};
#endif

SyncQueue.cpp

#include <functional>  // std::function, needed by the explicit instantiation below
#include "SyncQueue.h"

template<typename T>
SyncQueue<T>::SyncQueue():
  queue(),
  myMutex() {}

template<typename T>
SyncQueue<T>::~SyncQueue() {}

template<typename T>
void SyncQueue<T>::append(const T& data) {
  std::unique_lock<std::mutex> l(myMutex);
  queue.push_back(data);
}

template<typename T>
T& SyncQueue<T>::get() {
  std::unique_lock<std::mutex> l(myMutex);
  T& res = queue.front();
  queue.pop_front();
  return res;
}

template<typename T>
unsigned long SyncQueue<T>::size() {
  std::unique_lock<std::mutex> l(myMutex);
  return queue.size();
}

template<typename T>
bool SyncQueue<T>::empty() {
  std::unique_lock<std::mutex> l(myMutex);
  return queue.empty();
}

template class SyncQueue<std::function<void()>>;

ThreadPool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
#include "SyncQueue.h"

class ThreadPool {
public:
  ThreadPool(unsigned long thrdAmount = 0);
  virtual ~ThreadPool();
  void appendTask(std::function<void()> func);
  unsigned long pendingTasks();
private:
  void runThread();
  unsigned int myThrdAmount;
  std::atomic<bool> done;
  SyncQueue<std::function<void()>> syncQueue;
  std::vector<std::thread> threads;
  std::condition_variable myCondVar;
  std::mutex myMutex;
};

#endif

ThreadPool.cpp

#include "ThreadPool.h"

ThreadPool::ThreadPool(unsigned long thrdAmount):
  myThrdAmount(0),
  done(false),
  syncQueue(),
  threads(),
  myCondVar(),
  myMutex() {
  if (thrdAmount > 0) {
    myThrdAmount = thrdAmount;
  } else {
    myThrdAmount = std::thread::hardware_concurrency();
  }
  for (unsigned int i = 0; i < myThrdAmount; i++) {
    threads.push_back(std::thread(&ThreadPool::runThread, this));
  }
}

ThreadPool::~ThreadPool() {
  done = true;
  myCondVar.notify_all();
  for (auto& thrd: threads) {
    if (thrd.joinable()) {
      thrd.join();
    }
  }
}

void ThreadPool::appendTask(std::function<void()> func) {
  syncQueue.append(func);
  {
    std::unique_lock<std::mutex> l(myMutex);
    myCondVar.notify_one();
  }
}

unsigned long ThreadPool::pendingTasks() {
  return syncQueue.size();
}

void ThreadPool::runThread() {
  while (!done) {
    if (syncQueue.empty()) {
      std::unique_lock<std::mutex> l(myMutex);
      myCondVar.wait(l);
      continue;
    }
    syncQueue.get()();
  }
}

main.cpp

#include <unistd.h>
#include <iostream>
#include "ThreadPool.h"

void print() {
  std::cout << "Hello World!" << std::endl;
}

int main(int argc, char const *argv[]) {
  ThreadPool p;
  for (int i = 0; i < 20; i++) {
    p.appendTask(print);
  }
  std::cout << "Pending: " << p.pendingTasks() << std::endl;
  sleep(5);
  for (int i = 0; i < 20; i++) {
    p.appendTask(print);
  }
  return 0;
}

Although all operations on a SyncQueue are protected by a mutex, and the condition variable of the ThreadPool is also protected by a mutex, the code often results in undefined behaviour.

That said, can you please explain to me where the code lacks thread safety? How should I improve it?

rudicangiotti
  • 2
    Note: You don't use any of the extra features of `std::unique_lock`, so you could get by with `std::lock_guard`. – Jesper Juhl Mar 22 '19 at 19:00
  • 2
    Your `get` function returns a reference to a destroyed item. That's never going to end well. – Mike Vine Mar 22 '19 at 19:02
  • 1
    Note: `template<typename T> SyncQueue<T>::~SyncQueue() {}` is a user-defined destructor and, even though empty, is by definition then no longer a *trivial* destructor. Declare it `= default;` *in the header* instead (and remove the implementation) and it *will* be trivial. Same goes for your default constructor. – Jesper Juhl Mar 22 '19 at 19:06
  • 1
    Note: your `size`, `empty` & `pendingTasks` functions should be `const`. – Jesper Juhl Mar 22 '19 at 19:08
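Taken together, the comments above suggest a queue along the following lines. This is only a sketch, not a drop-in fix: `tryGet` is a hypothetical replacement for `get` (the name and its `bool` plus out-parameter shape are assumptions), chosen so that the emptiness check and the removal happen under a single lock and the element is handed out by value rather than by reference.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <mutex>
#include <utility>

template <typename T>
class SyncQueue {
public:
  SyncQueue() = default;
  ~SyncQueue() = default;
  SyncQueue(const SyncQueue&) = delete;
  SyncQueue& operator=(const SyncQueue&) = delete;

  void append(const T& data) {
    std::lock_guard<std::mutex> l(myMutex);
    queue.push_back(data);
  }

  // Hypothetical replacement for get(): checks for emptiness and removes
  // the front element in one locked step, moving it out to the caller
  // before pop_front() destroys the list node.
  bool tryGet(T& out) {
    std::lock_guard<std::mutex> l(myMutex);
    if (queue.empty()) return false;
    out = std::move(queue.front());
    queue.pop_front();
    return true;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> l(myMutex);
    return queue.size();
  }

  bool empty() const {
    std::lock_guard<std::mutex> l(myMutex);
    return queue.empty();
  }

private:
  std::list<T> queue;
  mutable std::mutex myMutex;  // mutable so const members can lock it
};
```

Because the template is defined inline here, it would live in the header. Note that this only removes the dangling reference and the gap between `empty()` and `get()`; it does not by itself give the pool a correct way to wait for work.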

1 Answer

void ThreadPool::appendTask(std::function<void()> func) {
  syncQueue.append(func);
  {
    std::unique_lock<std::mutex> l(myMutex);
    myCondVar.notify_one();
  }
}

void ThreadPool::runThread() {
  while (!done) {
    if (syncQueue.empty()) {
      std::unique_lock<std::mutex> l(myMutex);
      myCondVar.wait(l);
      continue;
    }
    syncQueue.get()();
  }
}

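The problem is that myMutex doesn't actually protect anything: the state the workers test (whether the queue is empty) is guarded by a different mutex inside syncQueue. So your code has a catastrophic race condition around waiting for the queue.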

Consider:

  1. Thread calling runThread sees syncQueue is empty.
  2. Thread calling appendTask adds a job to the queue and calls notify_one. No thread is waiting yet, so the notification is lost.
  3. Thread calling runThread finally gets the lock on myMutex and waits on the condition variable, even though the queue is no longer empty. It may sleep indefinitely with work pending.

It is absolutely vital that the condition variable you use for waiting be associated with the mutex that protects the predicate you are waiting for. The entire purpose of a condition variable is to allow you to atomically release the mutex that protects the predicate and wait for a signal without a race condition. But you buried the predicate inside the syncQueue, defeating the condition variable's lock handling logic.
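As a rough illustration of that rule, here is a minimal sketch in which one mutex guards both the task container and the shutdown flag, and the condition variable waits on that same mutex. The class name `SimplePool`, the helper `runSimplePoolDemo`, and the choice to drain pending tasks on destruction are assumptions made for the example, not part of the original code.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool: `m` protects both `tasks` and `done`, which together
// form the predicate that the workers wait for on `cv`.
class SimplePool {
public:
  explicit SimplePool(unsigned n) {
    for (unsigned i = 0; i < n; ++i)
      workers.emplace_back([this] { run(); });
  }

  ~SimplePool() {
    {
      std::lock_guard<std::mutex> l(m);
      done = true;  // changed under the mutex, so no waiter can miss it
    }
    cv.notify_all();
    for (auto& t : workers) t.join();
  }

  void appendTask(std::function<void()> f) {
    {
      std::lock_guard<std::mutex> l(m);
      tasks.push_back(std::move(f));
    }
    cv.notify_one();
  }

private:
  void run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> l(m);
        // The predicate is re-checked under the lock after every wakeup.
        cv.wait(l, [this] { return done || !tasks.empty(); });
        if (tasks.empty()) return;  // only reachable once done is set
        task = std::move(tasks.front());
        tasks.pop_front();
      }
      task();  // run the job without holding the lock
    }
  }

  std::mutex m;
  std::condition_variable cv;
  std::deque<std::function<void()>> tasks;
  bool done = false;
  std::vector<std::thread> workers;  // declared last: threads use the members above
};

// Pushes `jobs` increments through the pool and returns the final count.
int runSimplePoolDemo(int jobs) {
  assert(jobs >= 0);
  std::atomic<int> counter{0};
  {
    SimplePool pool(4);
    for (int i = 0; i < jobs; ++i)
      pool.appendTask([&counter] { ++counter; });
  }  // the destructor lets the workers drain the queue, then joins them
  return counter.load();
}
```

Since a worker only goes to sleep after checking the predicate while holding `m`, and a producer only changes the predicate while holding `m`, the lost-notification sequence described above cannot occur in this arrangement.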

You can fix this race condition by making all calls into syncQueue while holding myMutex. But it might make a lot more sense to make syncQueue waitable. This may make it harder to shut down the thread pool, though.
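One possible shape for a waitable queue is sketched below. The names `WaitableQueue`, `push`, `pop` and `close` are hypothetical; in particular, `close()` is an assumed addition that gives blocked consumers a way to wake up and exit, which is one way to handle the shutdown difficulty mentioned above.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical waitable queue: the mutex, the condition variable and the
// data they protect all live in the same object.
template <typename T>
class WaitableQueue {
public:
  void push(T value) {
    {
      std::lock_guard<std::mutex> l(m);
      items.push_back(std::move(value));
    }
    cv.notify_one();
  }

  // Blocks until an item is available or the queue has been closed.
  // Returns false only when the queue is closed and fully drained.
  bool pop(T& out) {
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [this] { return closed || !items.empty(); });
    if (items.empty()) return false;
    out = std::move(items.front());  // move out before pop_front()
    items.pop_front();
    return true;
  }

  // Wakes every waiting consumer so that it can observe the closed state.
  void close() {
    {
      std::lock_guard<std::mutex> l(m);
      closed = true;
    }
    cv.notify_all();
  }

private:
  std::mutex m;
  std::condition_variable cv;
  std::deque<T> items;
  bool closed = false;
};

// A single consumer thread sums the values 1..n pushed by the caller.
int runWaitableQueueDemo(int n) {
  assert(n >= 0);
  WaitableQueue<int> q;
  int sum = 0;
  std::thread consumer([&] {
    int v;
    while (q.pop(v)) sum += v;
  });
  for (int i = 1; i <= n; ++i) q.push(i);
  q.close();
  consumer.join();  // after join, reading sum from this thread is safe
  return sum;
}
```

With a queue like this, a worker loop reduces to `while (q.pop(task)) task();`, and the pool destructor would call `close()` before joining its threads.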

David Schwartz
  • What do you mean by `to make syncQueue waitable`? Do you mean that it would be better to use a condition variable as a member inside the `SyncQueue` class? – rudicangiotti Mar 23 '19 at 00:21
  • @rudicangiotti Yes. Your code needs a waitable queue, that is, a queue that permits you to wait for things to be put on it. – David Schwartz Mar 23 '19 at 00:35
  • What about using more than one mutex? They should allow appending and getting tasks independently, without interrupting non-atomic operations, shouldn't they? [Here](https://codeshare.io/G7MxWL) is a little code draft. As you mentioned, shutdown would be trickier in this case. – rudicangiotti Mar 23 '19 at 01:12
  • 1
    @rudicangiotti That looks better. Just fix `if (queue.empty()) {` in `get`. You need a `while` there. Consider: 1) Two threads are waiting for jobs. 2) Two jobs get put on the queue. 3) Both threads are woken up. 4) First thread is scheduled, does both jobs. 5) Second thread finally gets to run, tries to get from an empty queue. 6) *boom* – David Schwartz Mar 23 '19 at 01:19
  • 1
    @rudicangiotti You might find [this answer](https://stackoverflow.com/a/29742586/721269) helpful. – David Schwartz Mar 23 '19 at 02:23