1

when I build it, and running server and then run client, that appear a error error code = 2, error message = End of file

when I code synchronous tcp server it's work ok;

thanks

full client code

#include <boost/predef.h> // Tools to identify the os

#ifdef BOOST_OS_WINDOWS
#define _WIN32_WINNT 0x0501

#if _WIN32_WINNT <= 0x0502
    #define BOOST_ASIO_DISABLE_TOCP
    #define BOOST_ASIO_ENABLE_CANCELIO
#endif
#endif

#include <boost/asio.hpp>
#include <mutex>
#include <thread>
#include <memory>
#include <iostream>
#include <map>

using namespace boost;


typedef void(*Callback) (unsigned int request_id, const std::string& response, const system::error_code& ec);

struct Session{
    Session(asio::io_service& ios, const std::string& raw_ip_address, unsigned short port_num, const std::string& request, unsigned int id, Callback callback) : m_sock(ios), m_ep(asio::ip::address::from_string(raw_ip_address),port_num), m_request(request), m_id(id), m_callback(callback), m_was_cancelled(false) {} 

    asio::ip::tcp::socket m_sock;
    asio::ip::tcp::endpoint m_ep; // Remote endpoint
    std::string m_request;

    // streambuf where the response will be stored.
    asio::streambuf m_response_buf;
    std::string m_response; // Response represented as a string

    system::error_code m_ec;

    unsigned int m_id;

    Callback m_callback;

    bool m_was_cancelled;
    std::mutex m_cancel_guard;
};

class AsyncTCPClient : public boost::asio::noncopyable {
public: 
    AsyncTCPClient(){
        m_work.reset(new boost::asio::io_service::work(m_ios));

        m_thread.reset(new std::thread([this](){
            m_ios.run();
        }));
    }


    void emulateLongComputationOp( unsigned int duration_sec, const std::string& raw_ip_address, unsigned short port_num, Callback callback, unsigned int request_id){
        std::string request = "EMULATE_LONG_CALC_OP " + std::to_string(duration_sec) + "\n";
        std::cout << "Request: " << request << std::endl;

        std::shared_ptr<Session> session = std::shared_ptr<Session> (new Session(m_ios, raw_ip_address, port_num, request, request_id, callback));

        session->m_sock.open(session->m_ep.protocol());

        // active sessions list can be accessed from multiple thread, we guard it with a mutex to avoid data coruption
        std::unique_lock<std::mutex> lock(m_active_sessions_guard);
        m_active_sessions[request_id] = session;
        lock.unlock();

        session->m_sock.async_connect(session->m_ep, [this, session](const system::error_code& ec) {
            if (ec.value() != 0) {
                session->m_ec = ec;
                onRequestComplete(session);
                return;
            }

            std::unique_lock<std::mutex> cancel_lock(session->m_cancel_guard);

            if (session->m_was_cancelled) {
                onRequestComplete(session);
                return;
            }

            asio::async_write(session->m_sock, asio::buffer(session->m_request), [this, session](const boost::system::error_code &ec, std::size_t bytes_transferred) {
                if (ec.value() != 0) {
                    session->m_ec = ec;
                    onRequestComplete(session);
                    return;
                }
                std::unique_lock<std::mutex> cancel_lock(session->m_cancel_guard);

                if (session->m_was_cancelled) {
                    onRequestComplete(session);
                    return;
                }
                
                asio::async_read_until(session->m_sock, session->m_response_buf, '\n',
                                       [this, session](const boost::system::error_code &ec,
                                                       std::size_t bytes_transferred) {
                                           if (ec.value() != 0) {
                                               session->m_ec = ec;
                                           } else {
                                               std::istream strm(&session->m_response_buf);
                                               std::getline(strm, session->m_response);
                                           }

                                           onRequestComplete(session);
                                       });
            });
        });
    };

    // Cancels the request
    void cancelRequest(unsigned int request_id){
        std::unique_lock<std::mutex> lock(m_active_sessions_guard);

        auto it = m_active_sessions.find(request_id);
        if(it != m_active_sessions.end()){
            std::unique_lock<std::mutex> cancel_lock(it->second->m_cancel_guard);

            it->second->m_was_cancelled = true;
            it->second->m_sock.cancel();
        }
    }


