
I have successfully implemented the thread pool from an answer on Stack Overflow, which helped me speed up my program. It uses a single std::queue to distribute jobs (std::function<void()>) among multiple workers (std::thread).

I wanted to improve on this. As I only need to run a limited set of functions, I planned to ditch the queue and use per-worker variables instead. In other words, the n-th worker would do the n-th job from a std::vector<std::function<void()>>. Unfortunately, my test app crashes with Segmentation fault (core dumped) and I have not been able to find my mistake so far.

Here is my (roughly) minimal reproducible example, with the job of counting the odd elements in a vector. (Idea taken from Scott Meyers: CPU Caches and Why You Care.)

#include <algorithm>
#include <condition_variable>
#include <cstdio> // std::puts
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept> // std::invalid_argument
#include <thread>
#include <vector>

// Thread pool with a std::function for each worker.
class Pool {
  public:
    enum class Status {
        idle,
        working,
        terminate
    };

    const int worker_count;

    std::vector<Status> statuses;
    std::vector<std::mutex> mutexes;
    std::vector<std::condition_variable> conditions;
    std::vector<std::thread> threads;
    std::vector<std::function<void()>> jobs;

    void thread_loop(int thread_id)
    {
        std::puts("Thread started");
        auto &my_status = statuses[thread_id];
        auto &my_mutex = mutexes[thread_id];
        auto &my_condition = conditions[thread_id];
        auto &my_job = jobs[thread_id];

        while (true) {
            std::unique_lock<std::mutex> lock(my_mutex);
            my_condition.wait(lock, [this, &my_status] { return my_status != Status::idle; });

            if (my_status == Status::terminate)
                return;

            my_job();
            my_status = Status::idle;
            lock.unlock();
            my_condition.notify_one(); // Tell the main thread we are done
        }
    }

  public:
    Pool(int size) : worker_count(size), statuses(size, Status::idle), mutexes(size), conditions(size), threads(), jobs(size)
    {
        if (size < 0)
            throw std::invalid_argument("Worker count needs to be a positive integer");
    };
    ~Pool()
    {
        for (int i = 0; i < worker_count; ++i) {
            std::unique_lock lock(mutexes[i]);
            statuses[i] = Status::terminate;
            lock.unlock(); // Unlock before notifying
            conditions[i].notify_one();
        }

        for (auto &thread : threads)
            thread.join();
        threads.clear();
    };

    void start_threads()
    {
        threads.resize(worker_count);
        jobs.resize(worker_count);

        for (int i = 0; i < worker_count; ++i) {
            statuses[i] = Status::idle;
            jobs[i] = []() { std::puts("I am running"); };
            threads[i] = std::thread(&Pool::thread_loop, this, i);
        }
    }

    void set_and_start_job(const std::function<void(int)> &job)
    {
        for (int i = 0; i < worker_count; ++i) {
            std::unique_lock lock(mutexes[i]);
            jobs[i] = [&job, i]() { job(i); };
            statuses[i] = Status::working;
            lock.unlock();
            conditions[i].notify_one();
        }
    }

    void wait()
    {
        for (int i = 0; i < worker_count; ++i) {
            auto &my_status = statuses[i];
            std::unique_lock lock(mutexes[i]);
            conditions[i].wait(lock, [this, &my_status] { return my_status != Status::working; });
        }
    }
};

int main()
{
    constexpr int worker_count = 1;
    constexpr int vector_size = 1 << 10;

    std::vector<int> test_vector;
    test_vector.reserve(vector_size);
    for (int i = 0; i < vector_size; ++i)
        test_vector.push_back(i);

    std::vector<int> worker_odd_counts(worker_count, 0);

    const auto worker_task = [&](int thread_id) {
        int chunk_size = vector_size / (worker_count) + 1;
        int my_start = thread_id * chunk_size;
        int my_end = std::min(my_start + chunk_size, vector_size);

        int local_odd_count = 0;
        for (int ii = my_start; ii < my_end; ++ii)
            if (test_vector[ii] % 2 != 0)
                ++local_odd_count;

        worker_odd_counts[thread_id] = local_odd_count;
    };

    Pool pool = Pool(worker_count);
    pool.start_threads();
    pool.set_and_start_job(worker_task);
    pool.wait();

    int odd_count = 0;
    for (auto elem : worker_odd_counts)
        odd_count += elem;
    std::cout << odd_count << '\n';
}


