My server is based on the Boost spawn echo server example, and was improved in this thread. The real server is complex, so I made a simpler server to show the problem:

The server listens on port 12345 and receives 0x4000 bytes of data from each new connection.

The client runs 1000 threads; each connects to the server and sends 0x4000 bytes of data.

The problem: while the client is running, kill the client process after 1 second with Ctrl-C in the console. The server's io_context then becomes stopped, and the server runs into an infinite loop that uses 100% CPU. If this doesn't happen, launch and kill the client repeatedly until it does. After several rounds the machine may run out of TCP ports; in that case, wait a few minutes and try again. On my machine it happens after killing the client 3~15 times.

The Boost documentation says io_context.stopped() is used to determine whether the io_context has been stopped

either through an explicit call to stop(), or due to running out of work

I never call io_context.stop(), and I use make_work_guard(io_context) to keep the io_context from stopping, so why does it still stop?

My Environment: Win10-64bit, boost 1.71.0

Server code:

#include <iostream>
using namespace std;

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
namespace ba=boost::asio;

#define SERVER_PORT 12345
#define DATA_LEN 0x4000


struct session : public std::enable_shared_from_this<session>
{
    tcp::socket socket_;
    boost::asio::steady_timer timer_;
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;

    explicit session(boost::asio::io_context& io_context, tcp::socket socket)
    : socket_(std::move(socket)),
      timer_(io_context),
      strand_(io_context.get_executor())
    { }

    void go()
    {
        auto self(shared_from_this());
        boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
        {
            spawn(yield, [this, self](ba::yield_context yield) {
                timer_.expires_from_now(10s); // 10 second
                while (socket_.is_open()) {
                    boost::system::error_code ec;
                    timer_.async_wait(yield[ec]);
                    // timeout triggered, timer was not canceled
                    if (ba::error::operation_aborted != ec) {
                        socket_.close();
                    }
                }
            });

            try
            {
                // recv data
                string packet;

                // read data
                boost::system::error_code ec;

                ba::async_read(socket_,
                               ba::dynamic_buffer(packet),
                               ba::transfer_exactly(DATA_LEN),
                               yield[ec]);
                if(ec) {
                    throw "read_fail";
                }

            }
            catch (...)
            {
                cout << "exception" << endl;
            }

            timer_.cancel();
            socket_.close();
        });

    }
};
struct my_server {  
    my_server() { }
    ~my_server() { } 

    void start() {
        ba::io_context io_context;
        auto worker = ba::make_work_guard(io_context);

        ba::spawn(io_context, [&](ba::yield_context yield)
        {
            tcp::acceptor acceptor(io_context,
            tcp::endpoint(tcp::v4(), SERVER_PORT));

            for (;;)
            {
                boost::system::error_code ec;

                tcp::socket socket(io_context);
                acceptor.async_accept(socket, yield[ec]);
                if (!ec) {
                    std::make_shared<session>(io_context, std::move(socket))->go();
                } 
            }
        });

        // Run io_context on All CPUs
        auto thread_count = std::thread::hardware_concurrency();
        boost::thread_group tgroup;
        for (auto i = 0; i < thread_count; ++i) 
            tgroup.create_thread([&] {
                for (;;) {
                    try { 
                        if (io_context.stopped()) { // <- this happens after killing Client process several times
                            cout << "io_context STOPPED, now server runs infinite loop with full cpu usage" << endl;
                        }
                        io_context.run(); 
                    }
                    catch(const std::exception& e) { 
                        MessageBox(0, "This never popup", e.what(), 0); 
                    }
                    catch(const boost::exception& e) { 
                        MessageBox(0, "This never popup", boost::diagnostic_information(e).data(), 0); 
                    }
                    catch(...) { MessageBox(0, "This never popup", "", 0); }
                }
            });
        tgroup.join_all();
    }
};  

int main() {
    my_server svr;
    svr.start();
}

The Client:

#include <iostream>
#include <random>
#include <thread>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
using namespace std;