    void close(){
        // Destroy work object
        m_work.reset(NULL);
        // wait for the I/O thread tot exit
        m_thread->join();
    }

private:
    void onRequestComplete(std::shared_ptr<Session> session){
        // shutting down the connection, we don't care about the error code if function failed
        boost::system::error_code ignored_ec;

        session->m_sock.shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec);

        // remove session from the map of active sessions
        std::unique_lock<std::mutex> lock(m_active_sessions_guard);

        auto it = m_active_sessions.find(session->m_id);
        if(it != m_active_sessions.end()){
            m_active_sessions.erase(it);
        }

        lock.unlock();

        boost::system::error_code ec;

        if(session->m_ec.value() == 0 && session->m_was_cancelled){
            ec = asio::error::operation_aborted;
        }else{
            ec = session->m_ec;
        }

        session->m_callback(session->m_id, session->m_response, ec);
    };
private:
    asio::io_service m_ios;
    std::map<int, std::shared_ptr<Session>> m_active_sessions;
    std::mutex m_active_sessions_guard;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    std::unique_ptr<std::thread> m_thread;
};


void handler(unsigned int request_id, const std::string& response, const system::error_code& ec){
    if(ec.value() == 0){
        std::cout << "Request #" << request_id << " has completed. Reponse: "<< response << std::endl;
    }else if(ec == asio::error::operation_aborted){
        std::cout << "Request #" << request_id << " has been cancelled by the user. "  << std::endl;
    }else{
        std::cout << "Request #" << request_id << " failed! Error code = " << ec.value() << ". Error Message = " << ec.message() << std::endl;
    }
    return;
}


int main(){
    try{
        AsyncTCPClient client;

        // emulate the user's behavior
        client.emulateLongComputationOp(10, "127.0.0.1", 3333, handler, 1);

        std::this_thread::sleep_for(std::chrono::seconds(60));

        // another request with id 2
        client.emulateLongComputationOp(11, "127.0.0.1", 3334, handler, 2);

        // cancel request 1
        client.cancelRequest(1);

        std::this_thread::sleep_for(std::chrono::seconds(6));

        // another request with id 3
        client.emulateLongComputationOp(12, "127.0.0.1", 3335, handler, 3);

        std::this_thread::sleep_for(std::chrono::seconds(15));

        // exit the application
        client.close();
    }
    catch(system::system_error &e){
        std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.what();

        return e.code().value();
    }

    return 0;
}

full server code

#include <boost/asio.hpp>

#include <thread>
#include <atomic>
#include <memory>
#include <iostream>

using namespace boost;

class Service {
public:
    Service(std::shared_ptr<asio::ip::tcp::socket> sock) : m_sock(sock) {}

    void StartHandling() {
        asio::async_read_until(*m_sock.get(), m_request, '\n', [this](const boost::system::error_code& ec, std::size_t bytes_transferred){
            onRequestReceived(ec, bytes_transferred);
        });
        std::istream is(&m_request);
        std::string line;
        std::getline(is, line);
        std::cout << "m_request: " << line << std::endl;
    }

private:
    void onRequestReceived(const boost::system::error_code& ec, std::size_t bytes_transfered){
        std::cout << "ec.value : " << ec.value() << std::endl;
        if (ec.value() != 0){
            std::cout << "Error occurred! Error code = " << ec.value() << ".Message: " << ec.message();
            onFinish();
            return;
        }

        // Process the request
        asio::async_write(*m_sock.get(), asio::buffer(m_response), [this](const boost::system::error_code& ec, std::size_t bytes_transferred){
            onResponseSent(ec, bytes_transferred);
        });
    }

    void onResponseSent(const boost::system::error_code& ec, std::size_t bytes_transferred){
        if(ec.value() != 0){
            std::cout << "Error occurred! Error code = " << ec.value() << ". Message: " << ec.message();
        }

        onFinish();
    }

    // cleanup
    void onFinish(){
        delete this;
    }

