2

I need a parallel synchronous TCP solution using ASIO. I'm trying to get the example code from these examples working: https://github.com/jvillasante/asio-network-programming-cookbook/tree/master/src (using the server in ch04: 02_Sync_parallel_tcp_server.cpp and the client in ch03: 01_Sync_tcp_client.cpp).

The only thing I changed is the logging to append to text files.

The problem is that while the server runs fine, the client dies after receiving a single response from the server:

libc++abi.dylib: terminating with uncaught exception of type boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >: shutdown: Socket is not connected

Code for the server:

#include <boost/asio.hpp>
#include <atomic>
#include <memory>
#include <thread>
#include <iostream>
#include <fstream>

using namespace boost;

class Service {
public:
  Service() = default;

  void StartHandlingClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
    std::thread th{[this, sock]() { HandleClient(sock); }};
    th.detach();
  }

private:
  void HandleClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
    try {
      asio::streambuf request;
      asio::read_until(*sock.get(), request, '\n');

      std::istream is(&request);
      std::string line;
      std::getline(is, line);

      std::ofstream log("logfile2.txt", std::ios_base::app | std::ios_base::out);
      log << "Request: " << line << "\n" << std::flush;

      // Emulate request processing.
      int i = 0;
      while (i != 1000000) i++;
      std::this_thread::sleep_for(std::chrono::milliseconds(500));

      // Sending response.
      std::string response = "Response\n";
      asio::write(*sock.get(), asio::buffer(response));
    } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
    }

    // Clean up
    delete this;
  }
};

class Acceptor {
public:
  Acceptor(asio::io_service& ios, unsigned short port_num)
  : m_ios{ios}, m_acceptor{m_ios, asio::ip::tcp::endpoint{asio::ip::address_v4::any(), port_num}} {
    m_acceptor.listen();
  }

  void Accept() {
    auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

    m_acceptor.accept(*sock.get());

    (new Service)->StartHandlingClient(sock);
  }

private:
  asio::io_service& m_ios;
  asio::ip::tcp::acceptor m_acceptor;
};

/// Runs the accept loop on a background thread.
class Server {
public:
  Server() : m_stop{false} {}

  /// Launches the background thread that accepts clients on `port_num`.
  void Start(unsigned short port_num) {
    m_thread.reset(new std::thread([this, port_num]() { Run(port_num); }));
  }

  /// Requests the accept loop to finish and waits for the background thread.
  ///
  /// Safe to call when Start() was never called or when the thread has already
  /// been joined. Note that Run() only re-checks the stop flag between calls
  /// to Acceptor::Accept(), which accepts synchronously, so this call returns
  /// only after the pending accept has completed.
  void Stop() {
    m_stop.store(true);
    if (m_thread && m_thread->joinable()) {
      m_thread->join();
    }
  }

private:
  /// Thread body: accepts clients one by one until a stop is requested.
  void Run(unsigned short port_num) {
    Acceptor acc{m_ios, port_num};

    while (!m_stop.load()) {
      acc.Accept();
    }
  }

private:
  std::unique_ptr<std::thread> m_thread;
  std::atomic<bool> m_stop;
  asio::io_service m_ios;
};

int main() {
  unsigned short port_num = 3333;

  try {
    Server srv;
    srv.Start(port_num);

    std::this_thread::sleep_for(std::chrono::seconds(60));

    srv.Stop();
  } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
  }

  return 0;
}

Code for the client:

#include <boost/asio.hpp>
#include <iostream>
#include <fstream>

using namespace boost;

class SyncTCPClient {
public:
  SyncTCPClient(const std::string& raw_ip_address, unsigned short port_num)
  : m_ep(asio::ip::address::from_string(raw_ip_address), port_num), m_sock(m_ios) {
    m_sock.open(m_ep.protocol());
  }
  ~SyncTCPClient() { close(); }

  void connect() { m_sock.connect(m_ep); }

  std::string emulateLongComputationOp(unsigned int duration_sec) {
    std::string request = "EMULATE_LONG_COMP_OP " + std::to_string(duration_sec) + "\n";
    sendRequest(request);
    return receiveResponse();
  }

private:
  void close() {
    if (m_sock.is_open()) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "shutting down\n" << std::flush;
      m_sock.shutdown(asio::ip::tcp::socket::shutdown_both);
      log << "closing the socket\n" << std::flush;
      m_sock.close();
      log << "socket closed\n" << std::flush;
    }
  }

  void sendRequest(const std::string& request) { asio::write(m_sock, asio::buffer(request)); }

  std::string receiveResponse() {
    asio::streambuf buf;
    asio::read_until(m_sock, buf, '\n');

    std::istream input(&buf);
    std::string response;
    std::getline(input, response);

    return response;
  }

