
I want to split jobs among multiple std::thread workers and continue once they are all done. To do so, I implemented a thread pool class mainly based on this SO answer. I noticed, however, that my benchmarks sometimes get stuck and run forever, without any error being thrown.

I wrote a minimal reproducible example, included below. Based on the terminal output, the issue seems to occur while the jobs are being queued. I checked videos (1, 2), documentation (3) and blog posts (4). I tried changing the lock types and using atomics. I could not find the underlying cause.

Here is the snippet to replicate the issue. The program repeatedly counts the odd elements in the test vector.

```cpp
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

class Pool {
  private:
    const int worker_count;
    bool to_terminate = false;
    std::atomic<int> unfinished_tasks = 0; // incremented in queue_job, decremented after each job runs
    std::mutex mutex;
    std::condition_variable condition;
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> jobs;

    void thread_loop()
    {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                condition.wait(lock, [&] { return (!jobs.empty()) || to_terminate; });

                if (to_terminate)
                    return;

                job = jobs.front();
                jobs.pop();
            }
            job();
            unfinished_tasks -= 1;
        }
    }

  public:
    Pool(int size) : worker_count(size)
    {
        if (size <= 0)
            throw std::invalid_argument("Worker count needs to be a positive integer");

        for (int i = 0; i < worker_count; ++i)
            threads.push_back(std::thread(&Pool::thread_loop, this));
    }

    ~Pool()
    {
        {
            std::unique_lock lock(mutex);
            to_terminate = true;
        }
        condition.notify_all();
        for (auto &thread : threads)
            thread.join();
        threads.clear();
    }

    void queue_job(const std::function<void()> &job)
    {
        {
            std::unique_lock<std::mutex> lock(mutex);
            jobs.push(job);
            unfinished_tasks += 1;
            // std::cout << unfinished_tasks;
        }
        condition.notify_one();
    }

    void wait()
    {
        while (unfinished_tasks) {
            ; // spinlock: busy-wait until every queued job has finished
        }
    }
};

int main()
{
    constexpr int worker_count = 8;
    constexpr int vector_size = 1 << 10;
    Pool pool = Pool(worker_count);

    std::vector<int> test_vector;
    test_vector.reserve(vector_size);
    for (int i = 0; i < vector_size; ++i)
        test_vector.push_back(i);

    std::vector<int> worker_odd_counts(worker_count, 0);

    std::function<void(int)> worker_task = [&](int thread_id) {
        int chunk_size = vector_size / (worker_count) + 1;
        int my_start = thread_id * chunk_size;
        int my_end = std::min(my_start + chunk_size, vector_size);

        int local_odd_count = 0;
        for (int ii = my_start; ii < my_end; ++ii)
            if (test_vector[ii] % 2 != 0)
                ++local_odd_count;

        worker_odd_counts[thread_id] = local_odd_count;
    };

    for (int iteration = 0;; ++iteration) {
        std::cout << "Jobs.." << std::flush;
        for (int i = 0; i < worker_count; ++i)
            pool.queue_job([&worker_task, i] { worker_task(i); });
        std::cout << "..queued. " << std::flush;

        pool.wait();

        int odd_count = 0;
        for (auto elem : worker_odd_counts)
            odd_count += elem;

        std::cout << "Iter:" << iteration << ". Odd:" << odd_count << '\n';
    }
}
```

Here is the terminal output of one specific run:

```
[...]
Jobs....queued. Iter:2994. Odd:512
Jobs....queued. Iter:2995. Odd:512
Jobs..
```

Edit: The error occurs with GCC 12.2.0 x86_64-w64-mingw32 on Windows 10 with an AMD Ryzen 4750U CPU; I do not get past 15k iterations. With Visual Studio Community 2022, I got past 1.5M iterations (and stopped it myself). Thanks @IgorTandetnik for pointing out the latter.

Yakk - Adam Nevraumont
Dudly01
    For what it's worth, I don't see anything wrong, and I can't reproduce. MSVC 2019, code exactly as shown runs without an issue. 300K iterations and counting. – Igor Tandetnik Jan 29 '23 at 01:07

2 Answers


MinGW doesn’t natively support multithreading on Windows. Its C++ standard library implements threads on top of the POSIX API, which is provided by the winpthreads compatibility layer, which in turn implements that API on top of native Windows threads.

I think the problem is not in your C++ code but in your toolchain setup. Do the following:

  1. Use the compiler from the x86_64-12.2.0-release-posix-seh-ucrt-rt_v10-rev2.7z archive, available there.

  2. Remember that a binary built this way depends on several DLLs shipped with the compiler: libgcc_s_seh-1.dll, libwinpthread-1.dll and libstdc++-6.dll. You must use exactly the versions of these DLLs that shipped with that MinGW build. If other versions of these DLLs are anywhere in your %PATH%, expect all kinds of failures.

A couple of general notes:

Linux-first C++ compilers like GCC have issues on Windows. The path of least resistance is to use Visual C++ instead. If you want your software to build on other platforms as well, consider CMake to abstract away the compiler.

Windows has included a thread pool implementation since Vista. The API is easy to use; you only need four functions: CreateThreadpoolWork, SubmitThreadpoolWork, WaitForThreadpoolWorkCallbacks, and CloseThreadpoolWork. Example.

Soonts
  • I am already using CMake, for the reason you recommended it, so I want to avoid the Windows thread pool implementation. I have been using GCC only because, in my experience, it is faster than MSVC. I installed GCC from mingw-w64 using MSYS2, following the VS Code documentation ["Using GCC with MinGW"](https://code.visualstudio.com/docs/cpp/config-mingw). – Dudly01 Jan 29 '23 at 17:13
  • @Dudly01 Well, before I wrote that answer, I tested your code with the compiler I’ve linked, and it worked fine on my computer. I built it like this: `g++.exe -O3 -march=znver3 pool.cpp`. If you have followed that installation guide carefully and you still have a broken build, the problem might be the guide, and/or the C++ extension for VS Code, and/or MSYS2, and/or pacman. With VS Code there are just too many moving parts, so it’s hard to tell where the problem is. – Soonts Jan 30 '23 at 12:34
  • @Dudly01 About “faster”: if you mean compilation time, use precompiled headers in VC++. If you mean performance of the code, use the correct compiler and linker optimization options. That’s relatively tricky due to poor support in CMake, but it is possible to set up the correct options in `CMakeLists.txt`. – Soonts Jan 30 '23 at 12:42

The first thing you should do is split the queue from the thread pool. Each is tricky enough on its own; writing them commingled in one class is asking for trouble.

This also allows you to unit test the queue without the pool.

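```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

template<class Payload>
class MutexQueue {
public:
  std::optional<Payload> wait_and_pop();  // blocks; returns nullopt once terminated
  void push(Payload);                     // no-op after termination
  void terminate_queue();                 // drops pending items and wakes all waiters
  bool queue_is_terminated() const;
private:
  mutable std::mutex m;                   // mutable so const member functions can lock it
  std::condition_variable cv;
  std::deque<Payload> q;
  bool terminated = false;
  std::unique_lock<std::mutex> lock() const {
    return std::unique_lock<std::mutex>(m);
  }
};
```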

This is a bit easier to write than the thread pool. The member function definitions below go inside the class body:

```cpp
  void push(Payload p) {
    {
      auto l = lock();
      if (terminated) return;
      q.push_back(std::move(p));
    }
    cv.notify_one();
  }
  void terminate_queue() {
    {
      auto l = lock(); // YOU CANNOT SKIP THIS LOCK, even if terminated is atomic
      terminated = true;
      q.clear();
    }
    cv.notify_all();
  }
  bool queue_is_terminated() const {
    auto l = lock(); // if you make terminated atomic, you CAN skip this lock
    return terminated;
  }
  std::optional<Payload> wait_and_pop() {
    auto l = lock();
    cv.wait(l, [&]{ return terminated || !q.empty(); });
    if (terminated) return std::nullopt;
    auto r = std::move(q.front());
    q.pop_front();
    return std::move(r);
  }
```

There we go.

Now our thread pool is simpler.

```cpp
#include <cstddef>
#include <functional>
#include <future>
#include <thread>
#include <utility>
#include <vector>

struct ThreadPool {
  explicit ThreadPool(std::size_t n) {
    create_threads(n);
  }
  std::future<void> push_task(std::function<void()> f) {
    // packaged_task's converting constructor is explicit, so direct-initialize it
    std::packaged_task<void()> p(std::move(f));
    auto r = p.get_future();
    q.push( std::move(p) );
    return r;
  }
  void terminate_pool() {
    q.terminate_queue();
    terminate_threads();
  }
  ~ThreadPool() {
    terminate_pool();
  }
private:
  MutexQueue<std::packaged_task<void()>> q;
  std::vector<std::thread> threads;
  void terminate_threads() {
    for(auto& thread:threads)
      thread.join();
    threads.clear();
  }
  // each worker pops and runs tasks until the queue is terminated
  static void thread_task( MutexQueue<std::packaged_task<void()>>* pq ) {
    if (!pq) return;
    while (auto task = pq->wait_and_pop()) {
      (*task)();
    }
  }
  void create_threads(std::size_t n) {
    for (std::size_t i = 0; i < n; ++i) {
      threads.push_back( std::thread( thread_task, &q ) );
    }
  }
};
```

I cannot spot an error in your code. But with the above, you can test the queue separately from the pool.

The same queue design will also work with pthreads or other synchronization primitives.

Yakk - Adam Nevraumont