0
// Single-connection TCP reader built on Boost.Asio: owns the io_service, the
// listening endpoint/acceptor, one socket and one 512-byte receive buffer.
class Session {
public:
    // Creates the io_service, an endpoint for 127.0.0.1:8001 and an acceptor
    // constructed from that service and endpoint.
    Session()
    {
        service_ = boost::make_shared<boost::asio::io_service>();
        ep_ = boost::make_shared<boost::asio::ip::tcp::endpoint>(ip::address::from_string("127.0.0.1"), 8001);
        acc_ = boost::make_shared<boost::asio::ip::tcp::acceptor>(*service_, *ep_);
    }
    ~Session() {}
    // Completion handler for async_accept; also re-invoked by handle_msg to
    // queue the next read. Does nothing when err is set.
    void handle_accept(socket_ptr sock_ptr, const boost::system::error_code& err)
    {
        if (err)
            return;
        // Reads into the shared member buffer and passes the raw buffer pointer
        // (without any length) to handle_msg.
        // NOTE(review): async_read on a fixed 512-byte buffer is expected to
        // complete only when the buffer is full or an error such as EOF occurs
        // - confirm against the Boost.Asio reference.
        async_read(*sock_ptr, buffer(read_buffer_, 512),
            boost::bind(&Session::handle_msg, this, read_buffer_, sock_ptr, _1));
        // NOTE(review): this clears the buffer right after the read has been
        // initiated, not after it completed; if the I/O thread is already
        // writing into read_buffer_ this looks like a data race.
        memset(read_buffer_, 0, 512);
    }

    // Prepares the acceptor, queues a single async_accept and starts a thread
    // that runs the io_service.
    void start_accept()
    {
        // ec is overwritten by each call below and never inspected.
        // NOTE(review): the acceptor was already constructed from *ep_ in the
        // constructor; presumably it is open/bound/listening at this point, so
        // these calls may fail unnoticed - confirm.
        boost::system::error_code ec;
        acc_->open(ep_->protocol(), ec);
        acc_->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
        acc_->bind(*ep_, ec);
        acc_->listen(boost::asio::socket_base::max_connections, ec);

        sock_ptr_ = boost::make_shared<ip::tcp::socket>(boost::ref(*service_));
        acc_->async_accept(*sock_ptr_, boost::bind(&Session::handle_accept, this, sock_ptr_, _1));
        // NOTE(review): IOwork is a local that goes out of scope at the end of
        // this function without join() or detach() - verify what the
        // boost::thread destructor does in the Boost configuration in use.
        boost::thread IOwork(boost::bind(&Session::io_work, this));
    }

    // Completion handler for async_read. msg is the pointer bound in
    // handle_accept (read_buffer_); it is printed as a C string.
    // NOTE(review): nothing visible guarantees a NUL terminator when all 512
    // bytes have been filled.
    void handle_msg(char* msg, socket_ptr sock_ptr, const boost::system::error_code& err)
    {
        if (err) {
            // Any error, including "End of file" when the peer closes, ends the
            // read loop after printing the buffer and the error text.
            std::cout << msg << " -----\n\n";
            std::cout << err.message() << "\n";
            return;
        }
        else {
            std::cout << msg << " ++++++++++++++\n\n";
        }
        // Re-uses handle_accept with a default-constructed (success) error code
        // to queue the next read on the same socket.
        boost::system::error_code ec;
        handle_accept(sock_ptr, ec);
    }

    // Thread body: runs the io_service; the resulting error code is discarded.
    void io_work()
    {
        boost::system::error_code err;
        service_->run(err);
    }

private:
    // Receive buffer shared by every read issued from handle_accept.
    char read_buffer_[512];
    boost::shared_ptr<boost::asio::io_service> service_ = { nullptr };
    boost::shared_ptr<boost::asio::ip::tcp::endpoint> ep_ = { nullptr };
    boost::shared_ptr<boost::asio::ip::tcp::acceptor> acc_ = { nullptr };
    // Socket handed to async_accept in start_accept.
    socket_ptr sock_ptr_ = { nullptr };
};

// Global instance; its address is handed to the thread started in main().
Session session_;

// Entry point: runs Session::start_accept on a separate thread, then loops forever.
int main()
{
    boost::thread tcp_ser1(boost::bind(&Session::start_accept, &session_));
    // NOTE(review): empty busy-wait loop - it keeps the process alive but spins
    // without sleeping or joining; consider blocking on a thread instead.
    while (1);
}

I wrote a simple server with io_service, but after receiving data it reports an "End of file" error. How should I solve this problem? The server uses asynchronous accept and asynchronous read: async_read is called every time data from the client is received, and handle_msg is responsible for processing the messages received from the client.

drescherjm
  • 10,365
  • 5
  • 44
  • 64

1 Answers1

0

async_read with an unbounded buffer will always reach EOF.

In your case the buffer is bounded, but any stream shorter than 512 bytes that also closes the connection from the other end will still result in EOF.

Just handle the condition. e.g.

    if (err && !(bytes_transferred > 0 && err != boost::asio::error::eof)) {
        std::cout << msg << " -----\n\n";
        std::cout << err.message() << "\n";
        return;
    }
    else {
        std::cout << msg << " ++++++++++++++\n\n";
    }

