
Being a total beginner to Boost.Asio, I am confused by io_service::run(). I would appreciate it if someone could explain to me when this method blocks/unblocks. The documentation states:

The run() function blocks until all work has finished and there are no more handlers to be dispatched, or until the io_service has been stopped.

Multiple threads may call the run() function to set up a pool of threads from which the io_service may execute handlers. All threads that are waiting in the pool are equivalent and the io_service may choose any one of them to invoke a handler.

A normal exit from the run() function implies that the io_service object is stopped (the stopped() function returns true). Subsequent calls to run(), run_one(), poll() or poll_one() will return immediately unless there is a prior call to reset().

What does the following statement mean?

[...] no more handlers to be dispatched [...]


While trying to understand the behavior of io_service::run(), I came across this example (example 3a). Within it, I observe that io_service->run() blocks and waits for work orders.

// WorkerThread invokes io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

However, in the following code that I was working on, the client connects using TCP/IP and the run method blocks until data is asynchronously received.

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

Any explanation of run() that describes its behavior in the two examples above would be appreciated.


2 Answers


Foundation

Let's start with a simplified example and examine the relevant Boost.Asio pieces:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

What Is A Handler?

A handler is nothing more than a callback. In the example code, there are 3 handlers:

  • The print handler (1).
  • The handle_async_receive handler (3).
  • The print handler (4).

Even though the same print() function is used twice, each use is considered to create its own uniquely identifiable handler. Handlers can come in many shapes and sizes, ranging from basic functions like the ones above to more complex constructs such as functors generated from boost::bind() and lambdas. Regardless of the complexity, the handler still remains nothing more than a callback.
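
To make the different handler shapes concrete, here is a minimal sketch of my own (not from the answer); print and print_n are illustrative names:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

void print()        { std::cout << "free function handler\n"; }
void print_n(int n) { std::cout << "bound handler: " << n << "\n"; }

int main()
{
  boost::asio::io_service io_service;

  // Three distinct handlers, each nothing more than a callback:
  io_service.post(&print);                                   // plain function
  io_service.post(boost::bind(&print_n, 42));                // boost::bind() functor
  io_service.post([]{ std::cout << "lambda handler\n"; });   // lambda

  // None of the handlers has run yet; they are invoked here, inside run().
  io_service.run();
}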

What Is Work?

Work is some processing that Boost.Asio has been requested to do on behalf of the application code. Sometimes Boost.Asio may start some of the work as soon as it has been told about it, and other times it may wait to do the work at a later point in time. Once it has finished the work, Boost.Asio will inform the application by invoking the supplied handler.

Boost.Asio guarantees that handlers will only run within a thread that is currently calling run(), run_one(), poll(), or poll_one(). These are the threads that will do work and call handlers. Therefore, in the above example, print() is not invoked when it is posted into the io_service (1). Instead, it is added to the io_service and will be invoked at a later point in time. In this case, it will be invoked within io_service.run() (5).
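
A tiny sketch of that guarantee (my own illustration, assuming a single thread): the output order shows that post() only queues the handler, while run() is what invokes it.

#include <iostream>
#include <boost/asio.hpp>

int main()
{
  boost::asio::io_service io_service;

  io_service.post([]{ std::cout << "2. handler invoked inside run()\n"; });

  // Nothing has been printed yet: the handler is queued, not invoked.
  std::cout << "1. handler posted, not yet invoked\n";

  io_service.run();  // the handler runs here, in the thread calling run()
  std::cout << "3. run() returned: no more work or handlers\n";
}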

What Are Asynchronous Operations?

An asynchronous operation creates work and Boost.Asio will invoke a handler to inform the application when the work has completed. Asynchronous operations are created by calling a function that has a name with the prefix async_. These functions are also known as initiating functions.

Asynchronous operations can be decomposed into three unique steps:

  • Initiating, or informing, the associated io_service that work needs to be done. The async_receive operation (3) informs the io_service that it will need to asynchronously read data from the socket, and then async_receive returns immediately.
  • Doing the actual work. In this case, when the socket receives data, bytes will be read and copied into buffer. The actual work will be done in either:
    • The initiating function (3), if Boost.Asio can determine that it will not block.
    • When the application explicitly runs the io_service (5).
  • Invoking the handle_async_receive ReadHandler. Once again, handlers are only invoked within threads running the io_service. Thus, regardless of when the work is done (3 or 5), it is guaranteed that handle_async_receive() will only be invoked within io_service.run() (5).

The separation in time and space between these three steps is known as control flow inversion. It is one of the complexities that makes asynchronous programming difficult. However, there are techniques that can help mitigate this, such as by using coroutines.
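
As a hedged sketch of the coroutine technique mentioned above (my own example; the endpoint and buffer size are arbitrary), boost::asio::spawn() with a yield_context lets the asynchronous steps read top to bottom while the work and handlers still run inside io_service.run():

#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

int main()
{
  boost::asio::io_service io_service;
  boost::asio::ip::tcp::socket socket(io_service);

  // The coroutine suspends at each async operation instead of splitting the
  // logic across separate completion handlers.
  boost::asio::spawn(io_service,
    [&](boost::asio::yield_context yield)
    {
      boost::asio::ip::tcp::resolver resolver(io_service);
      boost::asio::ip::tcp::resolver::query query("127.0.0.1", "9100");
      boost::asio::async_connect(socket, resolver.async_resolve(query, yield),
                                 yield);

      char data[128];
      std::size_t n = socket.async_read_some(boost::asio::buffer(data), yield);
      std::cout << "read " << n << " bytes\n";
    });

  io_service.run();  // the coroutine only makes progress inside run()
}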

What Does io_service.run() Do?

When a thread calls io_service.run(), work and handlers will be invoked from within this thread. In the above example, io_service.run() (5) will block until either:

  • It has invoked and returned from both print handlers, the receive operation has completed with success or failure, and its handle_async_receive handler has been invoked and has returned.
  • The io_service is explicitly stopped via io_service::stop() (see the sketch after this list).
  • An exception is thrown from within a handler.
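
A hedged sketch of the io_service::stop() case (my own example; the one-second delay and the extra thread are arbitrary): stopping from another thread unblocks run() even though a work object would otherwise keep it running.

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

int main()
{
  boost::asio::io_service io_service;
  boost::asio::io_service::work work(io_service);  // would keep run() blocked

  // Another thread stops the io_service after one second; run() then returns
  // even though no handlers were ever posted.
  boost::thread stopper([&io_service]
  {
    boost::this_thread::sleep(boost::posix_time::seconds(1));
    io_service.stop();
  });

  io_service.run();  // blocks until stop() is called
  std::cout << "run() returned because the io_service was stopped\n";
  stopper.join();
}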

One potential pseudo-ish flow could be described as the following:

create io_service
create socket
add print handler to io_service (1)
wait for socket to connect (2)
add an asynchronous read work request to the io_service (3)
add print handler to io_service (4)
run the io_service (5)
  is there work or handlers?
    yes, there is 1 work and 2 handlers
      does socket have data? no, do nothing
      run print handler (1)
  is there work or handlers?
    yes, there is 1 work and 1 handler
      does socket have data? no, do nothing
      run print handler (4)
  is there work or handlers?
    yes, there is 1 work
      does socket have data? no, continue waiting
  -- socket receives data --
      socket has data, read it into buffer
      add handle_async_receive handler to io_service
  is there work or handlers?
    yes, there is 1 handler
      run handle_async_receive handler (3)
  is there work or handlers?
    no, set io_service as stopped and return

Notice how, when the read finished, it added another handler to the io_service. This subtle detail is an important feature of asynchronous programming. It allows handlers to be chained together. For instance, if handle_async_receive did not get all the data it expected, then its implementation could post another asynchronous read operation, resulting in the io_service having more work and thus io_service.run() not returning.
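
Here is a hedged sketch of that chaining pattern (my own code; buf, total_received, and the 3000-byte target are illustrative): the completion handler initiates another asynchronous read whenever it still expects data, which keeps handing work to the io_service and keeps run() from returning.

#include <iostream>
#include <boost/asio.hpp>

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket_(io_service);
char buf[3000];
std::size_t total_received = 0;

void handle_async_receive(const boost::system::error_code& error,
                          std::size_t bytes_transferred)
{
  if (error) return;  // run() will return once no work remains

  total_received += bytes_transferred;
  if (total_received < 3000)
  {
    // Not all of the expected data has arrived yet: initiate another
    // asynchronous read. This adds work to the io_service, so
    // io_service.run() keeps blocking instead of returning.
    socket_.async_receive(
        boost::asio::buffer(buf + total_received, 3000 - total_received),
        &handle_async_receive);
  }
}

int main()
{
  // The endpoint is illustrative; connect, start the first read, then run the loop.
  socket_.connect(boost::asio::ip::tcp::endpoint(
      boost::asio::ip::address::from_string("127.0.0.1"), 9100));
  socket_.async_receive(boost::asio::buffer(buf, 3000), &handle_async_receive);
  io_service.run();  // returns only when the handler chain stops adding work
}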

Do note that when the io_service has run out of work, the application must reset() the io_service before running it again.
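
A minimal sketch of that requirement (my own example, single-threaded): without the reset() call, the second run() would return immediately because the io_service is still flagged as stopped.

#include <iostream>
#include <boost/asio.hpp>

int main()
{
  boost::asio::io_service io_service;

  io_service.post([]{ std::cout << "first batch of work\n"; });
  io_service.run();   // returns once the handler finishes; io_service is now stopped

  io_service.reset(); // clear the stopped flag before running again
  io_service.post([]{ std::cout << "second batch of work\n"; });
  io_service.run();   // without the reset(), this call would return immediately
}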


The Question Code and Example 3a Code

Now, let's examine the two pieces of code referenced in the question.

Question Code

socket->async_receive adds work to the io_service. Thus, io_service->run() will block until the read operation completes with success or error, and ClientReceiveEvent has either finished running or thrown an exception.

Example 3a Code

In hopes of making it easier to understand, here is a smaller, annotated version of Example 3a:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

At a high-level, the program will create 2 threads that will process the io_service's event loop (2). This results in a simple thread pool that will calculate Fibonacci numbers (3).

The one major difference between the Question Code and this code is that this code invokes io_service::run() (2) before actual work and handlers are added to the io_service (3). To prevent the io_service::run() from returning immediately, an io_service::work object is created (1). This object prevents the io_service from running out of work; therefore, io_service::run() will not return as a result of no work.

The overall flow is as follows:

  1. Create the io_service::work object and add it to the io_service.
  2. Create a thread pool that invokes io_service::run(). These worker threads will not return from io_service::run() because of the io_service::work object.
  3. Add 3 handlers that calculate Fibonacci numbers to the io_service, and return immediately. The worker threads, not the main thread, may start running these handlers immediately.
  4. Delete the io_service::work object.
  5. Wait for worker threads to finish running. This will only occur once all 3 handlers have finished execution, as the io_service has neither handlers nor work.

The code could be written differently, in the same manner as the Question Code, where handlers are added to the io_service, and then the io_service event loop is processed. This removes the need for io_service::work and results in the following code:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

Synchronous vs. Asynchronous

Although the code in the question is using an asynchronous operation, it is effectively functioning synchronously, as it is waiting for the asynchronous operation to complete:

socket.async_receive(buffer, handler)
io_service.run();

is equivalent to:

boost::system::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

As a general rule of thumb, try to avoid mixing synchronous and asynchronous operations. Oftentimes, it can turn a complex system into a complicated system. This answer highlights advantages of asynchronous programming, some of which are also covered in the Boost.Asio documentation.
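
For contrast, here is a hedged sketch of how the question code might look if kept fully asynchronous (identifiers other than ClientReceiveEvent and buf_client are mine; the endpoint is illustrative): the write is initiated from within the receive handler rather than after run() returns, so the only blocking call is run() itself.

#include <iostream>
#include <boost/asio.hpp>

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket_(io_service);
char buf_client[3000];

void handle_write(const boost::system::error_code&, std::size_t) {}

void ClientReceiveEvent(const boost::system::error_code& error,
                        std::size_t bytes_transferred)
{
  if (error) return;
  std::cout << "Received " << bytes_transferred << " bytes, sending message\n";
  // Respond from within the handler instead of after run() returns.
  boost::asio::async_write(socket_, boost::asio::buffer("some data"),
                           &handle_write);
}

int main()
{
  socket_.connect(boost::asio::ip::tcp::endpoint(
      boost::asio::ip::address::from_string("127.0.0.1"), 9100));
  socket_.async_receive(boost::asio::buffer(buf_client, 3000),
                        &ClientReceiveEvent);
  io_service.run();  // returns after the receive and the write have completed
}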

  • Awesome post. I would like to add just one thing because I feel it does not get enough attention: after run() has returned, you need to call reset() on your io_service before you can run() it again. Otherwise it may return instantly, whether or not there are async_ operations waiting. – DeVadder Nov 12 '13 at 12:51
  • Where does buffer come from? What is it? – ruipacheco Feb 11 '15 at 01:03
  • I am still confused. If mixing sync and async is not recommended, then what is the pure async mode? Can you give an example showing the code without io_service.run()? – Splash Nov 01 '15 at 20:09
  • @Splash One can use [`io_service.poll()`](http://www.boost.org/doc/libs/1_59_0/doc/html/boost_asio/reference/io_service/poll/overload1.html) to process the event loop without blocking on outstanding operations. The primary recommendation to avoid mixing synchronous and asynchronous operations is to avoid adding unnecessary complexity, and to prevent poor responsiveness when handlers take a long time to complete. There are some cases where it is safe, such as when one knows the synchronous operation will not block. – Tanner Sansbury Nov 02 '15 at 19:17
  • What do you mean by "currently" in *"Boost.Asio guarantees that handlers will only run **within a thread that is currently calling `run()`**...."*? If there are N threads (which has called `run()`), then which one is "current" thread? There can be many? Or do you mean the thread which has finished executing the `async_*()` (say `async_read`), is *guaranteed* to call its handlers as well? – Nawaz Aug 23 '16 at 07:48
  • .... Also, what if I call another `async_*` function from such handler? Will *this* `async_*` be executed in the *same* thread? (If so, then I'm wondering as to what is the point of using `async_*` in this case, as it behaves like *sync* anyway.. and what if I call more than one `async_*` functions from such handler?). – Nawaz Aug 23 '16 at 07:48
  • @Nawaz When work is posted to the `io_service`, any thread that is blocked on `io_service.run()` can be selected to perform the work. It is unspecified which thread will perform which work. Hence, if a thread that has `io_service::run()` in its callstack initiates an `async_read()` operation, then this thread and other threads blocked on `io_service.run()` may be selected to run any of the work needed for the operation and the completion handler. It is very common to chain operations together by initiating an async operation from within the completion handlers of other operations. – Tanner Sansbury Aug 23 '16 at 11:40
  • Can you explain why you pass boost::in_place(boost::ref(io_service)) instead of just io_service to the work constructor in Example 3a? – shadow_map Jan 26 '17 at 12:43
  • @shadow_map this is a boost optional in-place construction idiom - see http://stackoverflow.com/questions/25185673/avoiding-temporary-when-using-boostoptional – Berkus Mar 20 '17 at 15:10
  • I don't get it. What is the point of an async operation if you must call `io_service::run()` to start the async operation and it is going to block your current thread anyway? – Akil Demir Jul 10 '20 at 15:22
  • @TannerSansbury Which thread receives the socket data? – John Sep 20 '21 at 05:35
  • small gem, the `io_service` can be explicitly stopped via `io_service::stop()` – Tonia Sanzo Nov 24 '21 at 02:42
  • If I need an extra thread to run a completion handler, because I'm blocking on a function in that handler, and the function that blocks uses a strand for accessing a shared resource (therefore I must avoid that the completion handler runs on the same strand). Can I use a thread_pool with bind_executor(thread_pool, handler)? And the thread pool will just execute the handler for me, without using `io_context.run()` for having that handler executed? – Johannes Schaub - litb May 30 '22 at 17:52
  • Or is it sufficient If I just create a second strand and ensure that enough threads are calling run() ? The docs say that strands do not guarantee parallel execution, so I worry that this might not be enough! – Johannes Schaub - litb May 30 '22 at 17:54

To simplify what run() does, think of it as an employee who must process a pile of paper: they take one sheet, do what the sheet says, throw the sheet away, and take the next one; when they run out of sheets, they leave the office. On each sheet there can be any kind of instruction, even one that adds a new sheet to the pile. Back to asio: you can give an io_service work in essentially two ways: by calling post on it, as in the sample you linked, or by using other objects that internally call post on the io_service, such as the socket and its async_* methods.
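
A minimal sketch of those two ways of adding "sheets" (my own example, using a deadline_timer as the object that internally queues work):

#include <iostream>
#include <boost/asio.hpp>

int main()
{
  boost::asio::io_service io_service;

  // Sheet 1: handed to the pile directly with post().
  io_service.post([]{ std::cout << "posted sheet processed\n"; });

  // Sheet 2: an object whose async_* call adds a sheet once the work completes.
  boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(1));
  timer.async_wait([](const boost::system::error_code&)
  {
    std::cout << "timer sheet processed\n";
  });

  io_service.run();  // the "employee" works through both sheets, then leaves
}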