    std::string ProcessingRequest(asio::streambuf& request){
        // parse the request, process it and prepare the request

        // Emulating CPU-consuming operations
        int i = 0;
        while (i != 1000){
            i++;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        std::string response = "Response\n";
        return response;
    }

    std::shared_ptr<asio::ip::tcp::socket> m_sock;
    std::string m_response;
    asio::streambuf m_request;
};


class Acceptor {
public:
    Acceptor(asio::io_service& ios, unsigned short port_num) : m_ios(ios), m_acceptor(m_ios, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), port_num)), m_isStopped(
            false) {}
    // Start accepting incoming connection request.
    void Start(){
        m_acceptor.listen();
        InitAccept();
    }

    void Stop() {
        m_isStopped.store(true);
    }

private:
    void InitAccept() {
        std::shared_ptr<asio::ip::tcp::socket> sock(new asio::ip::tcp::socket(m_ios));

        m_acceptor.async_accept(*sock.get(), [this, sock](const boost::system::error_code& error){
            onAccept(error, sock);
        });
    }

    void onAccept(const boost::system::error_code& ec, std::shared_ptr<asio::ip::tcp::socket> sock){
        if(ec.value() == 0){
            (new Service(sock))->StartHandling();
        }else{
            std::cout << "Error occurred! Error code = " << ec.value() << ". Message: " << ec.message();
        }

        // Init next accept operation if acceptor has not been stopped yet
        if(!m_isStopped.load()){
            InitAccept();
        }else{
            // free resources
            m_acceptor.close();
        }
    }

private:
    asio::io_service& m_ios;
    asio::ip::tcp::acceptor m_acceptor;
    std::atomic<bool> m_isStopped;
};


class Server{
public:
    Server() {
       m_work.reset(new asio::io_service::work(m_ios));
    }

    // Start the server
    void Start(unsigned short port_num, unsigned int thread_pool_size){
        assert(thread_pool_size > 0);

        // Create and start Acceptor
        acc.reset(new Acceptor(m_ios, port_num));
        acc->Start();

        // Create specified number of thread and add them to the pool
        for(unsigned int i = 0; i < thread_pool_size; i++){
            std::cout << "Thread " << i << " Running !";
            std::unique_ptr<std::thread> th(new std::thread([this](){
                m_ios.run();
            }));
            m_thread_pool.push_back(std::move(th));
        }
    }

    // Stop the Server
    void Stop(){
        acc->Stop();
        m_ios.stop();

        for(auto& th : m_thread_pool){
            th->join();
        }
    }

private:
    asio::io_service m_ios;
    std::unique_ptr<asio::io_service::work> m_work;
    std::unique_ptr<Acceptor> acc;
    std::vector<std::unique_ptr<std::thread>> m_thread_pool;
};

const unsigned int DEFAULT_THREAD_POOL_SIZE = 2;

int main(){
    unsigned short port_num = 3333;

    try{
        Server srv;

        unsigned int thread_pool_size = std::thread::hardware_concurrency() * 2;

        if (thread_pool_size == 0){
            thread_pool_size = DEFAULT_THREAD_POOL_SIZE;
        }

        srv.Start(port_num, thread_pool_size);

        std::this_thread::sleep_for(std::chrono::seconds(60));

        srv.Stop();
    }
    catch(system::system_error &e){
        std::cout << "Error occurred! Error code = " << e.code() << ". Message: " << e.what();
    }

    return 0;
}
Loyio
  • 57
  • 6
  • Are you using "that book[¹](https://stackoverflow.com/a/48084444/85371)"? `*m_sock.get()` is such a giant code smell, I immediately recognize it. – sehe Mar 22 '22 at 11:43
  • @sehe yes, the cookbook, is that right – Loyio Mar 22 '22 at 12:50
  • I was afraid so. Sadly, the book is full of suboptimal practice. Sure, it may help you get the basics down, but sure doesn't lead to excellent code. (And yes, I too bought the book...) – sehe Mar 22 '22 at 17:29
  • @sehe Is there anything to recommend? thanks – Loyio Mar 23 '22 at 01:33

1 Answers1

2

The server closes the connection after sending the (empty) response. That leads to EOF on the client, naturally. Just handle it.

