
My setup and environment:

  • Win10
  • TCP Client: C++ with asio library (no boost), running in background thread.
  • TCP Server: Python3 with SocketServer module, running in background thread.
  • All use blocking I/O at the moment

Requirements:

  • Client sends string commands on user interaction to server occasionally.
  • Server receives commands and acts on them.

Problem:

  • Client may hang at read().
  • Server may hang at recv() after Client hangs.

The net result is that Server always gets my initial "Hello" handshake, but its ACK response is never received by Client. It looks like some sort of read/write coordination between Client and Server is required.

From questions like https://stackoverflow.com/a/1480246/987846 and "Does the TCPServer + BaseRequestHandler in Python's SocketServer close the socket after each call to handle()?", I learned that two problems may be involved:

  • Python TCP server always closes the connection on receiving so that the handler is not called continuously.
  • TCP connection needs a keep-alive mechanism before it's closed due to a timeout.

I wonder how to fix this and what the best strategy for my requirements would be:

  • Design a periodic "sanity-check" ping-pong data transmission between Server and Client.
  • Resort to non-blocking I/O, which I'm unfamiliar with.

Or is it simply a bug on my part?

Server code:

import socketserver
import sys
import threading

_dostuff = True

class CmdHandler(socketserver.StreamRequestHandler):
    def handle(self):
        while True:                        
            data = self.request.recv(1024)
            s = data.decode('utf-8')
            if s == 'Hello':
                print('HANDSHAKE: ACK', flush=True)
                self.request.send('ACK\x00'.encode())
            if s == 'Stop':            
                print('Cmd: Mute', flush=True)
                with threading.Lock():
                    _dostuff = False
            if s == 'Start':
                with threading.Lock():
                    _dostuff = True
        return

if __name__ == '__main__':
    import socket    
    import time

    # Command server
    address = ('localhost', 1234)  # let the kernel assign a port
    cmd_server = socketserver.TCPServer(address, CmdHandler)
    cmd_ip, cmd_port = cmd_server.server_address  # what port was assigned?

    t1 = threading.Thread(target=cmd_server.serve_forever)
    t1.setDaemon(True)  # don't hang on exit
    t1.start()

    while True:            
        time.sleep(1)

Client code (partial):

virtual bool Connect() override {
        bool isInitialized = false;
        try {
            asio::io_context io_context;
            asio::ip::tcp::resolver resolver(io_context);
            asio::ip::tcp::resolver::query query("127.0.0.1", "1234");
            asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
            asio::ip::tcp::socket socket(io_context);
            asio::connect(socket, endpoint_iterator);
            while (true) {
                std::array<char, 128> buf;
                asio::error_code error;
                // Handshaking
                // - on connection, say hello to cmd-server; wait for ACK
                if ( ! isInitialized ) {
                    debug("CmdClient {}: handshaking ...", m_id.c_str());
                    std::string handshake("Hello");
                    asio::write(socket, asio::buffer(handshake.c_str(), handshake.length()));
                    if (error == asio::error::eof)
                        continue; // Connection closed cleanly by peer; keep trying.
                    else if (error)
                        throw asio::system_error(error); // Some other error.

                    // ***PROBLEM: THIS MAY BLOCK FOREVER***
                    size_t len = asio::read(socket, asio::buffer(buf), error);
                    // ***PROBLEM END***


                    if (len <= 0) {
                        debug("CmdClient {}: No response", m_id.c_str());
                    }                   
                    std::string received = std::string(buf.data());
                    if (received == std::string("ACK")) {
                        debug("CmdClient {}: handshaking ... SUCCESS!", m_id.c_str());
                        isInitialized = true;
                        Notify("ACK");
                    }
                    else {
                        debug("CmdClient {}: Received: {}", m_id.c_str(), received.c_str());
                    }
                    continue;
                }
                SendCommand(socket);
            }
        }
        catch (std::exception& e) {
            std::cerr << e.what() << std::endl;
            isInitialized = false;
        }
        return true;
    }


    void SendCommand(asio::ip::tcp::socket& socket) {
        std::string cmd("");
        switch (m_cmd) {
        case NoOp:
            break;
        case Stop:
            cmd = "Stop";
            break;
        case Start:
            cmd = "Start";
            break;
        default:
            break;
        }
        if (cmd.size() > 0) {
            debug("CmdClient {}: Send command: {}", m_id.c_str(), cmd.c_str());
            size_t len = asio::write(socket, asio::buffer(cmd.c_str(), cmd.length()));
            debug("CmdClient {}: {} bytes written.", m_id.c_str(), len);
            m_cmd = NoOp;  // Avoid resend in next frame;
        }
    }

If I remove the while-loop on the server side, so that it looks like

class CmdHandler(socketserver.StreamRequestHandler):
    # timeout = 5
    def handle(self):
        data = self.request.recv(1024)
        s = data.decode('utf-8')
        if s == 'Hello':
            self.request.send('ACK\x00'.encode())
        if s == 'Stop':            
            with threading.Lock():
                _dostuff = False
        if s == 'Start':
            with threading.Lock():
                _dostuff = True
        return

then

  • Server's ACK is received by Client.
  • But the subsequent messages sent by Client won't be received by Server.
kakyo
  • I avoid these problems by using non-blocking I/O exclusively; with non-blocking I/O, you never lose control of your thread to a blocking call. (It does mean you have to implement state-machine code to handle partial reads and writes, but that is all doable) – Jeremy Friesner Sep 11 '19 at 04:39
  • It's a bug in your code. You should use read timeouts at both ends. But if the server really closes the connection each time your client should receive an end of stream indication. – user207421 Sep 11 '19 at 04:39
  • @JeremyFriesner What exactly is your non-blocking I/O tech? Is it asio's async API or raw socket `select`? – kakyo Sep 11 '19 at 04:45
  • @user207421 If I use timeouts, does it mean that the sockets will close after the timeout and then Client has to reconnect to Server socket on every communication? – kakyo Sep 11 '19 at 04:47
  • I suggest using nc to find the malfunctioning part: ```echo -n Hello|nc 127.0.0.1 1234```. If you can see ACK printed by nc, the server is okay; otherwise the server is doing something wrong. – James Li Sep 11 '19 at 04:58
  • I also noticed an unusual pattern: the server is sending ```ACK\x00```, where the trailing ```\x00``` may cause unpredictable behavior. – James Li Sep 11 '19 at 05:00
  • @bigdataolddriver By `nc` do you mean `telnet` on Windows? I put `\x00` to construct a valid null-terminated string and so far I can receive it after removing that server while-loop. – kakyo Sep 11 '19 at 05:13
  • @kakyo By ```nc``` I mean [nmap-ncat](https://nmap.org/download.html). You can download the Windows binary from the link. It can help you split the client+server problem into a smaller, one-sided problem. – James Li Sep 11 '19 at 05:20
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/199277/discussion-between-bigdataolddriver-and-kakyo). – James Li Sep 11 '19 at 05:26
  • No, it won't close if the timeout happens, it will just throw whatever exception is involved, or return whatever error code is involved. Read timeouts are not fatal to the connection. – user207421 Sep 11 '19 at 06:13
  • @kakyo it’s raw select() – Jeremy Friesner Sep 11 '19 at 13:00
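The point made in the comments, that a read timeout does not close the connection, can be checked with a minimal Python sketch. It uses socket.socketpair() as a stand-in for the real client and server, and the 0.2 second timeout is an arbitrary value picked for illustration, not something from the original code:

```python
import socket

# socketpair() returns two connected stream sockets, standing in for client and server.
client, server = socket.socketpair()
client.settimeout(0.2)  # hypothetical value; choose one that suits the protocol

try:
    client.recv(1024)   # nothing has been sent yet, so this call times out
    timed_out = False
except socket.timeout:
    timed_out = True

# The connection is still open after the timeout: a later message arrives normally.
server.sendall(b"ACK")
reply = client.recv(1024)

client.close()
server.close()
```

After the timeout the same socket is reused without reconnecting, which is the behavior user207421 describes.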

1 Answer


So I finally solved this problem. Many thanks to @bigdataolddriver for the offline help. I learned a lot about ncat debugging, among other things.
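For readers without ncat at hand, the same one-sided check can be approximated in Python. This is only a sketch: probe is a hypothetical helper name, and the 2 second timeout and 1024-byte read size are assumptions rather than values from the original setup:

```python
import socket

def probe(host, port, payload=b"Hello", timeout=2.0):
    """Send one payload and return the first reply (b"" on timeout or close)."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(payload)
        try:
            return sock.recv(1024)
        except socket.timeout:
            return b""
```

Calling probe("127.0.0.1", 1234) while the server is running should return the ACK bytes if the server side is healthy; an empty result points at the server rather than the client.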

I basically

  • on server side: gave up on the idea of using Python's socketserver module. For one, I found out that it's synchronous only.
  • on client side: used asio::ip::tcp::socket::read_some / asio::ip::tcp::socket::write_some instead of asio::read / asio::write.
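A likely reason the client-side change matters: asio::read with a fixed-size buffer keeps reading until the buffer is full or an error such as EOF occurs, whereas read_some returns as soon as any bytes are available. The sketch below mimics both behaviors in Python rather than asio. read_exactly is a hypothetical helper, and the 0.5 second timeout exists only so the demonstration cannot hang:

```python
import socket

def read_exactly(sock, n):
    """Loop until n bytes arrive, like asio::read filling a fixed-size buffer."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:          # peer closed the connection
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)

client, server = socket.socketpair()
client.settimeout(0.5)         # only so the demonstration cannot hang

# "Read some": returns as soon as the 4-byte reply is available.
server.sendall(b"ACK\x00")
some = client.recv(128)

# "Read exactly 128": gets 4 bytes, then waits for 124 more that never come.
server.sendall(b"ACK\x00")
try:
    read_exactly(client, 128)
    blocked = False
except socket.timeout:
    blocked = True

# Once the peer closes, the "read exactly" loop ends with what it has.
server.sendall(b"ACK\x00")
server.close()
client.settimeout(None)
after_close = read_exactly(client, 128)
client.close()
```

This would be consistent with the behavior reported in the question: with the server loop in place the connection stays open and the 128-byte read never completes, while without the loop the server closes the connection and the client sees the ACK.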

Here is the new server code based on just the socket module.

import socket
import sys
import threading

_dostuff = True

def run_cmd_server():
    global _dostuff
    # Create a TCP/IP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Bind the socket to the port
    server_address = ('localhost', 1234)
    print('CmdServer: starting up on {} port {}'.format(*server_address))
    sock.bind(server_address)

    # Listen for incoming connections
    sock.listen(1)

    while True:
        # Wait for a connection
        print('CmdServer: waiting for a connection')
        connection, client_address = sock.accept()
        try:
            print('CmdServer: connection from client:', client_address)

            # Receive commands one recv() at a time and acknowledge each
            while True:
                data = connection.recv(1024)
                print('received {!r}'.format(data))
                if not data:
                    print('no data from', client_address)
                    break
                cmd = data.decode('utf-8').strip('\x00')
                if cmd == 'Hello':
                    print('Cmd: {}'.format(cmd))
                    connection.sendall('ACK\x00'.encode('utf-8'))
                elif cmd == 'Stop':
                    print('Cmd: {}'.format(cmd))                    
                    _dostuff = False
                    print('_dostuff : {}'.format(_dostuff ))
                elif cmd == 'Start':
                    _dostuff = True
                    print('_dostuff : {}'.format(_dostuff ))
                else:
                    print('Misc: {}'.format(cmd))
                connection.sendall('ack\x00'.encode('utf-8'))
        except Exception:
            # Drop this connection on any error and go back to accept()
            continue
        finally:
            # Clean up the connection
            connection.close()


def main():
    t1 = threading.Thread(target=run_cmd_server, name='t_cmd', daemon=True)
    t1.start()
    t1.join()


if __name__ == '__main__':
    main()

And here is the new client code:

virtual bool Connect() override {
        bool isInitialized = false;
        try {
            asio::io_context io_context;
            asio::ip::tcp::resolver resolver(io_context);
            asio::ip::tcp::resolver::query query("127.0.0.1", "1234");
            asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
            asio::ip::tcp::socket socket(io_context);
            asio::connect(socket, endpoint_iterator);
            while (true) {
                std::array<char, 1024> readBuf{'\0'};
                asio::error_code error;
                // Handshaking
                // - on connection, say hello to cmd-server; wait for ACK
                if ( ! isInitialized ) {
                    debug("CmdClient {}: handshaking ...", m_id.c_str());
                    std::string handshake("Hello");
                    size_t len = socket.write_some(asio::buffer(handshake.c_str(), handshake.length()), error);
                    if (error == asio::error::eof) {
                        asio::connect(socket, endpoint_iterator);
                        continue; // Connection closed cleanly by peer; keep trying.
                    }
                    else if (error)
                        throw asio::system_error(error); // Some other error.
                    len = socket.read_some(asio::buffer(readBuf), error);
                    if (len <= 0) {
                        debug("CmdClient {}: No response", m_id.c_str());
                    }
                    std::string received = std::string(readBuf.data());
                    if (received == std::string("ACK")) {
                        debug("CmdClient {}: handshaking ... SUCCESS!", m_id.c_str());
                        isInitialized = true;
                        Notify("ACK");
                    }
                    else {
                        debug("CmdClient {}: Received: {}", m_id.c_str(), received.c_str());
                    }
                    continue;
                }
                SendCommand(socket);

            }
        }
        catch (std::exception& e) {
            std::cerr << e.what() << std::endl;
            isInitialized = false;
        }
        return true;
    }


    void SendCommand(asio::ip::tcp::socket& socket) {
        std::string cmd("");
        switch (m_cmd) {
        case NoOp:
            break;
        case Hello:
            cmd = "Hello";
            break;
        case Stop:
            cmd = "Stop";
            break;
        case Start:
            cmd = "Start";
            break;
        default:
            break;
        }
        if (cmd.size() > 0) {
            size_t len = socket.write_some(asio::buffer(cmd.c_str(), cmd.length()));
            m_cmd = NoOp;  // Avoid resend in next frame;
        }
    }

I have yet to use ASIO's async feature (very much scared to do so right after this debugging session). But right now at least this code works as I expected: The server can receive commands from client normally.
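One caveat, offered as an assumption rather than something observed in the original setup: the server compares the result of each recv() against a whole command, which relies on every command arriving in exactly one read. TCP is a byte stream and does not guarantee that. A minimal sketch of delimiter-based framing, using the same \x00 terminator the ACK already carries (split_messages is a hypothetical helper):

```python
def split_messages(buffer, delimiter=b"\x00"):
    """Return (complete_messages, leftover_bytes) for a delimiter-framed stream."""
    *complete, leftover = buffer.split(delimiter)
    return [m.decode("utf-8") for m in complete], leftover

# Two commands arrive in one recv(), followed by the start of a third.
messages, pending = split_messages(b"Stop\x00Start\x00Sto")

# The rest of the third command arrives in a later recv().
more, pending_after = split_messages(pending + b"p\x00")
```

The receive loop would append each recv() result to the leftover bytes and dispatch only the complete messages, so coalesced or split commands are handled the same way.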

On a side note, since there is only one thread writing into the global variable _dostuff, I removed thread locking.
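If locking becomes necessary later, note that `with threading.Lock():` as written in the original server creates a fresh lock on every use, so it never excludes another thread, and assigning _dostuff inside a method without a global declaration only sets a local variable. A minimal sketch of a shared lock follows; _dostuff_lock, set_dostuff and get_dostuff are hypothetical names introduced for illustration:

```python
import threading

_dostuff = True
_dostuff_lock = threading.Lock()   # one lock object shared by all threads

def set_dostuff(value):
    """Update the flag under the shared lock."""
    global _dostuff                # without this, the assignment creates a local
    with _dostuff_lock:
        _dostuff = value

def get_dostuff():
    """Read the flag under the shared lock."""
    with _dostuff_lock:
        return _dostuff

# A background thread flips the flag; the main thread reads it afterwards.
worker = threading.Thread(target=set_dostuff, args=(False,))
worker.start()
worker.join()
result = get_dostuff()
```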

I'd still appreciate it if anyone knows where exactly my original implementation was faulty.

kakyo