private:
  asio::io_service m_ios;
  asio::ip::tcp::endpoint m_ep;
  asio::ip::tcp::socket m_sock;
};

int main() {
  const std::string raw_ip_address = "127.0.0.1";
  const unsigned short port_num = 3333;

  try {
    SyncTCPClient client{raw_ip_address, port_num};

    // Sync connect.
    client.connect();

    std::cout << "Sending request to the server...\n";
    std::string response = client.emulateLongComputationOp(10);

    std::cout << "Response received: " << response << "\n";
  } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
    return e.code().value();
  }

  return 0;
}
Jason
  • 531
  • 6
  • 19

1 Answer

2

I don't see a lot wrong, and I cannot reproduce the problem with the code shown.

Things I do see:

  1. the thread procedure could be a static because it's stateless (delete this is a code smell)
  2. the thread needn't be detached (using boost::thread_group::join_all would be much better)
  3. you were writing to the same logfile from server as well as client; results are undefined
  4. spelling .store() and .load() on an atomic<bool> is un-idiomatic
  5. spelling out *sock.get() on any kind of smart pointer is unforgivably un-idiomatic
  6. writing code().value() - swallowing the category - is a BAD thing to do, and e.what() is NOT the way to get the message (use e.code().message()).
  7. If you need flush, you might as well use std::endl
  8. There's really no reason to use a shared_ptr in c++14:

    asio::ip::tcp::socket sock(m_ios);
    
    m_acceptor.accept(sock);
    
    std::thread([sock=std::move(sock)]() mutable { HandleClient(sock); }).detach();
    

    In C++11 stick to:

    auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);
    
    m_acceptor.accept(*sock);
    
    std::thread([sock] { HandleClient(*sock); }).detach();
    

    This means HandleClient can just take a ip::tcp::socket& instead of a smart pointer.

INTEGRATING

Server.cpp

#include <atomic>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <memory>
#include <thread>

using namespace boost;

/// Serves one client: reads a '\n'-terminated request from `sock`, appends it
/// to server.log, emulates some processing and writes "Response\n" back.
/// Socket errors are appended to server.log instead of being propagated.
static void HandleClient(asio::ip::tcp::socket& sock) {
    try {
        asio::streambuf buf;
        asio::read_until(sock, buf, '\n');

        std::string request;
        getline(std::istream(&buf), request);

        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << "Request: " << request << std::endl;

        // Emulate request processing.
        int i = 0;
        while (i != 1000000)
            i++;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        // Sending response.
        std::string response = "Response\n";
        asio::write(sock, asio::buffer(response));
    } catch (const boost::system::system_error &e) {
        // Boost.Asio throws boost::system::system_error, which derives from
        // std::runtime_error rather than std::system_error. This function runs
        // as a thread body, so an unmatched exception would terminate the
        // whole process.
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    }
}

class Acceptor {
  public:
    Acceptor(asio::io_service &ios, unsigned short port_num)
            : m_ios{ ios }, m_acceptor{ m_ios, asio::ip::tcp::endpoint{ asio::ip::address_v4::any(), port_num } } {
        m_acceptor.listen();
    }

    void Accept() {
        auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

        m_acceptor.accept(*sock);

        std::thread([sock] { HandleClient(*sock); }).detach();
    }

  private:
    asio::io_service &m_ios;
    asio::ip::tcp::acceptor m_acceptor;
};

/// Runs the accept loop on a background thread.
class Server {
  public:
    Server() : m_stop{ false } {}

    /// Launches the background thread that accepts clients on `port_num`.
    void Start(unsigned short port_num) {
        m_thread.reset(new std::thread([this, port_num]() { Run(port_num); }));
    }

    /// Requests the accept loop to finish and waits for the background thread.
    ///
    /// Safe to call when Start() was never called or when the thread has
    /// already been joined. Run() only re-checks the stop flag between calls
    /// to Acceptor::Accept(), which accepts synchronously, so this call
    /// returns only after the pending accept has completed.
    void Stop() {
        m_stop = true;
        if (m_thread && m_thread->joinable()) {
            m_thread->join();
        }
    }

  private:
    /// Thread body: accepts clients one by one until a stop is requested.
    void Run(unsigned short port_num) {
        Acceptor acc{ m_ios, port_num };

        while (!m_stop) {
            acc.Accept();
        }
    }

  private:
    std::unique_ptr<std::thread> m_thread;
    std::atomic<bool> m_stop;
    asio::io_service m_ios;
};