using boost::asio::ip::tcp;
namespace ba=boost::asio;

#define SERVER "127.0.0.1"
#define PORT "12345"

int main() {
    boost::asio::io_context io_context;

    static string data_0x4000(0x4000, 'a');

    boost::thread_group tgroup;
    for (auto i = 0; i < 1000; ++i) 
        tgroup.create_thread([&] {
            for(;;) {

                try {
                    tcp::socket s(io_context);
                    tcp::resolver resolver(io_context);
                    boost::asio::connect(s, resolver.resolve(SERVER, PORT));

                    ba::write(s, ba::buffer(data_0x4000));
                } catch (std::exception const& e) { // catch by reference to avoid slicing
                    cout << " exception: " << e.what() << endl;
                } catch (...) {
                    cout << "unknown exception" << endl;
                }
            }
        });

    tgroup.join_all();

    return 0;
}

Update (workaround):

I guess the problem lies in the interaction between io_context and coroutines, so I replaced the unnecessary spawn with a std::thread, and it works: the io_context never stops anymore. But why does the problem happen in the first place?

Replace:

ba::spawn(io_context, [&](ba::yield_context yield)
{
    tcp::acceptor acceptor(io_context,
    tcp::endpoint(tcp::v4(), SERVER_PORT));

    for (;;)
    {
        boost::system::error_code ec;

        tcp::socket socket(io_context);
        acceptor.async_accept(socket, yield[ec]);
        if (!ec) {
            std::make_shared<session>(io_context, std::move(socket))->go();
        } 
    }
});

To:

std::thread([&]()
{
    tcp::acceptor acceptor(io_context,
    tcp::endpoint(tcp::v4(), SERVER_PORT));

    for (;;)
    {
        boost::system::error_code ec;

        tcp::socket socket(io_context);
        acceptor.accept(socket, ec);
        if (!ec) {
            std::make_shared<session>(io_context, std::move(socket))->go();
        } 
    }
}).detach();
aj3423

1 Answer

I cannot reproduce your problem on Linux, even with (very) extensive stress testing.

Even hard-killing client processes did not reveal any effects other than some sessions reporting "EOF" messages, as expected.

There is the problem where you run out of available ports, but that's mainly because of the insane rate at which you're re-connecting in the client(s).

Thinking outside the box

  • Could it be that you're using std::cout and/or MessageBox² without synchronization, and MSVC's standard library doesn't deal well with it?
  • Could it be that the asio run loop raises an exception that isn't properly caught by the catch handlers? I don't know whether this is relevant, but MSVC does have SEH (Structured Exceptions)¹
  • There's no need to keep "run" in a tight loop there. Once the io_context is stopped, run() returns immediately, so the loop just spins. If you actually want to keep running the loop, you should call io_context.restart(); in between. I do not recommend this, as it will make any regular shutdown impossible.

Here are some minor tweaks to the code, in case you're interested. They add some visualization of sessions/connections handled/made. Note that the client is mostly unchanged, but the server has some changes that might spark ideas for you:

server.cpp

#include <iostream>
#include <iomanip>

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

namespace ba = boost::asio;
using boost::asio::ip::tcp;
using namespace std::literals;

#define SERVER_PORT 12345
#define DATA_LEN 0x4000

void MessageBox(int, std::string const& caption, std::string const& message, ...) {
    std::cerr << caption << ": " << std::quoted(message) << std::endl;
}

struct session : public std::enable_shared_from_this<session>
{
    tcp::socket socket_;
    ba::steady_timer timer_;
    ba::strand<ba::io_context::executor_type> strand_;

    explicit session(ba::io_context& io_context, tcp::socket socket)
    : socket_(std::move(socket)),
      timer_(io_context),
      strand_(io_context.get_executor())
    { }

