0

I've implemented thread pooling following the answer of Kerrek SB in this question.

I've implemented an MPMC queue for the functions and a vector of threads for the workers.

Everything works, except that I don't know how to terminate the program. If I just call thread.join at the end, the threads are still waiting for more tasks, so they never join and the main thread never continues.

Any idea how to end the program correctly?

For completeness, this is my code:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>

class Function_pool
    {

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;

public: 
    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    std::function<void()> pop();
};

function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition()
{
}

Function_pool::~Function_pool()
{
} 

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // unlock before notifying so the woken consumer can take the lock immediately
    lock.unlock();
    m_data_condition.notify_one();
}

std::function<void()> Function_pool::pop()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_data_condition.wait(lock, [this]() { return !m_function_queue.empty(); });
    auto func = m_function_queue.front();
    m_function_queue.pop();
    return func;
    // Lock will be released
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

void example_function()
{
    std::cout << "bla" << std::endl;
}

void infinite_loop_func()
{
    while (true)
    {
        std::function<void()> func = func_pool.pop();
        func();
    }
}

int main()
{
    std::cout << "starting operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(infinite_loop_func));
    }

    //here we should send our functions
    func_pool.push(example_function);

    for (int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
    int i;
    std::cin >> i;
}
pio

3 Answers

2

You can always use a specific exception type to signal to infinite_loop_func that it should return...

class quit_worker_exception: public std::exception {};

Then change infinite_loop_func to...

void infinite_loop_func ()
{
  while (true) {
    std::function<void()> func = func_pool.pop();
    try {
      func();
    }
    catch (quit_worker_exception &ex) {
      return;
    }
  }
}

With the above changes you could then use (in main)...

/*
 * Enqueue `thread_pool.size()' function objects whose sole job is
 * to throw an instance of `quit_worker_exception' when invoked.
 */
for (int i = 0; i < thread_pool.size(); i++)
  func_pool.push([](){ throw quit_worker_exception(); });

/*
 * Now just wait for each worker to terminate having received its
 * quit_worker_exception.
 */
for (int i = 0; i < thread_pool.size(); i++)
  thread_pool.at(i).join();

Each instance of infinite_loop_func will dequeue one function object which, when called, throws a quit_worker_exception causing it to return.

G.M.
  • thx a lot! Just one more thing: currently the implementation only accepts functions that take no arguments. Is there any way to make the whole mechanism work with general functions that take some kind of argument? For example, void func_a(int a) and void func_b(char* a) – pio Jul 17 '18 at 13:09
  • 2
    Technically speaking, yes, the function type used could accept arguments. But how would you use that? Where would the parameters come from and how/where would you pass them to the function? The nice thing about `std::function` is that it can be used to fully encapsulate the entire task by using something like a `lambda` with bound variables. – G.M. Jul 17 '18 at 13:24
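To illustrate the comment above, here is a minimal sketch of wrapping functions that take arguments in zero-argument lambdas, so the queue can keep storing std::function<void()>. The names func_a and func_b come from the question in the comment; g_log and run_bound_tasks are hypothetical helpers added only to make the effect observable, and func_b takes const char* here so a string literal can be passed. A plain local queue stands in for the thread pool.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <vector>

// Hypothetical log so the calls can be inspected afterwards.
std::vector<std::string> g_log;

// Task functions with different signatures (names taken from the comment above).
void func_a(int a) { g_log.push_back("func_a(" + std::to_string(a) + ")"); }
void func_b(const char* s) { g_log.push_back(std::string("func_b(") + s + ")"); }

// Binds the arguments at enqueue time, so every queued task is a void() callable.
std::vector<std::string> run_bound_tasks()
{
    std::queue<std::function<void()>> tasks;  // same element type as m_function_queue

    int value = 42;
    tasks.push([value]() { func_a(value); });  // capture by value: safe to run later
    tasks.push([]() { func_b("hello"); });     // a string literal outlives the task
    tasks.push(std::bind(func_a, 7));          // std::bind is an alternative to a lambda
    assert(tasks.size() == 3);

    g_log.clear();
    while (!tasks.empty())
    {
        tasks.front()();  // invoke the wrapped call
        tasks.pop();
    }
    return g_log;
}
```

With the pool from the question, the same lambdas could be passed straight to func_pool.push. Capturing by value matters there, because the task may run on another thread after the enqueuing scope has ended.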
2

Your problem is located in infinite_loop_func, which is an infinite loop and as a result never terminates. I've read the previous answer, which suggests throwing an exception. However, I don't like it, since exceptions should not be used for regular control flow.

The best way to solve this is to explicitly deal with the stop condition. For example:

std::atomic<bool> acceptsFunctions;

Adding this to the function pool gives you explicit state and lets you assert that no new functions are being added when you destruct.

std::optional<std::function<void()>> Function_pool::pop()

Returning an empty optional (or an empty function in C++14 and before) allows you to deal with an empty queue. You have to handle that case, because condition_variable waits can wake up spuriously.

With this, m_data_condition.notify_all() can be used to wake all threads.
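A minimal sketch of how these pieces could fit together, assuming C++17 for std::optional. Stoppable_pool, stop() and run_pool_demo are hypothetical names, not part of the question's code, and the flag is a plain bool guarded by the mutex rather than the std::atomic<bool> suggested above. Here pop() only returns an empty optional once the pool has been stopped and the queue is drained.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical variant of Function_pool with an explicit stop condition.
class Stoppable_pool
{
public:
    void push(std::function<void()> func)
    {
        {
            std::lock_guard<std::mutex> lock(m_lock);
            m_queue.push(std::move(func));
        }
        m_cv.notify_one();  // notify after releasing the lock
    }

    // Blocks until a task is available or stop() has been called.
    // Returns an empty optional only when stopped and the queue is empty.
    std::optional<std::function<void()>> pop()
    {
        std::unique_lock<std::mutex> lock(m_lock);
        m_cv.wait(lock, [this] { return !m_queue.empty() || !m_accepts; });
        if (m_queue.empty())
            return std::nullopt;
        auto func = std::move(m_queue.front());
        m_queue.pop();
        return func;
    }

    // Stops the pool and wakes every waiting worker.
    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(m_lock);
            m_accepts = false;
        }
        m_cv.notify_all();
    }

private:
    std::queue<std::function<void()>> m_queue;
    std::mutex m_lock;
    std::condition_variable m_cv;
    bool m_accepts = true;  // guarded by m_lock
};

// Runs `tasks` increments on `workers` threads and returns the final count.
int run_pool_demo(int workers, int tasks)
{
    assert(workers > 0);
    Stoppable_pool pool;
    std::atomic<int> counter{0};
    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i)
        threads.emplace_back([&pool] {
            while (auto f = pool.pop())  // an empty optional ends the worker
                (*f)();
        });
    for (int i = 0; i < tasks; ++i)
        pool.push([&counter] { ++counter; });
    pool.stop();
    for (auto& t : threads)
        t.join();
    return counter.load();
}
```

Because the wait predicate is re-checked after every wakeup, a spurious wakeup simply puts the worker back to sleep, and tasks pushed before stop() still run before the workers exit.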

Finally, we have to fix the infinite loop, as it doesn't cover overcommitment. The loop below stops once the pool no longer accepts functions, while still executing all functions left in the queue:

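while (func_pool.acceptsFunctions || func_pool.containsFunctions())
{
    auto f = func_pool.pop();
    if (!f)
    {
        // Nothing to run: wait briefly, then re-check the loop condition.
        // Sketch only: the real wait_for overloads take a std::unique_lock
        // as first argument, so this wait belongs inside Function_pool.
        func_pool.m_data_condition.wait_for(1s);
        continue;
    }

    auto &function = *f;
    function();
}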

I'll leave it up to you to implement containsFunctions() and clean up the code (infinite_loop_func as a member function?). Note that with a counter, you could even deal with background tasks being spawned.

JVApen
0
Following [JVApen](https://stackoverflow.com/posts/51382714/revisions)'s suggestion, here is my code in case anyone wants a working version:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // unlock before notifying so the woken consumer can take the lock immediately
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    // unlock before notifying so the woken threads can take the lock immediately
    lock.unlock();
    // notify all waiting threads
    m_data_condition.notify_all();
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be released automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "starting operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}
pio