int main() {
    unsigned short port_num = 3333;

    try {
        Server srv;
        srv.Start(port_num);

        std::this_thread::sleep_for(std::chrono::seconds(60));

        srv.Stop();
    } catch (std::system_error &e) {
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    }
}

Client.cpp

#include <boost/asio.hpp>
#include <fstream>
#include <iostream>

using namespace boost;

class SyncTCPClient {
  public:
    SyncTCPClient(const std::string &raw_ip_address, unsigned short port_num)
            : m_ep(asio::ip::address::from_string(raw_ip_address), port_num), m_sock(m_ios) {
        m_sock.open(m_ep.protocol());
    }
    ~SyncTCPClient() { close(); }

    void connect() { m_sock.connect(m_ep); }

    std::string emulateLongComputationOp(unsigned int duration_sec) {
        std::string request = "EMULATE_LONG_COMP_OP " + std::to_string(duration_sec) + "\n";
        sendRequest(request);
        return receiveResponse();
    }

  private:
    void close() {
        if (m_sock.is_open()) {
            std::ofstream log("client.log", std::ios_base::app | std::ios_base::out);
            log << "shutting down" << std::endl;
            m_sock.shutdown(asio::ip::tcp::socket::shutdown_both);
            log << "closing the socket" << std::endl;
            m_sock.close();
            log << "socket closed" << std::endl;
        }
    }

    void sendRequest(const std::string &request) { asio::write(m_sock, asio::buffer(request)); }

    std::string receiveResponse() {
        asio::streambuf buf;
        asio::read_until(m_sock, buf, '\n');

        std::string response;
        getline(std::istream(&buf), response);

        return response;
    }

  private:
    asio::io_service m_ios;
    asio::ip::tcp::endpoint m_ep;
    asio::ip::tcp::socket m_sock;
};

int main() {
    const std::string raw_ip_address = "127.0.0.1";
    const unsigned short port_num = 3333;

    try {
        SyncTCPClient client{ raw_ip_address, port_num };

        // Sync connect.
        client.connect();

        std::cout << "Sending request to the server...\n";
        std::string response = client.emulateLongComputationOp(10);

        std::cout << "Response received: " << response << std::endl;
    } catch (std::system_error &e) {
        std::ofstream log("client.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
        return e.code().value();
    }
}
sehe
  • 374,641
  • 47
  • 450
  • 633
  • doesn't `receiveResponse` just drop the rest of the bytes in `buf`? – Abyx Dec 07 '17 at 16:35
  • @Abyx, yes (beyond the first line). Why? That appears to be "the protocol" – sehe Dec 07 '17 at 16:49
  • Thanks for the tips (I was using separate directories for client and server, so I didn't worry about the log filename, but it is better to keep them different anyway). I tried it with your changes but I still receive the same error on the client side. I'm running this on OS X, not sure if that makes a difference. – Jason Dec 07 '17 at 18:50
  • @sehe : Regarding #5 , Why is *sock.get() un-idiomatic ? .get() on a smart pointer is definitely the API to fetch the underlying pointer... IT seems to work fine even using *socket.get() .. – Nishant Sharma Jan 03 '18 at 23:50
  • @NishantSharma The point is that it's _definitely_ un-idiomatic to get the underlying pointer from a smart pointer. That's cutting off the smarts. Pointing out that "it works" is not the line of thinking that will help you write great (c++) code – sehe Jan 04 '18 at 00:14
  • > "I'm running this on OS X, not sure if that makes a difference". It does. I get the same error "shutdown: Socket is not connected" *ONLY* in OS X (after trying linux, windows) on similar code *only* if i call socket shutdown before close on the client – Pedro Vicente Aug 11 '20 at 19:29
  • "shutdown: Socket is not connected." seems to be an OSX only message. The question is, does this arise from a flaw in our code (seems not) or a bug/behavior in OSX? – Pedro Vicente Aug 11 '20 at 21:10
  • @PedroVicente that sounds like behavior of the other end or a timing difference. If you try to shut down gracefully but the other end already disconnected, this error condition would be expected – sehe Aug 11 '20 at 21:48
  • @sehe. Yes. But my comment was not related to this code but to a similar message I get only in OSX that does not happen in Linux or Windows running the same program (a client in a continuous loop). The message is sent and received so not a big issue; I'm just curious why the exception happens only in OSX – Pedro Vicente Aug 11 '20 at 22:03
  • @PedroVicente clear. I gave you my thoughts. What did you think about those? Also if you want you can just ask a new question that includes specific information that may help answer your question. – sehe Aug 11 '20 at 22:21
  • @sehe Thanks for the comment. I posted the case where the exception happens in my code in a new reply – Pedro Vicente Aug 12 '20 at 02:29