I have implemented the solution from "How to create a thread pool using boost in C++?" but am having a problem with the io_service::stop() function stopping further processing of my threads.

In my case, I have 3 threads in my pool and am trying to run around 11000 records through it. Each of the records is independent of the others, so I'm just looking to speed up the processing by creating parallel runs of each record.

void processRecord (unsigned int i, unsigned int numRecords)
{
    cout << i << "/" << numRecords << endl;

    // do Processing...
}

#define MAX_THREADS 3
unsigned int numRecords=11000;

boost::asio::io_service ioService;
boost::thread_group threadPool;

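// io_service::work has no reset() of its own, so hold it in a smart pointer
// whose reset() destroys the work object.
boost::shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (ioService));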

for (unsigned int i=0 ; i<MAX_THREADS ; ++i)
{
    threadPool.create_thread (boost::bind (&boost::asio::io_service::run, &ioService));
}

for (unsigned int i=0 ; i<numRecords ; ++i)
{
    ioService.post (boost::bind (processRecord, i, numRecords));
}

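// ioService.stop ();          // Was stopping ioService before the queued records had run
work.reset();                  // Destroy the work object so run() can return once the queue is empty.
threadPool.join_all ();        // Wait for the threads to finish the remaining records.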

processAllRecords ();

The problem I'm seeing is that once the loop calling ioService.post() has finished queuing the records, execution reaches the ioService.stop() call, which stops all further processing. This usually happens after only about 400 of the records have actually been processed.

So, only ~400 of the ~11000 records are being processed.
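
For anyone wondering why the two shutdown styles differ so much, here is a rough sketch using only the standard library. It is an analogy, not Boost.Asio's implementation, and ToyPool, finish(), stop() and countProcessed() are hypothetical names made up for the illustration. finish() plays the role of destroying the io_service::work object (workers drain the queue, then return), while stop() plays the role of ioService.stop() (workers return as soon as they can and leave queued jobs unrun).

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy pool (NOT Boost.Asio) that mimics the two shutdown styles.
class ToyPool {
public:
    explicit ToyPool(std::size_t numThreads) {
        for (std::size_t t = 0; t < numThreads; ++t)
            workers_.emplace_back([this] { run(); });
    }
    ~ToyPool() { finish(); }  // harmless if the pool was already shut down

    void post(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(job));
        }
        ready_.notify_one();
    }

    // Analogous to destroying io_service::work: drain the queue, then return.
    void finish() { shutdown(false); }

    // Analogous to io_service::stop(): return as soon as possible,
    // leaving whatever is still queued unrun.
    void stop() { shutdown(true); }

private:
    void shutdown(bool abandon) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
            if (abandon) abandon_ = true;
        }
        ready_.notify_all();
        for (std::thread& w : workers_)
            if (w.joinable()) w.join();
    }

    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return done_ || !jobs_.empty(); });
                // Leave on stop(), or when finished and nothing is queued.
                if (abandon_ || jobs_.empty()) return;
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // run the job outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> jobs_;
    bool done_ = false;
    bool abandon_ = false;
    std::vector<std::thread> workers_;  // declared last: threads use the members above
};

// Posts numRecords counting jobs to a 3-thread pool, shuts it down in the
// chosen style, and returns how many jobs actually ran.
unsigned int countProcessed(unsigned int numRecords, bool useStop) {
    std::atomic<unsigned int> processed(0);
    ToyPool pool(3);
    for (unsigned int i = 0; i < numRecords; ++i)
        pool.post([&processed] { ++processed; });
    if (useStop) pool.stop(); else pool.finish();
    return processed.load();
}
```

With the numbers from the question, countProcessed(11000, false) returns 11000 every time, whereas countProcessed(11000, true) returns however many jobs happened to run before the stop, which varies from run to run.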

I'm new to using threads in C++, so I'm not sure what I'm missing or how to correct this problem. Any help would be greatly appreciated.

Edit: I modified the code above to reflect the change that I made to make it work. Essentially, the ioService.stop() call was causing all further processing to stop. I replaced that with work.wait() so that it would wait until all work was finished.

Edit2: I used the wrong function in work in my previous edit. It should be work.reset().

Tim Anderson
  • Don't call stop until you're done? – sehe Feb 07 '14 at 22:08
  • I was just about to post that I found the solution here: http://www.tonicebrian.com/2012/05/23/thread-pool-in-c/. Instead of calling ioService.stop() I should call work.reset(). This will wait for all of the "work" to be finished, then I can do the join_all(). That fixed that problem, now I have another to track down that is unrelated. – Tim Anderson Feb 07 '14 at 23:10
  • It would be most useful if you posted your answer for others to find it (and upvote!) – sehe Feb 07 '14 at 23:44
  • I have modified the above code to match what I have working, now. – Tim Anderson Feb 10 '14 at 15:39

1 Answer

Starting from your code, this is how I use boost::asio for a thread group: I enclose the work object in a braced scope and hold it in a scoped_ptr. Just an idea.

void processRecord (unsigned int i, unsigned int numRecords)
{
    cout << i << "/" << numRecords << endl;

    // do Processing...
}

#define MAX_THREADS 3
unsigned int numRecords=11000;

boost::asio::io_service ioService;
boost::thread_group threadPool;

// by using a scoped pointer for the io_service::work
// and enclosing the threading in brackets
// this should run until all the jobs have finished
// and you don't need to call work.reset()  

// added brackets around threading
{
    // made work a boost::scoped_ptr
    boost::scoped_ptr< boost::asio::io_service::work > 
          work ( new boost::asio::io_service::work(ioService) );

    for (unsigned int i=0 ; i<MAX_THREADS ; ++i)
    {
        threadPool.create_thread (
           boost::bind (&boost::asio::io_service::run, &ioService));
    }

    for (unsigned int i=0 ; i<numRecords ; ++i)
    {
        ioService.post (boost::bind (processRecord, i, numRecords));
    }
}
// now just have to join
threadPool.join_all ();

processAllRecords ();
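
The braces work because a scoped smart pointer destroys its object at the closing brace, which has the same effect as calling reset() by hand. A minimal sketch of just that mechanism, assuming std::unique_ptr as a stand-in for boost::scoped_ptr and a hypothetical FakeWork type in place of io_service::work (neither is the real Boost class):

```cpp
#include <memory>

// Hypothetical stand-in for io_service::work: marks work as outstanding
// for as long as the object is alive.
struct FakeWork {
    explicit FakeWork(bool& outstanding) : outstanding_(outstanding) {
        outstanding_ = true;
    }
    ~FakeWork() { outstanding_ = false; }
    bool& outstanding_;
};

// Scope-based release, as in the answer's braces.
bool outstandingAfterScope() {
    bool outstanding = false;
    {
        std::unique_ptr<FakeWork> work(new FakeWork(outstanding));
        // threads would be created and jobs posted here
    }   // work is destroyed at this brace
    return outstanding;
}

// Explicit release, as in the question's work.reset().
bool outstandingAfterReset() {
    bool outstanding = false;
    std::unique_ptr<FakeWork> work(new FakeWork(outstanding));
    work.reset();  // destroys the FakeWork immediately
    return outstanding;
}
```

Both functions return false, so either style leaves no outstanding work by the time join_all() would be called.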
DannyK