0

I'm unsure whether I should use a std::counting_semaphore or a std::condition_variable to handle thread waiting in a thread pool.

I think there is a small risk, when using the semaphore, of incrementing it beyond its implementation-specific max().

I'm using C++20.

Some of the following code is somewhat pseudo-code, but illustrates the program structure.

class ThreadPool
{
private:
    struct ThreadData {
        // fn2::unique_function comes from a library that provides a movable function wrapper
        threadsafe_queue<fn2::unique_function<void()>> ThreadTask;
        std::counting_semaphore<1> ThreadSignal{0}; // this is what I'm unsure of
    };

    std::vector<std::jthread> mThreads;
    std::vector<ThreadData> mThreadData;

    template <typename Func>
    void CreateTask(Func&& func) {
        // iterate over threads and distribute the tasks
        ...
        mThreadData[thread_id].ThreadTask.emplace_back(std::forward<Func>(func));
        mThreadData[thread_id].ThreadSignal.release(); // tell the thread there is work to do
    }

public:
    // functions that parse a function into the above CreateTask()
    ...

    ThreadPool() {
        // for the required number of threads, create them with an internal lambda that handles the work
        ...
        mThreads.emplace_back([this, id = thread_id]() {
            while (running) {
                mThreadData[id].ThreadSignal.acquire(); // gets the go-signal; this is the focus of this post
                // handle work, either by taking it from the internal queue or by stealing other work
                // a thread might "over"-consume work by stealing, but a thread that runs through
                // this part without finding any work will eventually block on the semaphore
            }
        });
    }
};

Say, for example, the program allocates 1000 tasks to 2 threads. The main thread will quickly increase the signal semaphore to a very high number. I'm unsure whether this risks undefined behaviour. Should I use a condition variable instead? Semaphores should generally be faster than condition variables, which is a reason to want to keep them. It's also really simple, since I can easily activate all threads by iterating over the semaphores and calling release on each of them.

Previously I had a simple this_thread::yield() call if no task got popped off the list, but that ends up consuming a lot of CPU time, which is why I'd like the threads to "pause" and use a gateway of sorts.

Please bear with me, if I have misunderstood some concept, since I'm still a newbie.

Mikel F
Martin
  • @DanielLangr : In the above implementation the task gets distributed to a specific thread, and only the specific thread is "woken"/unblocked. The code doesn't do anything to the other threads, so i'm unsure what you mean? – Martin May 17 '23 at 13:04
  • @DanielLangr I'm still unsure what you mean. For each job/task, only one thread is "woken"; all other threads stay blocked until they get assigned a task. The thread pool can have unlimited threads and works completely as intended by being able to run several tasks in parallel. You can also "activate" all threads by iterating over their semaphores and releasing them, which is a somewhat asynchronous notify_all – Martin May 17 '23 at 13:10
  • @DanielLangr if each thread has their own queue and their own semaphore, then the above code does exactly that, yes. It's a flag either 0 or 1 – Martin May 17 '23 at 13:12
  • @DanielLangr No problem my friend. If you google the little book of semaphores you'll see that you can create a lot of different stuff with them :-) – Martin May 17 '23 at 13:15
  • The condition variable is the older one (since C++11), the semaphore comes with C++20. If you want to stay compatible with previous standards then use the former. Otherwise – well, simpler initialisation, no spurious wake-ups at least for `acquire` (according to [cppreference](https://en.cppreference.com/w/cpp/thread/counting_semaphore)) and if even more efficient I see no reason why not to prefer the semaphore. – Aconcagua May 17 '23 at 13:27
  • I'm not clear about what your signal semaphore should be or serve for, your code only shows ordinary binary semaphores (there's a `std::binary_semaphore` alias for, by the way), so cannot say anything about that. – Aconcagua May 17 '23 at 13:29
  • cppreference is not precise about what happens when the precondition (*'Both update >= 0 and update <= max() - counter are true, where counter is the value of the internal counter.'*) is not met, I assume this results in undefined behaviour – so you need to make sure not to release any of the binary semaphores that's already released... – Aconcagua May 17 '23 at 13:32
  • Wait a second, you might get along with one single semaphore: Start with one with initial counter as zero, release for every task you add (but not more than the semaphore's maximum, see above!) by `release`ing, then the threads can `acquire` as long as there are tasks left (i.e. the semaphore's counter being greater zero). – Aconcagua May 17 '23 at 13:44
  • There exists a `LeastMaxValue`, but apparently no `GreatestMaxValue` – so you might set the maximum to `std::numeric_limits<ptrdiff_t>::max()` and you shouldn't get any problems with that; you'll most likely run out of memory long before you reach that one... – Aconcagua May 17 '23 at 13:47
  • With *any* approach: be aware that multiple threads might be running concurrently – and you might get a race condition between two threads trying to get a task from the queue – so you need to protect the queue *independently*! An ordinary `std::mutex` (can use it with `std::lock_guard` for RAII). – Aconcagua May 17 '23 at 13:52
  • *'This is what i'm unsure of'* – you mean initialising by zero? No problem according to [cppreference](https://en.cppreference.com/w/cpp/thread/counting_semaphore/counting_semaphore), the pre-conditions are met ;) – Aconcagua May 17 '23 at 14:07
  • @Aconcagua : Yeah, it seems that a single semaphore might do the trick. And btw the queue is thread-safe (mutex). Anyway, I'm uncertain whether the actual max value of the semaphore (which I understand is determined at compile time?) might get exceeded if I release too many times on the semaphores before the actual acquires happen. That is defined as undefined behaviour. For this to happen, say that the main thread (outside the thread pool) allocates 100,000 tasks to a thread pool of two. That means 50,000 releases on each thread's semaphore, potentially before any acquires. I just don't want to risk UB. – Martin May 17 '23 at 14:29
  • Re, "iterate over threads and distribute the tasks." There's some gaps in your example, and I don't want to try to fill them in and guess your intent, but I wonder if you are maybe over-thinking this. In every thread pool that I've ever seen, there is a _blocking queue_ of tasks, and there are some number of worker threads that compete with each other to take tasks from the queue. There is no "scheduler," and there is no concept of "distributing" tasks to the workers. Instead, each worker simply takes a task whenever it is able, performs it, and then goes back to wait for another task. – Solomon Slow May 17 '23 at 17:33
  • @SolomonSlow Anthony Williams (C++ Concurrency in Action) gives examples of thread pools where each worker thread has its own queue and they steal from each other. It's to avoid contention on a main queue. The only thing I try to optimize a bit is how every thread in the pool handles idle states, since std::this_thread::yield just seems to make each thread spin – Martin May 17 '23 at 18:04
  • @Martin, OK... I guess... I never was involved in development of any super-high performance data-center application. But, I know that sometimes, "spinning" is not considered to be a bad thing. If you've got a dedicated core for each worker thread, then they may be able to react to incoming tasks more quickly if "idle" threads actually are spinning. Spinning uses more electrical power, but in a big, back-end application, you may be willing to pay for that power (and, for the extra air conditioning) in order to achieve the fastest possible response times. – Solomon Slow May 17 '23 at 19:18
  • @Martin *'[max] might get exceeded'* – only if you choose the maximum too low... I assume it doesn't make any difference internally if you choose a high or low value (unless for 1 perhaps...), so if you choose `std::numeric_limits<ptrdiff_t>::max()` (as proposed already) you'd be out of such danger. Or you select a maximum that's reasonable for you – and maybe check before adding to the queue. That maximum might indicate: *'Sorry, too busy, cannot accept further tasks!'* – as there are so many tasks accumulated you cannot cope with any more in reasonable time anyway... – Aconcagua May 19 '23 at 06:52

3 Answers

2

With a task queue, the usual approach is to let the worker threads wait on a condition variable whose condition is that the queue is not empty. When a new task is put on the queue, a waiting worker is woken, and a worker that has just finished a task can carry on without waiting as long as the queue is not empty. This way, only the queue has to do the counting.

You can also protect memory by adding a second condition variable that signals that the queue is full, and letting the feeding thread wait as long as it is.
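
The two condition variables described above could be combined as in the following minimal sketch. `BoundedQueue`, its member names and the capacity parameter are hypothetical and chosen for illustration; the predicate overload of wait() is what handles spurious wake-ups.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical blocking queue: consumers wait while it is empty,
// producers wait while it is full.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocks while the queue is full.
    void push(T value) {
        std::unique_lock lock(mutex_);
        notFull_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push(std::move(value));
        lock.unlock();          // notify without holding the mutex
        notEmpty_.notify_one();
    }

    // Blocks while the queue is empty.
    T pop() {
        std::unique_lock lock(mutex_);
        notEmpty_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        lock.unlock();
        notFull_.notify_one();
        return value;
    }

private:
    const std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::condition_variable notFull_;
    std::queue<T> queue_;
};
```

A worker thread would simply loop on pop() and run whatever it gets; shutdown handling (for example a sentinel task per worker) is left out of the sketch.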

stefaanv
  • Classic pre-C++20 approach... We might just get the same with a counting semaphore with simpler code and *possibly* (likely?) more efficient code. – Aconcagua May 17 '23 at 13:57
  • Pretty sure this is the best way to do it. It's important to watch out for: 1. spurious wake-ups, so you need to provide a predicate in the condition variable wait (or call it in a loop) 2. not keeping the threadpool's mutex lock on during the execution of the task from the queue, in case that task wants to enqueue more work (unless you have a recursive mutex) 3. always check your condition (is something in the queue?) before waiting – lionkor May 17 '23 at 14:47
  • @lionkor you think that condition variables are better? Above it got discussed using a single semaphore with a very high leastmaxvalue to just increment for each task and decrement for tasks completed - wouldn't that avoid every problem you list? – Martin May 17 '23 at 14:56
  • @lionkor this seems like the standard way of working with condition variables when working with `std::condition_variable::wait` – stefaanv May 17 '23 at 14:57
  • @Aconcagua you might want to make your own answer to show why semaphores (which exist already since the 70s in software) make for simpler code than condition variables in combination with a queue. c++'s [condition variable](https://en.cppreference.com/w/cpp/thread/condition_variable) handles a lot with the condition and the lock, so the code is pretty simple. – stefaanv May 17 '23 at 15:10
  • @stefaanv: From cppreference: "Semaphores are also often used for the semantics of signalling/notifying rather than mutual exclusion, by initializing the semaphore with ​0​ and thus blocking the receiver(s) that try to acquire(), until the notifier "signals" by invoking release(n). In this respect semaphores can be considered alternatives to std::condition_variables, often with better performance." – Martin May 17 '23 at 15:21
  • @Martin I know how semaphores work. I worked with them a lot until conditional variables became available which work more closely with the condition (empty queue) and for me has clearer code. By the way: https://stackoverflow.com/a/5538447/104774 is my answer from 12 years ago before std::condition_variable which makes it cleaner. – stefaanv May 17 '23 at 15:30
  • Well, you need a condition variable *together* with a mutex – that needs to be locked before waiting for the variable. That's the part that makes it more complicated. Semaphores existed before, indeed, but integrated as standard component only since C++20. The part maybe indeed a bit unfortunate is naming only, I need to *'release'* to what we'd actually call *'notify'* with condition variables... – Aconcagua May 19 '23 at 06:45
  • No spurious wake-ups either, at least if you do not `try_acquire`, so no lambda to be added to the `wait` function. – Aconcagua May 19 '23 at 07:28
  • @Aconcagua You think adding a lambda to a function is complicated? It's used in almost half of the standard library algorithms (find_if, sort, any_of, transform...) and combining the mutex with the condition variable is actually very clear that it is about a common condition that is changed on one side and checked by the other, so it must be guarded by a mutex, which it should also be done with a semaphore unless the condition is so simple it can be achieved with std::atomic. In this case, the condition is an empty queue, so action on the queue should be protected. – stefaanv May 22 '23 at 06:58
  • It's not complicated – but you *have* to do, and inside you need a way to determine if it's a spurious wakeup or not, typically requiring yet another variable – and that one needs to be set appropriately at yet another place. You don't have to care for all that at all with the C++20 alternative... – Aconcagua May 22 '23 at 07:35
  • @Aconcagua Working with `std::condition_variable::wait`, spurious wake-ups are functionally a non-issue: they don't have to be checked. Before that, the only thing was that the condition had to checked in a while loop (this while loop is now in the wait function). The spurious wake-up can be mostly seen as more waiters receiving the signal than needed and only one need to act on it, it's not like it's happening all of the time. – stefaanv May 22 '23 at 07:53
  • Still a *correct* programme should handle it, otherwise we introduce a probability of failure, how small it ever might be... You won't get around – the semaphore is more convenient, that's it advantage, and maybe performance, too, according to cppreference. If *better* – or maybe less or even more *correct* – that's a matter of opinion or at least of how one defines correctness. I won't argue around that. – Aconcagua May 22 '23 at 08:08
  • @Aconcagua Have you actually worked with `std::condition_variable::wait`? There isn't a probability of failure, period. With the semaphore, there's still some common resources to protect in most cases, so you don't get around that. I'm not talking about performance because I don't know and I haven't worked with the C++20 semaphore specifically. – stefaanv May 23 '23 at 07:02
1

I propose an entirely generic thread pool implementation (adapted from the comments on the question and the answers, and going beyond the scope of the question). In the sense of the question: I implemented the pool using a semaphore simply because it is slightly less complicated to use, e.g. there is no need to keep an additional mutex locked before waiting or to care about spurious wake-ups (watch for this in the code); the condition-variable approach is totally valid too, though. The pool is a template, which makes adding tasks to and removing them from the queue as efficient as possible. Unfortunately this requires the generic Task base class, from which all tasks to be added inherit, to be defined outside of the template (it's an implementation detail and would normally be a nested class; C++ doesn't allow forward-declaring nested classes, though...). A minor disadvantage, too, is that one cannot store arbitrary implementations in the same pointer type, but that's likely not relevant here anyway.

#include <atomic>
#include <cstddef>
#include <fstream>
#include <limits>
#include <mutex>
#include <queue>
#include <semaphore>
#include <string>
#include <thread>
#include <vector>

class Task
{
public:
    Task() { }
    virtual ~Task() { }
private:
    template <typename Queue>
    friend class ThreadPool;
    // each worker passes its own output stream to the task it runs
    virtual void execute(std::ofstream& s) = 0;
};

template <typename Queue = std::queue<Task*>>
class ThreadPool
{
public:
    ThreadPool() : ThreadPool(std::thread::hardware_concurrency())
    { }

    ThreadPool(size_t numberOfThreads)
        : m_notifier(0)
    {
        for(; numberOfThreads; --numberOfThreads)
        {
            // the current size serves as the id of the new worker
            m_threads.emplace_back(&ThreadPool::run, this, m_threads.size());
        }
    }

    ThreadPool(ThreadPool const&) = delete;
    ThreadPool& operator=(ThreadPool const&) = delete;

    ~ThreadPool()
    {
        m_exit = true;
        // one release per worker so that each of them wakes up and sees m_exit;
        // tasks still queued at this point are not run
        for(auto n = m_threads.size(); n; --n)
        {
            m_notifier.release();
        }
        for(auto& t : m_threads)
        {
            t.join();
        }
    }

    void append(Task* task)
    {
        if(task)
        {
            {
                std::lock_guard g(m_queueGuard);
                m_queue.push(task);
                // releasing as soon as possible; no need to keep
                // the mutex locked when releasing the semaphore
            }
            m_notifier.release();
        }
    }

private:
    std::vector<std::thread> m_threads;
    Queue m_queue;
    std::mutex m_queueGuard;
#ifdef _MSC_VER
    std::counting_semaphore<std::numeric_limits<ptrdiff_t>::max()> m_notifier;
#else
    // oh [@*%#$+]!, neither gcc nor clang allow above (MSVC does, though)
    std::counting_semaphore<2147483647> m_notifier;
    // corresponds to INT_MAX here, but maybe that differs on other platforms...
    // apparently there's no standard way to get that, so we'd rely on compiler
    // specifics :(
#endif
    // atomic: written by the destructor while workers may be reading it
    std::atomic<bool> m_exit{false};

    void run(size_t id)
    {
        // one output file per worker; the file name is an assumption, the
        // listing as posted does not show how the stream is opened
        std::ofstream file("thread_" + std::to_string(id) + ".txt");
        for(;;)
        {
            m_notifier.acquire();
            if(m_exit)
            {
                break;
            }
            Task* task;
            {
                std::lock_guard g(m_queueGuard);
                task = static_cast<Task*>(m_queue.front());
                m_queue.pop();
            }
            // run the task outside the lock, then dispose of it
            task->execute(file);
            delete task;
        }
        file.close();
    }
};

Recommendation: wrap all of this in your own namespace.

Usage then is simple: Implement any arbitrary task to be run by inheriting from the base Task class and add it to the pool:

class TheTask : public Task
{
public:
    TheTask()
        : m_taskId(++g_n)
    { }
private:
    inline static size_t g_n = 0;
    size_t m_taskId;

    void execute(std::ofstream& s) override
    {
        s << m_taskId << std::endl;
        // without some duration indeed can get pretty unbalanced...
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

    }
};

int main()
{
    ThreadPool p(4);

    for(size_t i = 0; i < 32; ++i)
    {
        p.append(new TheTask());
    }

    std::this_thread::sleep_for(std::chrono::seconds(4));
    return 0;
}

Demonstration on godbolt, though it's not possible to access the output files created there, so it is probably better to compile and run it yourself...

Further possible extension: Maybe you want to allow locally allocated tasks, these, of course, must not get deleted automatically! You might want to add a flag to the Task base class (private member) that is set appropriately, e.g.:


void append(Task& task, bool deleteOnCompletion = false)
{
    task.m_delete = deleteOnCompletion; // task is a reference here

    // append as before
}


void append(Task* task, bool deleteOnCompletion = true)
{
    if(task)
    {
        append(*task, deleteOnCompletion);
    }
}

void run()
{
    //...

    task->execute(file);
    if(task->m_delete)
    {
        delete task;
    }
}
Aconcagua
  • Thanks, I challenged for an implementation and you provided. I'm lazier, so I provide an example from reddit in the question: https://www.reddit.com/r/cpp_questions/comments/i1ckqj/are_conditional_variables_related_to_semaphores/. Notice that the producer is actually the same while the consumer is similar: your code depends very implicitly on the counter of the semaphore while the cv code checks the size of the queue. It still is a matter of taste. – stefaanv May 23 '23 at 09:49
  • @stefaanv Though need to admit that I stumbled heavily over the GCC/clang implementation – with the counter being limited to `INT_MAX` (on godbolt's architecture). In practice that should still be totally fine, but at least theoretically the risk of running into UB remains :( – Aconcagua May 23 '23 at 10:34
  • I saw that and I didn't comment on it because I regularly see some rough edges even if you want to improve your code. That max number is unfortunate and might improve for some next GCC/clang version or in the next standard. For production code, I would solve it by imposing a maximum on the queue with extra cv/semaphore so memory is also protected. – stefaanv May 23 '23 at 11:11
  • @stefaanv Well, and then the code isn't any less complicated any more! Question is, though, if such limit isn't meaningful *anyway*, no matter which approach we use – we'd discover earlier that we simply cannot handle all of the tasks in time any more. If not (in a specific scenario) then in production code I'd switch to the simpler solution, which under given circumstances would actually be your variant (unless there are still high performance requirements enforcing the semaphores, provided these are *really* more efficient, profiling yet to be done)... – Aconcagua May 23 '23 at 11:27
0

(as suggested by Aconcagua):

So, by removing the semaphore from the ThreadData struct, adding a member variable std::counting_semaphore<> mPoolSemaphore{0}; to the thread pool (whose max() is really big), and finally modifying CreateTask() to release that semaphore, the loop inside EACH thread would look something like this:

while (!mDone) {
    mPoolSemaphore.acquire();
    while (mPendingTasks.load(std::memory_order_acquire) > 0) {
        while (auto task = mThreadData[id].WorkQueue.Pop()) {
            mPendingTasks.fetch_sub(1, std::memory_order_release);
            task();
        }
        // hereafter: stealing from other queues
        ...
    }
}

Would this suffice? I know that this way each thread might take a spin through the while loop with nothing to do, but maybe, just maybe, it accidentally steals a job from another thread.

Martin
  • I suggest: `for(;;) { mPoolSemaphore.acquire(); if(mDone) { break; } /*...*/ }` – this way you can, on terminating, 1. clear the queue 2. release as many times as you have threads available, and all the threads are going to exit gracefully... – Aconcagua May 19 '23 at 06:56
  • *'steals jobs from another thread*' I don't think this (each thread its own queue) has advantages over a global queue, just makes the entire design more complicated. Additionally the order of in which jobs get started gets (partially?) indeterminate, whereas with a global queue always the head of the queue is started next. Of course we'd want to be able to take out jobs as quickly as possible to minimise the time the global queue mutex is locked, so we need a data structure that supports that, like `std::list`, maybe (probably?) even better `std::forward_list`, but as well `std::deque`. – Aconcagua May 19 '23 at 07:01
  • If task data is large it might be fair choice to store only pointers to in the global queue to avoid copying the data on putting it into/getting it out of the queue (with the mutex locked in the meanwhile, of course). – Aconcagua May 19 '23 at 07:04
  • This pointer approach brings another interesting opportunity: Have your thread queue entirely *generic* (-> optimal re-usability): Provide your thread-data with nothing else than a pure virtual function (`execute`/`run` or whatever appears suitable to you. Then each worker thread would just grab an object from the queue and call this function and possibly delete the object. This way you're totally free to add *any* kind of task, not matter whatever it might do. – Aconcagua May 19 '23 at 07:24
  • About object deletion: Disallows adding non-dynamically allocated objects, not doing so requires the task objects to delete themselves as very last thing they do (if dynamically allocated). The latter has a smell to me, I'd rather go with the first. You might even provide a flag then an application can set to tell the pool to skip object deletion. – Aconcagua May 19 '23 at 07:31
  • You can even further generalise the approach by letting specify the data structure for the queue explicitly (either via polymorphism or as a template): The most simple variant would use a simple `std::forward_list`, but you might want to be able to prioritise the tasks, then you might use a `std::priority_queue` instead, or come up with a custom implementation if none of the latter really fits your needs (maybe prioritise, but auto-increase priority the longer the task waits already in the queue...). – Aconcagua May 19 '23 at 07:36
  • @Aconcagua The multiqueue approach handles contention way better in terms of a high amount of tasks and perhaps threads. The threadsafe queue is based on a deque. And in terms of syncronization, there is the possibility of submitting async tasks and waitable tasks, both single and batched tasks. Therefore the main thread can schedule what to wait for and what not to wait for. But that is of course not part of the example :-) – Martin May 19 '23 at 11:45
  • In terms of data size you can just pass in std::ref as parameters and make the functions passed into the queue an appropriate size – Martin May 19 '23 at 11:47
  • Well, the problem comes on stealing the tasks. Tasks getting to idle then need to check at all other tasks if they *can* steal, so you'll be going to iterate. How to determine where to steal from? Probably from the one with longest queue already. Then on assigning tasks you'll probably select the one with least tasks in its queue. If you don't do it that way you risk tasks stealing from other ones that then remain empty and will steal themselves next time while the dispatcher will assign to tasks already full. Then any advantage gone anyway (if there's one at all). – Aconcagua May 19 '23 at 14:39
  • A global queue is way simpler in implementation and as long as you add and remove elements *quickly* – and *that* is why I introduced the pointer (or reference wrapper, if you prefer, but that doesn't change anything in respect to *performance*) then, assuming tasks run some reasonable amount of time, chances that several threads access the queue at the same time are pretty small. – Aconcagua May 19 '23 at 14:41
  • Side note: You likely don't want to have too many threads anyway; consider the overhead of context switches(!) – unless maybe you are on linux and want use the multiple threads to get higher priority for the *process* containing all of these...). In the end my conclusion is that you likely won't gain much performance, if at all, for the price of a far more complex implementation... – Aconcagua May 19 '23 at 14:41
  • About the waiting or non-waiting tasks: Then consider my polymorphic apprach; new threads wouldn't accept a function any more, but all of would use exactly the same one (member function of the thread pool with `this` as argument). Differentiation occurs in the derived classes of the generic `Task` class – where you'd implement, too, if to wait or not. Might provide some reference implementation to illustrate, but currently quite busy, so you'll need to wait a little. – Aconcagua May 19 '23 at 14:46
  • [This](https://godbolt.org/z/YTf35defE) is how I would do it. Originally tried with templates (advantage: no pointer indirection, no virtual function calls), but different compilers use different template parameters, making compilation fail on clang. Might try switching back to another time (we might wrap the actual standard container into a our own class as with the default queue in the link), but not for now – or try *you* ;) – Aconcagua May 19 '23 at 16:05
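
The prioritised queue mentioned in the comments could be sketched as follows. This is an illustration only: `PrioritisedTask`, `LowerPriority`, `PriorityTaskQueue` and `drain` are hypothetical names, and the loop is single-threaded; inside a pool, the top()/pop() pair would sit under the queue mutex. Note that `std::priority_queue` offers top() rather than front(), so a pool written against `std::queue` would need a small adapter.

```cpp
#include <functional>
#include <queue>
#include <vector>

// Hypothetical task record: a larger priority value runs earlier.
struct PrioritisedTask {
    int priority;
    std::function<void()> work;
};

// Ordering for std::priority_queue: the "largest" element ends up on top.
struct LowerPriority {
    bool operator()(const PrioritisedTask& a, const PrioritisedTask& b) const {
        return a.priority < b.priority;
    }
};

using PriorityTaskQueue =
    std::priority_queue<PrioritisedTask, std::vector<PrioritisedTask>, LowerPriority>;

// Pops and runs tasks until the queue is empty, highest priority first.
inline void drain(PriorityTaskQueue& queue) {
    while (!queue.empty()) {
        // top() returns a const reference, so the callable is copied out
        std::function<void()> work = queue.top().work;
        queue.pop();
        work();
    }
}
```

Ageing (raising the priority of tasks that have waited for a long time) would need a custom container, since `std::priority_queue` does not allow changing the key of a stored element.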