Dudly01
  • Excellent code example. Probably some fat that could be trimmed, but I was able [to do this](https://godbolt.org/z/743jbxs19) and turn on some extra runtime debugging without having to tweak a thing. Does the stack trace and error message help you solve the problem? – user4581301 Oct 27 '22 at 23:32
  • @user4581301, Nice, I was not aware of this tool. I may have found a solution! I wrote it in an answer. (I felt it is better than editing my question, but I may be wrong.) However, I do not see why this is actually a (possible) solution, so any insights in that area are welcome! – Dudly01 Oct 27 '22 at 23:41
  • You've done it right. Never edit the question to include the answer. Answers should only go into answers – user4581301 Oct 28 '22 at 00:19

2 Answers

TL;DR version:

The simplest fix is to change

jobs[i] = [&job, i]() { job(i); };

to

jobs[i] = [job, i]() { job(i); }; 

This captures job by value and makes a copy. The copy won't go out of scope before the lambda does and the lambda will outlive the thread.

The Long version:

The problem is at

jobs[i] = [&job, i]() { job(i); };

in set_and_start_job. The object that job refers to is destroyed before the worker threads get to run it. But how can that be, when the call is

pool.set_and_start_job(worker_task);

and worker_task won't go out of scope until after the threads are joined?

Turns out that's because set_and_start_job requires a const std::function<void(int)> & and worker_task isn't a std::function, merely implicitly convertible to a std::function. This conversion makes a temporary variable with a lifespan bound to set_and_start_job's job parameter. When set_and_start_job exits, job goes out of scope and the temporary is destroyed.

The simple fix is above, but we can also make the conversion right at the source, so that a std::function is passed all the way through the system and does not go out of scope until after the threads are joined.

const std::function<void(int)> worker_task = [&](int thread_id) { ... };

There may be some small resource saving in using a std::function end to end and capturing it by reference, but my experiences with references and threads haven't been the best, so I'd prefer the copy. It reduces the chance that I've missed some subtlety, or that a future change introduces one.

user4581301

In the function Pool::set_and_start_job, when setting the job, removing the & from the job capture seems to have resolved the issue:

jobs[i] = [job, i]() { job(i); };

However, this was only a suspicion, and I do not know the underlying cause.

Dudly01
  • Reason for failing: Capturing `job` by reference allows the object being referred to to go out of scope before the thread gets a chance to run the job. If you capture by value, a copy is made and stored in the lambda, and the copy won't go out of scope until the lambda does. – user4581301 Oct 27 '22 at 23:45
  • Huh. I'm reading the code a bit wrong. The comment above is right-ish, but I join you in wondering how it got out of scope before the threads were joined. I need to take a deeper look. – user4581301 Oct 27 '22 at 23:48
  • The `pool.wait()` within `main()` makes sure (hopefully) that all workers finish their jobs. It blocks the main thread until they do. Could the `worker_task` function that is created just above the `pool.wait()` go out of scope? I thought it should be kept alive until the program returns. – Dudly01 Oct 27 '22 at 23:50
  • OK: `worker_task` isn't a `std::function`, but it can be implicitly converted to one. To satisfy `const std::function &job`, we need a `std::function`, so a temporary one that'll last to the end of `set_and_start_job` is silently made out of `worker_task`. `set_and_start_job` returns, `job` dies, and the stored lambda that'll be run later by the thread holds a dangling reference. – user4581301 Oct 27 '22 at 23:51
  • You seem to be correct. If I create a `std::function worker_task_func` from my lambda in `main()`, then pass that to `set_and_start_job`, then the `&` can be added back to `job` and the code still runs. – Dudly01 Oct 27 '22 at 23:59
  • @user4581301 Please add an answer with your findings. I do not know how long I would have taken to find the underlying issue. Many thanks. – Dudly01 Oct 28 '22 at 00:00
  • Good point. This got trickier than I thought it was. Probably still going to be a duplicate, but it'll take a lot of reading between the lines for a future asker to make the connection between what happened to `job` and whatever happened in the duplicate. – user4581301 Oct 28 '22 at 00:19