    void go()
    {
        auto self(shared_from_this());
        ba::spawn(strand_, [this, self](ba::yield_context yield)
        {
            spawn(yield, [this, self](ba::yield_context yield) {
                while (socket_.is_open()) {
                    timer_.expires_from_now(10s); 
                    boost::system::error_code ec;
                    timer_.async_wait(yield[ec]);
                    // timeout triggered, timer was not canceled
                    if (ba::error::operation_aborted != ec) {
                        socket_.close(ec);
                    }
                }
            });

            try
            {
                // recv data
                std::string packet;

                // read data
                ba::async_read(socket_,
                               ba::dynamic_buffer(packet),
                               ba::transfer_exactly(DATA_LEN),
                               yield);

                std::cout << std::unitbuf << ".";
            }
            catch (std::exception const& e) {
                std::cout << "exception: " << std::quoted(e.what()) << std::endl;
            }
            catch (...) {
                std::cout << "exception" << std::endl;
            }

            boost::system::error_code ec;
            timer_.cancel(ec);
            socket_.close(ec);
        });

    }
};

struct my_server {  
    void start() {
        ba::io_context io_context;
        auto worker = ba::make_work_guard(io_context);

        ba::spawn(io_context, [&](ba::yield_context yield)
        {
            tcp::acceptor acceptor(io_context,
            tcp::endpoint(tcp::v4(), SERVER_PORT));

            for (;;)
            {
                boost::system::error_code ec;

                tcp::socket socket(io_context);
                acceptor.async_accept(socket, yield[ec]);
                if (!ec) {
                    std::make_shared<session>(io_context, std::move(socket))->go();
                } 
            }
        });

        // Run io_context on All CPUs
        auto thread_count = std::thread::hardware_concurrency();
        boost::thread_group tgroup;
        for (auto i = 0u; i < thread_count; ++i) 
            tgroup.create_thread([&] {
                for (;;) {
                    try { 
                        io_context.run(); 
                        break;
                    }
                    catch(const std::exception& e) { 
                        MessageBox(0, "This never popup", e.what(), 0); 
                    }
                    catch(const boost::exception& e) { 
                        MessageBox(0, "This never popup", boost::diagnostic_information(e).data(), 0); 
                    }
                    catch(...) { MessageBox(0, "This never popup", "", 0); }
                }

                std::cout << "stopped: " << io_context.stopped() << std::endl;
            });
        tgroup.join_all();
    }
};  

int main() {
    my_server svr;
    svr.start();
}

client.cpp

#include <iostream>
#include <random>
#include <thread>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using boost::asio::ip::tcp;
namespace ba=boost::asio;

#define SERVER "127.0.0.1"
#define PORT "12345"

int main() {
    ba::io_context io_context;

    static std::string const data_0x4000(0x4000, 'a');

    boost::thread_group tgroup;
    for (auto i = 0; i < 1000; ++i) 
        tgroup.create_thread([&] {
            for(;;) {

                try {
                    tcp::socket s(io_context);

                    tcp::resolver resolver(io_context);
                    ba::connect(s, resolver.resolve(SERVER, PORT));
                    s.set_option(ba::socket_base::reuse_address(true));

                    ba::write(s, ba::buffer(data_0x4000));
                } catch (std::exception const& e) {
                    std::cout << " exception: " << e.what() << std::endl;
                } catch (...) {
                    std::cout << "unknown exception" << std::endl;
                }
                std::cout << std::unitbuf << ".";
            }
        });

    tgroup.join_all();
}

¹ See e.g. https://learn.microsoft.com/en-us/cpp/build/reference/eh-exception-handling-model?view=vs-2019#remarks

² Perhaps MessageBox is only allowed from a "UI" thread.

sehe
The problem still happens if I remove the `MessageBox`. If it were SEH, the server would crash, because I don't catch it with a `__try {} __except{}` block. The `io_context.restart()` is a workaround, but it doesn't explain the problem. When running out of ports, I just wait a few minutes for the OS to free the ports, then try again. I've updated the post with a workaround. – aj3423 Nov 23 '19 at 19:26