There's loads of code smells

  • delete this; is an abomination, just make Service shared_from_this.

  • No need to use shared_ptrs other than that

  • When you use smart pointers, use them. Don't "convert to raw pointer" just to dereference (so *m_socket instead of *m_socket.get()).

  • In fact, there should be no need to use new, delete or get() in your code

  • You are accessing the m_request immediately after async_read_until which is too early,

    • it is a data race (so Undefined Behaviour)
    • it doesn't get the request, because async_read_until didn't complete yet.

    So move that code into onRequestReceived at a minimum

  • It's pretty unnecessary to use an istream to read the line from the request when you already have bytes_transferred. I'd suggest

       if (bytes_transferred) {
           std::string line(m_request.data().data(), bytes_transferred - 1);
           m_request.consume(bytes_transferred);
           std::cout << "request: " << line << std::endl;
       }
    

    Or even:

       std::cout << "request: ";
       std::cout.write(asio::buffer_cast<char const*>(m_request.data()),
                       bytes_transferred - 1);
       m_request.consume(bytes_transferred);
    

    Or, if you indeed wanted to show the entire m_request, simply

       std::cout << "m_request: " << &m_request << std::endl;
    
  • Note that read_until may read more than just including the delimiter; for your safety you might want to validate that no other data is trailing, or process it as well

  • Never switch on error_code::value(), that loses the error category, which is essential to interpret error codes.

  • Why unique_ptr for each thread? Just a deque<thread>:

     while (thread_pool_size--)
         m_thread_pool.emplace_back([this] { m_ios.run(); });
    

    But see Should the exception thrown by boost::asio::io_service::run() be caught?

  • Why unique_ptr for acceptor?

  • Why a separate class for acceptor? It's not like the server allows more than 1

  • why a vector of threads anyways? Prefer boost::thread_group

  • why a manual thread pool? Prefer asio::thread_pool - which already uses the hardware_concurrency if available

In terms of review, the TCPAsyncClient looks like an attempt to implement async_result protocol. It misses the mark on many points. So I'll just point to something like how do i return the response back to caller asynchronously using a final callback dispatched from on_read handler? or How do I make this HTTPS connection persistent in Beast?. They have pretty similar interfaces (perhaps except for the cancellation, if I remember correctly).

Fixed/Return Demo

Here's the completed sample. It includes request parsing, so the server waits the actual amount of time requested.

I scaled all the times down 10x so it can complete online.

Client and server are in single source. Starting with:

./sotest&
./sotest client
wait

Completes both in 6 seconds (see screengrab below)

Live On Coliru

#include <boost/asio.hpp>
#include <boost/spirit/home/x3.hpp> // for request parsing
#include <iomanip>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>

namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using std::this_thread::sleep_for;

/////// server //////////////////////////////////////////////////////////
struct Service : std::enable_shared_from_this<Service> {
    Service(tcp::socket sock) : m_sock(std::move(sock)) {}

    void StartHandling() {
        async_read_until(
            m_sock, asio::dynamic_buffer(m_request), '\n',
            [this, self = shared_from_this()](error_code ec, size_t bytes) {
                onRequestReceived(ec, bytes);
            });
    }

  private:
    void onRequestReceived(error_code ec, size_t /*bytes*/) {
        std::cout << "onRequestReceived: " << ec.message() << std::endl;
        if (ec)
            return;

        // Process the request
        m_response = ProcessingRequest(m_request);

        async_write(
            m_sock, asio::buffer(m_response),
            [this, self = shared_from_this()](error_code ec, size_t bytes) {
                onResponseSent(ec, bytes);
            });
    }

    void onResponseSent(error_code ec, size_t /*bytes*/) {
        std::cout << "onResponseSent: " << ec.message() << std::endl;
    }

    std::string static ProcessingRequest(std::string request) {
        std::cout << "request: " << request << std::endl;

        // parse the request, process it and prepare the response
        namespace x3 = boost::spirit::x3;
        double value;
        if (parse(request.begin(), request.end(),
                "EMULATE_LONG_CALC_OP " >> x3::double_ >> "s" >> x3::eol >> x3::eoi,
                value)) //
        {
            // Emulating time-consuming operation
            sleep_for(1.0s * value);
            return "Waited " + std::to_string(value) + "s\n";
        }

        return "Unknown request\n";
    }

