boost::asio::io_service ioService;
boost::thread_group threadpool;
boost::barrier barrier(5);
boost::asio::io_service::work work(ioService);
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));  //Thread 1
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));  //Thread 2
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));  //Thread 3
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));  //Thread 4

while (true)
{
  { 
     boost::lock_guard<boost::recursive_mutex> lock(mutex_);
     map::iterator it = m_map.begin();   
     while (it != m_map.end())
     {
         ioService.post(boost::bind(&ProcessFun, this, it));
         ++it;    
     }

    ioService.run();  // <-- main thread is stuck here
   }

}

I want to know when all of the tasks assigned to the thread pool have finished, and only then assign a new batch of tasks to the pool.

I don't want to release the lock while the threads are still processing tasks. Is there any way to make sure that all of the assigned tasks are done, and only then proceed?

USer22999299

2 Answers


The simplest way is to just call ioService.run(). According to the Boost.Asio documentation:

The io_service object's run() function will not exit while work is underway. It does exit when there is no unfinished work remaining.

By the way, it is difficult to determine this without seeing much more of your program, but it appears that you are defeating the primary purpose of asio: you are serializing batches of tasks. If it is somehow important that all tasks in batch #1 be completely processed before any task in batch #2 begins, then this may make sense, but it is an odd usage.

Also be careful: if any of the handlers for batch #1 tasks try to add new tasks, they can deadlock attempting to acquire the lock on the mutex.
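The deadlock risk can be illustrated with a minimal sketch. It assumes that `std::recursive_mutex` behaves like the `boost::recursive_mutex` in the question for this purpose, and the function name is hypothetical. A recursive mutex is only re-entrant for the thread that already owns it, so a handler running on a pool thread cannot take a lock that the posting thread still holds:

```cpp
#include <mutex>
#include <thread>

// Returns true if a handler running on another thread could take the mutex
// while the posting thread still holds it.
bool handler_can_lock_while_poster_holds() {
    std::recursive_mutex mutex;  // stands in for mutex_ in the question
    bool acquired = true;

    // The posting thread takes the lock and keeps it for the whole batch.
    std::lock_guard<std::recursive_mutex> lock(mutex);

    std::thread handler([&] {
        // try_lock() is used instead of lock() so the sketch terminates:
        // lock() would block until the poster releases the mutex, while the
        // poster is itself waiting for this handler to finish.
        acquired = mutex.try_lock();
        if (acquired) mutex.unlock();
    });
    handler.join();
    return acquired;  // false: the lock is owned by a different thread
}
```

If the handler called `lock()` instead, both threads would wait on each other forever, which is the deadlock described above.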

Dale Wilson
  • Thanks! Yeah, I took it into consideration. As you said, I want to finish the first batch before starting the new one, otherwise there is no use in it :) – USer22999299 Apr 03 '15 at 04:53
  • @USer22999299 Seems legit, you won't exit from `io_service::run()` without destroying the `io_service::work` instance and maybe calling `io_service::stop()`. – Chnossos Apr 05 '15 at 14:36
  • @Chnossos so it seems like this is not a solution I can use. I'm looking for a way for the main thread to wait until all of the threads are done with the work, and only then add new tasks – USer22999299 Apr 06 '15 at 05:01
  • Well the solution is simple then, you need to *not* use an `io_service::work`, that way `io_service::run` will naturally exit when all the tasks are done executing. Your threads will need to wait on a custom condition too to prevent them from exiting too early. – Chnossos Apr 06 '15 at 09:02
  • @Chnossos, Good catch. I missed the io_service::work. USnnnnn, the purpose of io_service::work is to prevent the behavior that you want. – Dale Wilson Apr 06 '15 at 15:39

So, my final solution was to create a small semaphore of my own, with a mutex and a condition variable inside, based on the one I found here:

C++0x has no semaphores? How to synchronize threads?

I pass this semaphore to the threads as a pointer and reset it on each iteration. I had to modify the semaphore code a bit to add reset functionality, and because my threads sometimes finish the work before the main thread goes to sleep, I also had to change the condition inside a bit.

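class semaphore
{
private:
    boost::mutex mutex_;
    boost::condition_variable condition_;
    unsigned long count_;  // number of tasks still outstanding

public:
    semaphore()
        : count_()
    {}

    // Set the number of tasks that must call notify() before wait() returns.
    void reset(unsigned long x)
    {
        boost::mutex::scoped_lock lock(mutex_);
        count_ = x;
    }

    // Called by each task when it finishes.
    void notify()
    {
        boost::mutex::scoped_lock lock(mutex_);
        if (count_ > 0 && --count_ == 0)
            condition_.notify_one();
    }

    // Block until every outstanding task has called notify().
    void wait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        while (count_ > 0)
            condition_.wait(lock);
    }
};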


....
....
semaphore*  m_semaphore = new semaphore();

while (true)
{
  { 
     boost::lock_guard<boost::recursive_mutex> lock(mutex_);
     map::iterator it = m_map.begin();
     if (it != m_map.end())
        m_semaphore->reset(m_map.size());
     while (it != m_map.end())
     {
         // m_semaphore is already a pointer, so pass it as-is
         ioService.post(boost::bind(&ProcessFun, this, it, m_semaphore));
         ++it;
     }

      m_semaphore->wait();
   }

}
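For comparison, here is a self-contained sketch of the same batch-then-wait idea using only the C++ standard library. It is not the code from this answer: `countdown_latch` and `run_batches` are hypothetical names, and one `std::thread` per task stands in for the asio thread pool. Each task reports completion as its last step, and the main thread blocks until the counter reaches zero before starting the next batch.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: a resettable countdown guarded by a mutex.
class countdown_latch {
public:
    void reset(std::size_t n) {
        std::lock_guard<std::mutex> lock(mutex_);
        count_ = n;
    }
    void count_down() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ > 0 && --count_ == 0)
            condition_.notify_all();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form also covers tasks that finish before wait() is called.
        condition_.wait(lock, [this] { return count_ == 0; });
    }
private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::size_t count_ = 0;
};

// Runs `batches` rounds of `tasks_per_batch` tasks and returns the total
// number of finished tasks observed right after each wait().
std::vector<int> run_batches(int batches, int tasks_per_batch) {
    countdown_latch latch;
    std::atomic<int> completed{0};
    std::vector<int> seen;
    for (int b = 0; b < batches; ++b) {
        latch.reset(static_cast<std::size_t>(tasks_per_batch));
        std::vector<std::thread> workers;
        for (int t = 0; t < tasks_per_batch; ++t)
            workers.emplace_back([&] {
                ++completed;         // the "task"
                latch.count_down();  // report completion last
            });
        latch.wait();  // returns only once the whole batch is done
        assert(completed.load() == (b + 1) * tasks_per_batch);
        seen.push_back(completed.load());
        for (auto& w : workers) w.join();
    }
    return seen;
}
```

Taking the mutex in `reset()` and checking the count in a loop (here via the predicate overload of `wait`) are the two details that make it safe for tasks to finish before the main thread starts waiting.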
USer22999299