
I'm looking for a way to wait for a number of jobs to finish and then execute another, completely different set of jobs. With threads, of course. A brief explanation: I created two worker threads, both executing run on io_service. The code below is taken from here.

For the sake of simplicity, I created two types of jobs, CalculateFib and CalculateFib2. I want the CalculateFib2 jobs to start after, and only after, the CalculateFib jobs finish. I tried to use a condition variable as explained here, but the program hangs if there is more than one CalculateFib2 job. What am I doing wrong?

thx, dodol

#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;
boost::mutex mx;
boost::condition_variable cv;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service)
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    io_service->run();

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

size_t fib( size_t n )
{
    if ( n <= 1 )
    {
        return n;
    }
    boost::this_thread::sleep( 
        boost::posix_time::milliseconds( 1000 )
        );
    return fib( n - 1 ) + fib( n - 2);
}

void CalculateFib( size_t n )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();

    boost::lock_guard<boost::mutex> lk(mx);
    cv.notify_all();
}

void CalculateFib2( size_t n )
{
    boost::unique_lock<boost::mutex> lk(mx);
    cv.wait(lk);

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib2( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib2( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();
}
int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
        );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
        );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished."
        << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( 
            boost::bind( &WorkerThread, io_service )
            );
    }
    io_service->post( boost::bind( CalculateFib, 5 ) );
    io_service->post( boost::bind( CalculateFib, 4 ) );
    io_service->post( boost::bind( CalculateFib, 3 ) );

    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    work.reset();
    worker_threads.join_all();

    return 0;
}
dodol

1 Answer

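Inside CalculateFib2, the first thing you do is wait on the condition variable (cv). That condition only gets signaled at the end of CalculateFib, and a condition variable does not remember notifications: a notify_all issued while nobody is waiting is simply lost. So it stands to reason that execution never continues unless the condition is signaled again (by posting another CalculateFib job).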

Indeed, adding one more CalculateFib job, like so:

io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );

io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );

io_service->post( boost::bind( CalculateFib, 5 ) );   // <-- ADDED

makes execution run to completion.

In an effort to shed more light: if you isolate a Fib2 batch (in time) like

io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );

boost::this_thread::sleep(boost::posix_time::seconds( 10 ));
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );

all the Fib2 jobs will always block, regardless of the number of threads, because the Fib jobs had all exited before the Fib2 jobs were posted. A simple

io_service->post( boost::bind( CalculateFib, 1 ) );

will unlock all the waiters (i.e. only as many as there are waiting threads, which is the number of available threads minus one, because the Fib() job occupies a thread as well). With fewer than 7 threads this would deadlock, because there is no thread available to even start the Fib() job on: all threads are blocked waiting in Fib2.
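To illustrate the lost notification, here is a minimal sketch of the usual remedy: pair the condition variable with a piece of state that records the event, and wait on a predicate over that state. It uses the C++11 standard-library counterparts of the Boost primitives purely to stay self-contained (an assumption, not what the question uses), and JobLatch and run_latch_demo are hypothetical names that do not appear in the original code. Note that it still parks one thread per waiter, so it cures the lost notification but not the thread starvation described above.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not part of Boost or the question's code): counts
// outstanding first-phase jobs and lets other threads wait for zero.
class JobLatch {
public:
    explicit JobLatch(std::size_t count) : remaining_(count) {}

    // Called once by each first-phase job when it finishes.
    void count_down() {
        std::lock_guard<std::mutex> lk(mx_);
        if (remaining_ > 0 && --remaining_ == 0) {
            cv_.notify_all();
        }
    }

    // The predicate is checked before blocking, so a waiter that arrives
    // after the last notification returns immediately instead of hanging.
    void wait() {
        std::unique_lock<std::mutex> lk(mx_);
        cv_.wait(lk, [this] { return remaining_ == 0; });
    }

private:
    std::mutex mx_;
    std::condition_variable cv_;
    std::size_t remaining_;
};

// Finishes `first` phase-one jobs, and only then starts `second` waiters
// (the "isolated in time" scenario). Returns how many waiters got through.
std::size_t run_latch_demo(std::size_t first, std::size_t second) {
    JobLatch latch(first);

    std::vector<std::thread> jobs;
    for (std::size_t i = 0; i < first; ++i) {
        jobs.emplace_back([&latch] { latch.count_down(); });
    }
    for (std::thread& t : jobs) t.join();  // every notification has fired

    std::atomic<std::size_t> passed(0);
    std::vector<std::thread> waiters;
    for (std::size_t i = 0; i < second; ++i) {
        waiters.emplace_back([&latch, &passed] {
            latch.wait();  // does not block: the count is already zero
            ++passed;
        });
    }
    for (std::thread& t : waiters) t.join();
    return passed.load();
}
```

With a bare cv.wait(lk), as in the question, the same late waiters would never wake up; here the stored count lets them through.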


To be honest, I don't get what you are trying to achieve in terms of scheduling. I suspect you should be monitoring job queues and explicitly posting jobs ('tasks') only when you have reached the required number of items. That way you can KISS and get a very flexible interface to your job scheduling.

In general, with a thread group (pooling) you want to avoid blocking the threads for indefinite amounts of time. Doing so can deadlock your work scheduling, and it tends to perform poorly even when it does not.
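One way to avoid blocking pool threads at all is to count the outstanding jobs of the first group and let whichever job finishes last kick off the follow-up work. The sketch below is only an illustration under stated assumptions: it uses plain std::thread workers and an atomic counter instead of io_service, and JobGroup, wrap and run_group_demo are hypothetical names, not part of Boost.Asio or the original code.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: tracks a batch of jobs and runs a continuation
// exactly once, on whichever thread finishes the last job of the batch.
class JobGroup {
public:
    JobGroup(std::size_t count, std::function<void()> on_done)
        : remaining_(count), on_done_(std::move(on_done)) {}

    // Wraps a job so that completing it decrements the counter; no thread
    // ever waits, the last finisher simply calls the continuation.
    std::function<void()> wrap(std::function<void()> job) {
        return [this, job] {
            job();
            if (remaining_.fetch_sub(1) == 1) {
                on_done_();
            }
        };
    }

private:
    std::atomic<std::size_t> remaining_;
    std::function<void()> on_done_;
};

// Three phase-one jobs add 3, 4 and 5 to a shared sum; the continuation
// records the sum it observes once all of them have completed.
int run_group_demo() {
    std::atomic<int> sum(0);
    int seen = -1;  // written only by the continuation, read after join
    JobGroup group(3, [&sum, &seen] { seen = sum.load(); });

    std::vector<std::thread> workers;
    for (int n : {3, 4, 5}) {
        workers.emplace_back(group.wrap([&sum, n] { sum += n; }));
    }
    for (std::thread& w : workers) w.join();
    return seen;
}
```

In the question's setup, the continuation would presumably be the place to call io_service->post for each CalculateFib2 job, and each CalculateFib job would be posted through something like wrap, so the second group is only queued once the first has finished and no worker thread sits idle in a wait.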

sehe
  • Yes, I'd like to post jobs only when some other jobs are done, since they depend on results created by the first group of jobs. I just don't know how I should do that, that is, how can I know when the job group is finished? Btw, thx for the thorough explanation on the above subject. – dodol Oct 17 '11 at 08:15
  • @dodol: the intent you describe could still mean many different things. However, I'd consider simply posting the follow-up job from within the 'dependency job' (so you'd give the job thread function as a parameter to the first job - this closely resembles _[continuation passing style](http://en.wikipedia.org/wiki/Continuation-passing_style)_) – sehe Oct 17 '11 at 08:28
  • I'd like to be able to post one group of jobs on multiple threads, and then, when those jobs are over, another group of jobs, also on multiple threads. This cycle loops many times, so I thought the best thing to do is to create the io_service and the threads before the loop, so I don't create and destroy threads every time the loop starts/ends. The trouble was that I just don't know when I should start the second group of jobs, since I couldn't figure out when the first one is going to be done. Any thoughts? – dodol Oct 17 '11 at 08:50
  • This sounds like a case for a semaphore, initialized to the number of jobs in the first group. Then, when it reaches zero, start the second group. Not all OSes support 'snooping' on a semaphore's count, however. Also, my reference to CPS above doesn't mean you can't use a thread pool. – sehe Oct 17 '11 at 08:56