    tcp::socket m_sock;
    std::string m_request, m_response;
};

struct Server {
    Server(asio::any_io_executor ex, uint16_t port_num)
        : m_acceptor{ex, {{}, port_num}} {
        m_acceptor.listen();
        accept_loop();
    }

    void Stop() { m_acceptor.cancel(); }

  private:
    void accept_loop() {
        m_acceptor.async_accept([this](error_code ec, tcp::socket sock) {
            std::cout << "OnAccept: " << ec.message() << std::endl;
            if (!ec) {
                std::make_shared<Service>(std::move(sock))->StartHandling();
                accept_loop();
            } 
            //m_acceptor.close();
        });
    }

    tcp::acceptor m_acceptor;
};

void server(uint16_t port) try {
    asio::thread_pool io;
    Server            srv{io.get_executor(), port};

    sleep_for(6s);

    srv.Stop();
    io.join();
} catch (std::exception const& e) {
    std::cout << "Exception: " << e.what() << std::endl;
}

/////// client //////////////////////////////////////////////////////////

struct RequestOp : public std::enable_shared_from_this<RequestOp> {
    using Callback = std::function<void( //
        unsigned /*request_id*/, std::string_view /*response*/, error_code)>;

    RequestOp(asio::any_io_executor ex, const std::string& raw_ip_address,
              uint16_t port_num, std::string request, unsigned id,
              Callback callback)
        : m_ep(asio::ip::address::from_string(raw_ip_address), port_num)
        , m_sock(ex, m_ep.protocol())
        , m_request(std::move(request))
        , m_id(id)
        , m_callback(callback) {}

    void Run() {
        // assumed on logical strand
        m_sock.async_connect(
            m_ep, [this, self = shared_from_this()](error_code ec) {
                if ((m_ec = ec) || m_was_cancelled)
                    return onComplete();

                asio::async_write(m_sock, asio::buffer(m_request),
                                  [this, self = shared_from_this()](
                                      error_code ec, size_t /*bytes*/) {
                                      onRequestWritten(ec);
                                  });
            });
    }

    void Cancel() {
        m_was_cancelled = true;
        dispatch(m_sock.get_executor(), [self=shared_from_this()]{ self->doCancel(); });
    }

  private:
    void doCancel() {
        m_sock.cancel();
    }

    void onRequestWritten(error_code ec) {
        if ((m_ec = ec) || m_was_cancelled)
            return onComplete();

        asio::async_read_until(
            m_sock, asio::dynamic_buffer(m_response), '\n',
            [this, self = shared_from_this()](error_code ec, size_t bytes) {
                onResponseReceived(ec, bytes);
            });
    }

    void onResponseReceived(error_code ec, size_t /*bytes*/) {
        if ((m_ec = ec) || m_was_cancelled)
            return onComplete();

        if (!m_response.empty())
            m_response.resize(m_response.size() - 1); // drop '\n'

        onComplete();
    }

    void onComplete() {
        // shutting down the connection, we don't care about the error code
        // if function failed
        error_code ignored_ec;
        m_sock.shutdown(tcp::socket::shutdown_both, ignored_ec);

        if(!m_ec && m_was_cancelled){
            m_ec = asio::error::operation_aborted;
        }

        m_callback(m_id, m_response, m_ec);
    }

    tcp::endpoint m_ep; // Remote endpoint
    tcp::socket   m_sock;
    std::string   m_request;

    std::string m_response; // Response represented as a string

    error_code m_ec;

    unsigned m_id;

    Callback m_callback;

    std::atomic_bool m_was_cancelled{false};
};

class AsyncTCPClient {
  public:
    AsyncTCPClient(asio::any_io_executor ex) : m_executor(ex) {}

    using Duration = std::chrono::steady_clock::duration;

