
I want to use boost::thread_group in my program with a maximum number of threads. For example:

int maxNumberOfThreads;
boost::thread_group group;
for (int i = 0; i < N; ++i)
    // create a new thread only if group.size() is smaller than the maximal number of threads
    group.create_thread(Worker);
group.join_all();

Does anyone have an idea how I can achieve this?

Starting all N threads at once would be very inefficient.

Thank you for your help

Hunk

2 Answers


What you seem to want is a thread pool.

You can use boost::thread::hardware_concurrency() to determine the number of (logical) cores available on your particular system.
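For example, a minimal standalone check (note that the call returns 0 when the number of cores cannot be determined, so a fallback is worth having):

#include <boost/thread.hpp>
#include <iostream>

int main()
{
    unsigned n = boost::thread::hardware_concurrency();
    if (n == 0) n = 2; // 0 means the core count could not be determined; pick a sane default
    std::cout << "using " << n << " worker threads\n";
}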

Here's one I rolled for an answer last week:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/chrono.hpp>
#include <deque>
#include <iostream>

using namespace boost;
using namespace boost::phoenix::arg_names;

boost::atomic_size_t counter(0ul);

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (optional<job_t> job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(job);

          cv.notify_one();
      }

      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          job_t job = _queue.front();
          _queue.pop_front();

          return job;
      }

      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};

A typical way to use that is also in that answer:

static const size_t bignumber = 1 << 20;

class myClass 
{
    thread_pool pool; // uses 1 thread per core

  public:
    void launch_jobs()
    {
        std::cout << "enqueuing jobs... " << std::flush;
        for(size_t i=0; i<bignumber; ++i)
        {
            for(int j=0; j<2; ++j) {
                pool.enqueue(bind(&myClass::myFunction, this, j, i));
            }     
        }
        std::cout << "done\n";
    }

  private:
    void myFunction(int i, int j)
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
        counter += 1;
    }
};

int main()
{
    myClass instance;
    instance.launch_jobs();

    size_t last = 0;
    while (counter < (2*bignumber))
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
        if ((counter >> 4u) > last)
        {
            std::cout << "Progress: " << counter << "/" << (bignumber*2) << "\n";
            last = counter >> 4u;
        }
    }
}

As a bonus, in the comments to another answer at that question, I also posted an equivalent solution based on a lock-free job queue implementation.
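That linked code isn't reproduced here, but as a rough, untested sketch of what such a variant could look like (this is not the code referenced above; names like lockfree_pool are just for illustration), one can have the pool threads drain a boost::lockfree::queue. Since that queue only stores trivially destructible types, the jobs are heap-allocated and passed by pointer:

#include <boost/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <boost/atomic.hpp>

// Illustrative sketch only: a pool of workers polling a lock-free queue.
// boost::lockfree::queue cannot hold boost::function directly (it requires a
// trivially destructible element type), so jobs are owned through raw pointers.
class lockfree_pool
{
    typedef boost::function<void()> job_t;

    boost::lockfree::queue<job_t*> _queue;
    boost::thread_group            pool;
    boost::atomic<bool>            shutdown;

    void worker()
    {
        job_t* job;
        while (!shutdown || !_queue.empty())
        {
            if (_queue.pop(job)) { (*job)(); delete job; }
            else boost::this_thread::yield(); // simple busy-wait; burns CPU while idle
        }
    }

  public:
    lockfree_pool() : _queue(128), shutdown(false)
    {
        for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
            pool.create_thread(boost::bind(&lockfree_pool::worker, this));
    }

    void enqueue(job_t job) { _queue.push(new job_t(job)); }

    ~lockfree_pool()
    {
        shutdown = true;  // workers drain the remaining jobs, then exit
        pool.join_all();
    }
};

The trade-off compared to the condition-variable pool above is the idle behaviour: workers spin (yield) instead of sleeping, so this shape only pays off when the queue is rarely empty.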

sehe
  • Thank you for this great answer. I read that question about the thread resource error, but I didn't realize that this thread pool is also the solution to my problem. At the moment I am trying to run the thread_pool class, but I have a few problems with std::move and auto. Are those C++11 only? – Hunk Mar 27 '14 at 14:28
  • Yes. I've updated the code in this answer for c++03 now. The c++11 version is potentially slightly more efficient. Cheers – sehe Mar 27 '14 at 14:36
  • Thank you for your help. It should be possible to add a function waitTillJobsAreDone inside the thread_pool class, right? Inside this function I would only have to call pool.join_all(); that should block until all jobs are done, right? – Hunk Mar 27 '14 at 14:50
  • OK, I think I misunderstood this architecture. This thread pool runs all the time and only finishes when the destructor is called, right? Or is it possible to shut it down? For example, I have 10 jobs to do and I want to wait for their results. After that I want to shut down my thread pool. – Hunk Mar 27 '14 at 15:19
  • You have a strange way to phrase questions :) Assuming you want to know whether you can do these things: you can do what you _need_ but you _cannot_ do it in the way you suggest. **1.** you can wait for the queue to be `empty()` **2.** the destructor already does exactly this. But correctly :) See **[`is_idle()` and `wait_idle()` implementations here](http://coliru.stacked-crooked.com/a/d8385e8fd5df1b69) (fixed)** - be sure to read (and think about!) the comments. Designing thread-aware interfaces is not so simple :) – sehe Mar 27 '14 at 15:35
  • @Hunk (so to wait for jobs to complete + shutdown the thread pool, just let it go out of scope. This already does exactly what you need. In fact, how did you think the program exited? It would have kept on running without this feature). – sehe Mar 27 '14 at 15:37
  • I know my English grammar is really bad at the moment, sorry for that. I also tried checking the queue for empty, but in my opinion that is no guarantee that all jobs are done, because there can be one job that takes a long time, right? And that one can still be running. – Hunk Mar 27 '14 at 15:58
  • My problem is that my job has a result. I bind it with pool.enqueue(boost::bind(&Class::Worker, this, i, j, boost::ref(result))); and I want to wait until all the workers are done so I can use the results. That is why it is important to know when all jobs are finished. – Hunk Mar 27 '14 at 16:01
  • In that case, look at `boost::packaged_task`, which is precisely for this type of scenario. This is also why I made the jobs `void(void)`, since you can always pass a packaged_task to bind your context and retrieve the return value/exception (a sketch follows these comments). – sehe Mar 27 '14 at 16:02
  • Good point about the difference between `_queue.empty()` and `thread_pool` being /idle/. I didn't think of that when I named my "idle" functions. So, the destructor is exactly what you want, because it actually waits for all the threads to finish. – sehe Mar 27 '14 at 16:03
  • Big thank you for your help. I copied the destructor code into my wait function and it is working. I don't really understand why I need lock_guard lk(mx); notify_all(); but I have to read a little more about mutexes and these notifications. I will also take a look at packaged_task :) Again, thank you. – Hunk Mar 27 '14 at 16:19
  • The `notify_all` wakes up any workers that were sleeping, so they can know to exit gracefully. – sehe Mar 27 '14 at 16:44
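To make the packaged_task suggestion from the comments above concrete, here is a hedged sketch (illustrative names only, assuming the thread_pool class from this answer and the C++03-era Boost.Thread API where packaged_task is parameterized on the result type alone). The pool still only sees void() jobs; the task carries the return value out through a future:

#include <boost/thread/future.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <iostream>

// 'compute' is a made-up stand-in for any worker function returning a value
int compute(int i, int j) { return i + j; }

void run_one_job(thread_pool& pool)
{
    typedef boost::packaged_task<int> task_t;         // result type only (old-style Boost.Thread API)

    // packaged_task is not copyable, so hold it by shared_ptr to fit into boost::function
    boost::shared_ptr<task_t> task =
        boost::make_shared<task_t>(boost::bind(compute, 1, 2));
    boost::unique_future<int> result = task->get_future();

    // the pool only ever sees a void() job; the task stores the return value (or exception)
    pool.enqueue(boost::bind(&task_t::operator(), task));

    std::cout << "result: " << result.get() << "\n";  // blocks until a worker has run the task
}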

This is my (imperfect) implementation:

#include <set>
#include <cassert>
#include <algorithm>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

/**
 * \author Christophe Dumeunier
 * \brief  Extension of boost::thread_group managing a maximum number of threads running in parallel
 */
class thread_group_max : public boost::thread_group
{
    public:
        /**
         * \brief  Instantiate a group for threads
         * \param  max_running_threads  Maximum number of threads running in parallel, if 0 use the number of cores
         * \param    max_sleeping_time  Maximum sleeping time (seconds) between two checks for finished threads (must be > sleeping_time_start)
         * \param   sleeping_time_grow  Coefficient increasing sleeping time while waiting for finished threads (must be > 1)
         * \param  sleeping_time_start  Initial sleeping time (must be > 0)
         */
        explicit                   thread_group_max(std::size_t max_running_threads = 0, float max_sleeping_time = 1.0f,
                                                    float sleeping_time_grow = 1.1f, float sleeping_time_start = 0.001f);
        /**
         * \brief  Destroy the group
         * \note   Doesn't join the unterminated threads
         */
                                   ~thread_group_max();

        /** \brief Wait for an available slot and then create a new thread and launch it */
        template<typename F>
        boost::thread*             create_thread(F f);

    private:
        std::size_t                maxRunningThreads;  //!< Maximum number of running threads
        float                      maxSleepingTime;    //!< Maximum sleeping time between two checks for finished threads
        float                      sleepingTimeStart;  //!< Initial sleeping time
        float                      sleepingTimeGrow;   //!< Coefficient increasing sleeping time while waiting for finished threads
        std::set<boost::thread*>   runningThreads;     //!< Pointers to running or finished-but-not-removed-yet threads
};

thread_group_max::thread_group_max(std::size_t max_running_threads, float max_sleeping_time, float sleeping_time_grow, float sleeping_time_start) :
    boost::thread_group(),
    maxRunningThreads(max_running_threads == 0 ? std::max(boost::thread::hardware_concurrency(), 1u) : max_running_threads),
    maxSleepingTime(max_sleeping_time),
    sleepingTimeStart(sleeping_time_start),
    sleepingTimeGrow(sleeping_time_grow),
    runningThreads()
{
    assert(this->maxRunningThreads > 0);
    assert(this->maxSleepingTime >= this->sleepingTimeStart);
    assert(this->sleepingTimeStart > 0.0f);
    assert(this->sleepingTimeGrow > 1.0f);
}

thread_group_max::~thread_group_max()
{}

template<typename F>
boost::thread* thread_group_max::create_thread(F f)
{
    // First, try to clean already finished threads
    if(this->runningThreads.size() >= this->maxRunningThreads)
    {
        for(std::set<boost::thread*>::iterator it = this->runningThreads.begin(); it != this->runningThreads.end();)
        {
            const std::set<boost::thread*>::iterator jt = it++;
            if((*jt)->timed_join(boost::posix_time::milliseconds(0))) /// @todo timed_join is deprecated; try_join_for(boost::chrono::milliseconds(0)) is the replacement
                this->runningThreads.erase(jt);
        }
    }

    // If no finished thread found, wait for it
    if(this->runningThreads.size() >= this->maxRunningThreads)
    {
        float sleeping_time = this->sleepingTimeStart;
        do
        {
            boost::this_thread::sleep(boost::posix_time::milliseconds((long int)(1000.0f * sleeping_time)));
            for(std::set<boost::thread*>::iterator it = this->runningThreads.begin(); it != this->runningThreads.end();)
            {
                const std::set<boost::thread*>::iterator jt = it++;
                if((*jt)->timed_join(boost::posix_time::milliseconds(0))) /// @todo timed_join is deprecated; try_join_for(boost::chrono::milliseconds(0)) is the replacement
                    this->runningThreads.erase(jt);
            }
            if(sleeping_time < this->maxSleepingTime)
            {
                sleeping_time *= this->sleepingTimeGrow;
                if(sleeping_time > this->maxSleepingTime)
                    sleeping_time = this->maxSleepingTime;
            }
        } while(this->runningThreads.size() >= this->maxRunningThreads);
    }

    // Now, at least 1 slot is available, use it
    return *this->runningThreads.insert(this->boost::thread_group::create_thread(f)).first;
}

Example of use:

thread_group_max group(num_threads);
for(std::size_t i = 0; i < jobs.size(); ++i)
  group.create_thread(boost::bind(&my_run_job_function, boost::ref(jobs[i])));
group.join_all();
Caduchon