-1

Below is some code showing a simple and short implementation of a thread pool.

The code is inspired by this post.

I compile it with clang++ -std=c++11 threadpool.cpp -o threadpool -lpthread

When executed I got following:

./threadpool 
terminate called without an active exception

As I see, the problem is getting out of function pool_t::pop() and its infinite loop.

My question is, how to get elegantly out of the loop?

the forgotten code - my apologies -

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <functional>
#include <condition_variable>

struct tasks_t
{
  std::queue<std::function<void()>> queue;

  std::mutex mutex;
};

struct threads_t
{
  std::vector<std::thread> vector;

  std::condition_variable condition;
};

struct pool_t
{
  tasks_t tasks;

  threads_t threads;

  void pop()
  {
    while(true)
    {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(tasks.mutex);

        threads.condition.wait(lock,[this]{return !tasks.queue.empty();});

        task = tasks.queue.front();

        tasks.queue.pop();
      }
      task();
    }
  }

  void push(std::function<void()> function)
  {
    {
      std::unique_lock<std::mutex> lock(tasks.mutex);

      tasks.queue.push(function);
    }
    threads.condition.notify_one();
  }

  void start()
  {
    for (int i=0,j=std::thread::hardware_concurrency(); i!=j; ++i)
    {
      threads.vector.push_back(std::thread(&pool_t::pop,this));
    }
  }
};

#include <chrono>
#include <iostream>

std::function<void()> t0 = []
{
  std::cout << "t0" << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(1));
  return;
};

std::function<void()> t1 = []
{
  std::cout << "t1" << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(2));
  return;
};

int main()
{
  pool_t pool;

  pool.start();

  pool.push(t0);

  pool.push(t1);
}
Community
  • 1
  • 1
user1587451
  • 978
  • 3
  • 15
  • 30

1 Answers1

0

In a case such as this the easiest way is often to enqueue a task that simply throws a specific type of exception than can be caught and acted upon...

struct pool_t {
  class quit_exception {};
  tasks_t tasks;
  threads_t threads;

  void pop ()
  {
    while (true) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(tasks.mutex);
        threads.condition.wait(lock, [this]{ return !tasks.queue.empty(); });
        task = tasks.queue.front();
        tasks.queue.pop();
      }
      try {
        task();
      }
      catch (quit_exception &ex) {
        return;
      }
    }
  }

When you need to break out of the loop just do...

pool.push([](){ throw pool::quit_exception(); });

Depending on the precise usage you may want to make quit_exception a private type so that only pool_t itself can exit the loop in this fashion -- in its destructor for example.

G.M.
  • 12,232
  • 2
  • 15
  • 18