4

I am using the code provided in the Boost example.

The server only accepts 1 connection at a time. This means, no new connections until the current one is closed.

How can I make the code below accept unlimited connections at the same time?

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <thread>
#include <utility>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

// One echo session bound to a single connected socket.
//
// Every asynchronous handler captures a shared_ptr to the session, so the
// object stays alive for as long as an operation is pending. When a handler
// returns without starting another operation, its copy of the shared_ptr is
// released.
class session
  : public std::enable_shared_from_this<session>
{
public:
  // Takes ownership of an already-connected socket.
  session(tcp::socket socket)
    : socket_(std::move(socket))
  {
  }

  // Kicks off the read -> write -> read cycle.
  void start()
  {
    do_read();
  }

private:
  // Reads up to max_length bytes into data_ and, on success, echoes them back
  // after an artificial delay.
  void do_read()
  {
    auto self(shared_from_this());
    socket_.async_read_some(boost::asio::buffer(data_, max_length),
        [this, self](boost::system::error_code ec, std::size_t length)
        {
          if (!ec)
          {
            // Deliberate *blocking* delay: the thread running this handler is
            // stalled for 10 s and can run no other handler in the meantime.
            // Uses the standard library so no Boost.Thread header is needed.
            std::this_thread::sleep_for(std::chrono::milliseconds(10000));//sleep some time
            do_write(length);
          }
        });
  }

  // Writes the first `length` bytes of data_ back to the peer, then resumes
  // reading if the write succeeded.
  void do_write(std::size_t length)
  {
    auto self(shared_from_this());
    boost::asio::async_write(socket_, boost::asio::buffer(data_, length),
        [this, self](boost::system::error_code ec, std::size_t /*length*/)
        {
          if (!ec)
          {
            do_read();
          }
        });
  }

  tcp::socket socket_;
  enum { max_length = 1024 };  // capacity of the receive/echo buffer
  char data_[max_length];
};

class server
{
public:
  server(boost::asio::io_service& io_service, short port)
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
      socket_(io_service)
  {
    do_accept();
  }

private:
  void do_accept()
  {
    acceptor_.async_accept(socket_,
        [this](boost::system::error_code ec)
        {
          if (!ec)
          {
            std::make_shared<session>(std::move(socket_))->start();
          }

          do_accept();
        });
  }

  tcp::acceptor acceptor_;
  tcp::socket socket_;
};

// Entry point: parses the listening port from argv[1], starts the echo server
// and runs the io_service event loop on the calling thread.
//
// Returns 0 when the event loop finishes normally, 1 on a usage error, an
// invalid port, or an exception.
int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: async_tcp_echo_server <port>\n";
      return 1;
    }

    // Parse strictly. std::atoi would turn garbage into port 0 (an
    // unannounced ephemeral port) and let out-of-range values wrap silently.
    char* end = nullptr;
    const long port = std::strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || port < 1 || port > 65535)
    {
      std::cerr << "Invalid port: " << argv[1] << "\n";
      return 1;
    }

    boost::asio::io_service io_service;

    // server takes a `short`; ports above 32767 rely on the same narrowing
    // conversion the original implicit int -> short conversion performed.
    server s(io_service, static_cast<short>(port));

    io_service.run();
  }
  catch (const std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
    return 1;  // report the failure through the exit status as well
  }

  return 0;
}

As you see, the program waits for the sleep and it doesn't grab a second connection in the meantime.

Qix - MONICA WAS MISTREATED
  • 14,451
  • 16
  • 82
  • 145
SpeedCoder
  • 139
  • 10
  • 1
    If you are using the code in the example, then it will accept multiple connections. The async_accept's AcceptHandler initiates another async_accept operation. – Tanner Sansbury Oct 26 '14 at 17:05
  • How exactly did you determine that it only accepts 1 connection at a time? Did you test it? If so, explain how you tested and what results you got. Did you determine it by code inspection? If so, explain how you reached that conclusion. – David Schwartz Oct 26 '14 at 18:01
  • @Tanner Sansbury: I am using exactly the code from the example with a very simple sleep to test if it accepts multiple connections. – SpeedCoder Oct 26 '14 at 18:29
  • @David Schwartz: I am sorry for not posting my code earlier, here it is. – SpeedCoder Oct 26 '14 at 18:29
  • 2
    @SpeedCoder That "sleep" doesn't test if it accepts multiple connections. How can it accept another connection if it's sleeping? – David Schwartz Oct 27 '14 at 04:26

2 Answers2

2

You're doing a synchronous wait inside the handler, which runs on the only thread that serves your io_service. This makes Asio hold off on invoking the handlers for any new requests until the wait has finished.

  1. Use a deadline_timer with async_wait, or,

    void do_read() {
        auto self(shared_from_this());
        socket_.async_read_some(boost::asio::buffer(data_, max_length),
                                [this, self](boost::system::error_code ec, std::size_t length) {
            if (!ec) {
                timer_.expires_from_now(boost::posix_time::seconds(1));
                timer_.async_wait([this, self, length](boost::system::error_code ec) {
                        if (!ec)
                            do_write(length);
                    });
            }
        });
    }
    

    Where the timer_ field is a boost::asio::deadline_timer member of session

  2. As a poor man's solution, add more threads (this simply means that if more requests arrive at the same time than there are threads to handle them, they will still block until a thread becomes available to pick up the new request)

    boost::thread_group tg;
    for (int i=0; i < 10; ++i)
        tg.create_thread([&]{ io_service.run(); });
    
    tg.join_all();
    