Side Notes

  • In general, if you want to be able to ignore the error_codes, just don't pass the argument: that way failures are reported as exceptions instead of being silently dropped. Otherwise you get undefined behaviour or just unspecified behaviour for using APIs in ways that violate the pre-conditions.

  • Instead of littering your code with shared pointers, make the Session itself shareable

  • It's a bit atypical to call your listener a session (a session is usually deemed to start for each accepted connection)

  • prefer base/member initializers in the constructor initializer list (or use NSDMI since C++11, as you already used redundantly to initialize with nullptr)

  • consider using std::bind, std::shared_ptr, etc. instead of boost versions

  • don't pass raw buffer pointers (losing size information), and don't ignore the bytes_transferred argument. Otherwise, simply printing msg is already a security issue - and runs into UB, since if you read 512 bytes there's no NUL terminator

  • This is a data race:

     async_read(*sock_ptr, buffer(read_buffer_, 512),
         std::bind(&Session::handle_msg, this, read_buffer_, sock_ptr, _1));
     memset(read_buffer_, 0, 512);
    

    The problem is that async_read returns immediately, but you are modifying read_buffer_ potentially concurrently with the pending async_read operation. Data races are also UB.

  • Here's another problem

      boost::thread IOwork(std::bind(&Session::io_work, this));
    

    This lets the IOwork thread object go out of scope, but the thread is not detached. I think this should lead to abnormal program termination

  • don't forget exception handling in IOWork (see Should the exception thrown by boost::asio::io_service::run() be caught?)

Fixed Code

Gave your code a work-over: Live On Coliru

#include <boost/asio.hpp>
#include <functional>
#include <iomanip>
#include <iostream>
#include <string_view>
#include <thread>

using boost::asio::ip::tcp;
using boost::system::error_code;
using namespace std::placeholders;
using namespace std::chrono_literals;

/// One accepted TCP connection.
///
/// Repeatedly reads into a fixed 512-byte buffer and prints every chunk that
/// was received. Instances must be owned by a std::shared_ptr, because each
/// pending read keeps the session alive through shared_from_this().
class Session : public std::enable_shared_from_this<Session> {
  public:
    /// Takes ownership of an already connected socket.
    explicit Session(tcp::socket&& sock) : socket_(std::move(sock)) { }

    /// Queues the first read; later reads are chained from handle_msg.
    void start()
    {
        do_read();
    }

  private:
    /// Clears the buffer and queues one read that fills it completely.
    /// The bound shared_ptr keeps this object alive until the handler has run.
    void do_read()
    {
        read_buffer_.fill(0);
        async_read(socket_, boost::asio::buffer(read_buffer_),
                   std::bind(&Session::handle_msg, shared_from_this(), _1, _2));
    }

    /// Completion handler for do_read.
    ///
    /// @param err               result of the read (EOF when the peer closed)
    /// @param bytes_transferred number of valid bytes in read_buffer_
    void handle_msg(error_code err, size_t bytes_transferred)
    {
        // Only the bytes actually received are viewed, so no NUL terminator is
        // needed and a partial final chunk is still printed.
        std::string_view msg(read_buffer_.data(), bytes_transferred);

        std::cout << std::quoted(msg) << " -----\n" << std::endl;

        // The received data has already been reported above, so every error
        // (EOF, reset, cancellation) ends the session here. Re-arming a read
        // after an error would only fail again or ignore a cancellation.
        if (err) {
            std::cerr << "Error: " << err.message() << std::endl;
            return;
        }

        do_read();
    }

    std::array<char, 512> read_buffer_{0};
    tcp::socket           socket_;
};

class Server {
  public:
    void start()
    {
        acc_.set_option(tcp::acceptor::reuse_address(true));
        acc_.listen();
        do_accept();
    }

    void stop() {
        post(service_, [this] {
            acc_.cancel(); /*or acc.close();*/
        });
    }

    ~Server() {
        work_.reset();
        //// optionally:
        //stop();

        worker_.join();
    }

  private:
    void do_accept()
    {
        acc_.async_accept(std::bind(&Server::handle_accept, this, _1, _2));
    }

    void handle_accept(error_code err, tcp::socket&& s)
    {
        if (err)
            return; // TODO log error?

        std::make_shared<Session>(std::move(s))->start();

        do_accept(); // continue accepting connections
    }

    static void io_work(boost::asio::io_service& svc)
    {
        // http://
        // www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
        for (;;) {
            try {
                svc.run();
                break; // exited normally
            } catch (std::exception const& e) {
                std::cerr << "[io_work] error: " << e.what() << std::endl;
            } catch (...) {
                std::cerr << "[io_work] An unexpected error occurred"
                          << std::endl;
            }
        }
    }

  private:
    boost::asio::io_service service_{};
    tcp::acceptor           acc_{service_, {{}, 8001}};

    boost::asio::executor_work_guard<boost::asio::io_service::executor_type>
                work_{make_work_guard(service_)};

    std::thread worker_{std::bind(&Server::io_work, std::ref(service_))};
};

int main()
{
    Server server;
    server.start();

    std::this_thread::sleep_for(5s); // for COLIRU demo

    server.stop();

    // destructor joins when all sessions exited
}

Demo online uses buffer size of 8 bytes:

"#include" -----

" <boost/" -----

"asio.hpp" -----

">
#inclu" -----

"de <func" -----

"tional>
" -----

"#include" -----

" <iomani" -----

"p>
#incl" -----

"ude <ios" -----

"tream>
#" -----

(...snipped...)

"p();

  " -----

"  // des" -----

"tructor " -----

"joins wh" -----

"en all s" -----

"essions " -----

"exited
}" -----

"
" -----

Error: End of file

Note how apparently the file size is just aligned to 8 byte buffer sizes so the last read actually reads 9 bytes while detecting EOF. You might want to drop the bytes_transferred>0 condition.

sehe
  • 374,641
  • 47
  • 450
  • 633
  • Added a much simplified/improved version of the code [Live On Coliru](http://coliru.stacked-crooked.com/a/2aabe63fb8074438) – sehe May 25 '21 at 13:32