
I'm using boost::asio::io_service as a basic thread pool. Some threads get added to io_service, the main thread starts posting handlers, the worker threads start running the handlers, and everything finishes. So far, so good; I get a nice speedup over single-threaded code.

However, the main thread has millions of things to post. And it just keeps on posting them, much faster than the worker threads can handle them. I don't hit RAM limits, but it's still kind of silly to be enqueuing so many things. What I'd like is a fixed size for the handler queue, and to have post() block if the queue is full.

I don't see any options for this in the Boost ASIO docs. Is this possible?

Sam Miller
Jon Stewart

4 Answers


I use a semaphore to limit the handler queue size. The following code illustrates this solution:

// 'semaphore' is a counting semaphore, initialized to the maximum
// number of handlers allowed in the queue at once.
void Schedule(boost::function<void()> function)
{
    semaphore.wait();    // blocks while the queue is full
    io_service.post(boost::bind(&TaskWrapper, function));
}

void TaskWrapper(const boost::function<void()>& function)
{
    function();
    semaphore.post();    // frees a slot in the queue
}
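Boost doesn't ship a general-purpose counting semaphore outside Boost.Interprocess, so here is a minimal sketch of one built on `std::mutex` and `std::condition_variable` (the name `Semaphore` and its interface are my own, chosen to match the `wait()`/`post()` calls above):

```cpp
#include <condition_variable>
#include <mutex>

// Minimal counting semaphore; construct with the maximum queue depth.
class Semaphore {
public:
    explicit Semaphore(int count) : count_(count) {}

    void wait() {
        // Block until a slot is free, then claim it.
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return count_ > 0; });
        --count_;
    }

    void post() {
        // Release a slot and wake one blocked producer, if any.
        std::lock_guard<std::mutex> lk(m_);
        ++count_;
        cv_.notify_one();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};
```

Constructed as, say, `Semaphore semaphore(128);`, this makes `Schedule()` block once 128 handlers are in flight, which is exactly the bounded-queue behavior the question asks for.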

You can wrap your lambda in another lambda which would take care of counting the "in-progress" tasks, and then wait before posting if there are too many in-progress tasks.

Example:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <boost/asio.hpp>

class ThreadPool {
  using asio_worker = std::unique_ptr<boost::asio::io_service::work>;
  boost::asio::io_service service;
  asio_worker service_worker;
  std::vector<std::thread> grp;
  std::atomic<int> inProgress{0};
  std::mutex mtx;
  std::condition_variable busy;
public:
  ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      grp.emplace_back([this] { service.run(); });
    }
  }

  template<typename F>
  void enqueue(F && f) {
    std::unique_lock<std::mutex> lock(mtx);
    // limit queue depth = number of threads
    while (inProgress >= static_cast<int>(grp.size())) {
      busy.wait(lock);
    }
    inProgress++;
    service.post([this, f = std::forward<F>(f)]{
      try {
        f();
      }
      catch (...) {
        inProgress--;
        busy.notify_one();
        throw;
      }
      inProgress--;
      busy.notify_one();
    });
  }

  ~ThreadPool() {
    service_worker.reset();
    for (auto& t : grp)
      if (t.joinable())
        t.join();
    service.stop();
  }
};

int main() {
  std::unique_ptr<ThreadPool> pool(new ThreadPool(4));
  for (int i = 1; i <= 20; ++i) {
    pool->enqueue([i] {
      std::string s("Hello from task ");
      s += std::to_string(i) + "\n";
      std::cout << s;
      std::this_thread::sleep_for(std::chrono::seconds(1));
    });
  }
  std::cout << "All tasks queued.\n";
  pool.reset(); // wait for all tasks to complete
  std::cout << "Done.\n";
}

Output:

Hello from task 3
Hello from task 4
Hello from task 2
Hello from task 1
Hello from task 5
Hello from task 7
Hello from task 6
Hello from task 8
Hello from task 9
Hello from task 10
Hello from task 11
Hello from task 12
Hello from task 13
Hello from task 14
Hello from task 15
Hello from task 16
Hello from task 17
Hello from task 18
All tasks queued.
Hello from task 19
Hello from task 20
Done.
rustyx

You could use a strand object to serialize the handlers, and add a delay in your main loop. Is your program exiting after all the work is posted? If so, you can use the work object, which gives you more control over when your io_service stops.

You could also have the main thread check the state of the worker threads and wait until one becomes free, or something like that.

Links:

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html

//example from the second link
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);

Hope this helps.

gda2004
  • The problem is not that the `io_service` stops before completing the work---we know about deleting the `work` object to make the `io_service` stop gracefully. The problem is that the `io_service` lets too many tasks accumulate. We'd like to limit the number of unassigned tasks in a way that doesn't involve polling on the part of the thread creating the tasks, hence our question about whether `poll()` can be made to block. – uckelman Jul 28 '12 at 21:34

Maybe try lowering the priority of the main thread, so that once the worker threads get busy they starve the main thread and the system self-limits.

nate