sehe
  • 374,641
  • 47
  • 450
  • 633
  • Can you please explain how to initialize a deadline timer in the boost example? This seems impossible for me... – SpeedCoder Oct 26 '14 at 23:32
  • I understood how the solution with multiple threads works, but I don't understand how I can implement the one with the deadline timer. In class server I add a member called: `boost::asio::io_service *io_service;` (this is a pointer to io_service) and then before calling `timer_.expires...` I init the timer: `deadline_timer timer_(*io_service);` This compiles OK but I always get the error: `The I/O operation has been aborted because of either a thread exit or an application request.` Any idea why, please? – SpeedCoder Oct 27 '14 at 08:10
  • @SpeedCoder look at the code I linked an hour ago. It's complete and works. Perhaps you forget to call `expires_from_now` – sehe Oct 27 '14 at 08:51
2

Both the original code and the modified code are asynchronous and accept multiple connections. As can be seen in the following snippet, the async_accept operation's AcceptHandler initiates another async_accept operation, forming an asynchronous loop:

        .-----------------------------------.
        V                                   |
void server::do_accept()                    |
{                                           |
  acceptor_.async_accept(...,               |
      [this](boost::system::error_code ec)  |
      {                                     |
        // ...                              |
        do_accept();  ----------------------'
      });
}

The sleep() within the session's ReadHandler causes the one thread running the io_service to block until the sleep completes. Hence, the program will be doing nothing. However, this does not cause any outstanding operations to be cancelled. For a better understanding of asynchronous operations and io_service, consider reading this answer.


Here is an example demonstrating the server handling multiple connections. It spawns off a thread that creates 5 client sockets and connects them to the server.

#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using boost::asio::ip::tcp;

class session
  : public std::enable_shared_from_this<session>
{
public:
  session(tcp::socket socket)
    : socket_(std::move(socket))
  {
  }

  ~session()
  {
    std::cout << "session ended" << std::endl;
  }

  void start()
  {
    std::cout << "session started" << std::endl;
    do_read();
  }

private:
  void do_read()
  {
    auto self(shared_from_this());
    socket_.async_read_some(boost::asio::buffer(data_, max_length),
        [this, self](boost::system::error_code ec, std::size_t length)
        {
          if (!ec)
          {
            do_write(length);
          }
        });
  }

  void do_write(std::size_t length)
  {
    auto self(shared_from_this());
    boost::asio::async_write(socket_, boost::asio::buffer(data_, length),
        [this, self](boost::system::error_code ec, std::size_t /*length*/)
        {
          if (!ec)
          {
            do_read();
          }
        });
  }

  tcp::socket socket_;
  enum { max_length = 1024 };
  char data_[max_length];
};

class server
{
public:
  server(boost::asio::io_service& io_service, short port)
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
      socket_(io_service)
  {
    do_accept();
  }

private:
  void do_accept()
  {
    acceptor_.async_accept(socket_,
        [this](boost::system::error_code ec)
        {
          if (!ec)
          {
            std::make_shared<session>(std::move(socket_))->start();
          }

          do_accept();
        });
  }

  tcp::acceptor acceptor_;
  tcp::socket socket_;
};

// Entry point of the demonstration: starts the echo server on the port given
// in argv[1], spawns a thread that connects 5 client sockets to it over
// loopback, and runs the io_service event loop on the calling thread.
//
// Returns 1 on a usage error, an invalid port, or an exception; 0 otherwise.
int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: async_tcp_echo_server <port>\n";
      return 1;
    }

    // Parse strictly. std::atoi would turn garbage into port 0 and let
    // out-of-range values wrap silently; the clients below need the real port.
    char* end = nullptr;
    const long parsed_port = std::strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || parsed_port < 1 || parsed_port > 65535)
    {
      std::cerr << "Invalid port: " << argv[1] << "\n";
      return 1;
    }
    const auto port = static_cast<unsigned short>(parsed_port);

    boost::asio::io_service io_service;

    // server takes a `short`; ports above 32767 rely on the same narrowing
    // conversion the original implicit int -> short conversion performed.
    server s(io_service, static_cast<short>(port));

    boost::thread client_main(
        [&io_service, port]
        {
          tcp::endpoint server_endpoint(
              boost::asio::ip::address_v4::loopback(), port);

          // Create and connect 5 clients to the server.
          std::vector<std::shared_ptr<tcp::socket>> clients;
          for (auto i = 0; i < 5; ++i)
          {
              auto client = std::make_shared<tcp::socket>(
                  std::ref(io_service));
              client->connect(server_endpoint);
              clients.push_back(client);
          }

          // Wait 2 seconds before destroying all clients.
          boost::this_thread::sleep(boost::posix_time::seconds(2));
        });

    // Runs the event loop on this thread. The server's accept handler always
    // posts another accept, so this is not expected to return on its own.
    io_service.run();
    client_main.join();
  }
  catch (const std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
    return 1;  // report the failure through the exit status as well
  }

  return 0;
}

The output:

session started
session started
session started
session started
session started
session ended
session ended
session ended
session ended
session ended
Community
  • 1
  • 1
Tanner Sansbury
  • 51,153
  • 9
  • 112
  • 169