
I am building a centralized logger: nodes send messages to a log server using the Python socket library. Here is the code on the node side:

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((ip_address, port))
    s.sendall(node_name.encode()) # Send node name to the server immediately after connection

    while True:
        event = sys.stdin.readline()
        if event:
            print(event.strip())
            s.sendall(event.strip().encode())

Messages are read from stdin and then sent over the socket.

On the logger side, a new thread is started every time a node connects.

Logger code:

    BUFF_SIZE = 10240
    NUM_NODES = 10

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((IP_ADDRESS, port))
    s.listen(NUM_NODES)
    while True:
        conn, addr = s.accept()
        node_name = (conn.recv(NODE_NAME_BUFF_SIZE)).decode()
        _thread.start_new_thread(new_connection_thread, (conn, addr, node_name))

The new_connection_thread function:

    while True:
        try:
            data = conn.recv(BUFF_SIZE).decode()
            if data:
                pass  # Do some stuff
        except Exception as e:
            print(str(time.time()) + f" - {node_name} disconnected")
            conn.close()
            _thread.exit()

With about 5-10 messages a second across 3 nodes, the code works fine. But when I scale to about 40 messages a second across 8 nodes, some nodes randomly start disconnecting with the error message shown below.

For information: inside the "do some stuff" section I parse the string, add it to a data structure, and then print it to the logger's stdout.

Error on the node side:

      File "./node.py", line 26, in <module>
        main()
      File "./node.py", line 23, in main
        s.sendall(event.strip().encode())
    BrokenPipeError: [Errno 32] Broken pipe
    Traceback (most recent call last):
      File "generator.py", line 20, in <module>
        print("%s %s" % (time.time(), sha256(urandom(20)).hexdigest()))
    BrokenPipeError: [Errno 32] Broken pipe

It seems to happen about 100 seconds after all 8 nodes are connected.

What could be causing this? Am I using sendall incorrectly, or is there an error in my socket setup or threading?

I tried wrapping

    s.sendall(event.strip().encode())

in a try/except that retries the send, but that somehow made more nodes disconnect, and more quickly.

SarthakSin
  • Broken pipe means the connection is no good because the server closed it. So I suppose you want to find out why the server closed it. Normally this happens if the server program crashes - it won't happen just because the server is merely too slow. – user253751 Feb 02 '23 at 00:57
  • Are you seeing the "disconnected" message in the server? You should print `e` to see the reason why it's disconnecting. – Barmar Feb 02 '23 at 01:04
  • You should break out of the server loop if `data` is empty. – Barmar Feb 02 '23 at 01:05
  • Standard message to newbies: **TCP is not message-based**. A single send does not correspond to a single receive of the same data. It's a byte stream and you'll get some number of bytes in the same order sent. Buffer the data and make sure you have a complete message. You define what is a "complete" message (send length, read until newline, etc.) – Mark Tolonen Feb 02 '23 at 01:16
  • To expand on the previous comment, things like `(conn.recv(NODE_NAME_BUFF_SIZE)).decode()` and `conn.recv(BUFF_SIZE).decode()` are unreliable. They may work sometimes and other times they will fail. That's a bug in your code. [This answer goes into greater detail](https://stackoverflow.com/a/43420503/238704). However, you will need to implement your own protocol, not necessarily the one alluded to in that answer. – President James K. Polk Feb 02 '23 at 02:19
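
To make the suggestions in the comments concrete, here is a minimal sketch of a logger-side handler that buffers the byte stream, treats a newline as the end of a message, stops when `recv` returns empty bytes, and prints the actual exception. It is one possible protocol, not the only one. It assumes each node appends `"\n"` to every message it sends (the posted node code strips the newline, so that would need to change, and the node name would need the same delimiter). The names `split_messages`, `handle_connection` and `on_message` are made up for illustration.

```python
import socket

BUFF_SIZE = 10240  # same value as in the question


def split_messages(buffer: bytes):
    """Split a buffer into complete newline-terminated messages.

    Returns (messages, remainder), where remainder is the trailing
    partial message that is still waiting for its newline.
    """
    *complete, remainder = buffer.split(b"\n")
    # Decode only complete messages, so a multi-byte character that is
    # split across two recv() calls is never decoded in halves.
    return [m.decode() for m in complete], remainder


def handle_connection(conn, node_name, on_message):
    """Read newline-delimited messages from conn until the peer closes.

    on_message is a hypothetical callback standing in for the
    "do some stuff" part of the question.
    """
    buffer = b""
    try:
        while True:
            chunk = conn.recv(BUFF_SIZE)
            if not chunk:
                # recv() returning b"" means the peer closed the connection.
                break
            buffer += chunk
            messages, buffer = split_messages(buffer)
            for msg in messages:
                on_message(node_name, msg)
    except OSError as e:
        # Print the real reason instead of a generic "disconnected".
        print(f"{node_name} socket error: {e!r}")
    finally:
        conn.close()
```

On the node side, the matching change under this assumption would be to send `(event.strip() + "\n").encode()`. Note also that once `sendall` raises `BrokenPipeError` the socket is unusable, so retrying the send on the same socket cannot succeed; the node would have to open a new connection.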