    size_t emulateLongCalcOp(Duration delay, std::string const& raw_ip_address,
                             uint16_t port_num, RequestOp::Callback callback) {
        auto request =
            "EMULATE_LONG_CALC_OP " + std::to_string(delay / 1.0s) + "s\n";
        std::cout << "Request: " << request << std::flush;

        auto const request_id = m_nextId++;
        auto session = std::make_shared<RequestOp>(
            make_strand(m_executor), //
            raw_ip_address, port_num, request, request_id, callback);

        {
            // active sessions list can be accessed from multiple thread, we
            // guard it with a mutex to avoid data coruption
            std::unique_lock lock(m_active_sessions_guard);

            auto [_,ok] = m_pending_ops.emplace(request_id, session);
            assert(ok); // duplicate request_id?

            // optionally: garbage collect completed sessions
            std::erase_if(m_pending_ops,
                          [](auto& kv) { return kv.second.expired(); });
        };

        session->Run();
        return request_id;
    }

    // Cancels the request
    void cancelRequest(unsigned request_id) {
        std::unique_lock lock(m_active_sessions_guard);

        if (auto session = m_pending_ops[request_id].lock())
            session->Cancel();
    }

  private:
    using PendingOp = std::weak_ptr<RequestOp>;

    asio::any_io_executor    m_executor;
    std::mutex               m_active_sessions_guard;
    size_t                   m_nextId = 1;
    std::map<int, PendingOp> m_pending_ops;
};

void handler(unsigned request_id, std::string_view response, error_code ec) {
    std::cout << "Request #" << request_id << " ";

    if (!ec.failed())
        std::cout << "Response: " << std::quoted(response) << std::endl;
    else if (ec == asio::error::operation_aborted)
        std::cout << "Cancelled" << std::endl;
    else
        std::cout << ec.message() << std::endl;
}

void client(uint16_t port) try {
    asio::thread_pool io;

    {
        AsyncTCPClient client(io.get_executor());

        auto id1 = client.emulateLongCalcOp(4s, "127.0.0.1", port, handler);
        auto id2 = client.emulateLongCalcOp(1100ms, "127.0.0.1", port, handler);
        auto id3 = client.emulateLongCalcOp(3500ms, "127.0.0.1", port, handler);

        // cancel request 1
        sleep_for(3s);
        client.cancelRequest(id1);

        sleep_for(1200ms);

        client.cancelRequest(id2); // no effect, already completed
        client.cancelRequest(id3); // no effect, already completed
        // exit the application
    }

    io.join();
} catch (std::exception const& e) {
    std::cout << "Exception: " << e.what() << std::endl;
}

/////// main //////////////////////////////////////////////////////////
int main(int argc, char**) {
    if (argc > 1)
        client(3333);
    else
        server(3333);
}

Prints client:

Request: EMULATE_LONG_CALC_OP 4.000000s                                                     
Request: EMULATE_LONG_CALC_OP 1.100000s                                                                                                                                   
Request: EMULATE_LONG_CALC_OP 3.500000s                                                                                
Request #2 Response: "Waited 1.100000s"                                                                                                     
Request #1 Cancelled                                               
Request #3 Response: "Waited 3.500000s"                                                                                                     

Prints server:

OnAccept: Success
OnAccept: Success
onRequestReceived: Success
request: EMULATE_LONG_CALC_OP 1.100000s

onRequestReceived: Success
request: EMULATE_LONG_CALC_OP 4.000000s

OnAccept: Success
onRequestReceived: Success
request: EMULATE_LONG_CALC_OP 3.500000s

onResponseSent: Success
onResponseSent: Success
onResponseSent: Success
OnAccept: Operation canceled

enter image description here


sehe
  • 374,641
  • 47
  • 450
  • 633
  • I didn't mention it, but the original code had another data race on the socket in the client session. `it->second->m_sock->cancel()` must be on a strand. My example fixes it (see `make_strand` and `Cancel()`). These are serious errors. They will make your production code crash/exhibit undefined behavior in rare circumstances. You don't want to be debugging these. – sehe Mar 22 '22 at 17:32
  • thanks, I forgot to running the processing when I read the comand – Loyio Mar 23 '22 at 01:32
  • thanks a lot for fix my code errors, your code is great! – Loyio Mar 23 '22 at 01:44