I am building a centralized logger in which nodes send messages to a log server over TCP, using Python's socket library. Here is the node-side code:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ip_address, port))
s.sendall(node_name.encode())  # Send node name to the server immediately after connection
while True:
    event = sys.stdin.readline()
    if event:
        print(event.strip())
        s.sendall(event.strip().encode())
Messages are read from stdin and then sent over the socket.
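For reference, a minimal sketch of the same read-and-send loop written so that it stops at EOF and keeps a trailing newline on each message as a delimiter. `forward_lines` is a hypothetical helper, not part of my node code, and the newline framing is an assumption the receiver would have to share.

```python
import io
import socket


def forward_lines(stream, sock):
    """Send each non-blank line of `stream` over `sock`, newline-terminated.

    Hypothetical helper for illustration. Returns the number of lines sent.
    """
    sent = 0
    for line in stream:          # iteration ends cleanly at EOF
        event = line.strip()
        if not event:            # skip blank lines
            continue
        # Keep a trailing newline so the receiver can tell messages apart.
        sock.sendall((event + "\n").encode())
        sent += 1
    return sent


if __name__ == "__main__":
    # Local demo with a connected socket pair instead of a real server.
    node_end, logger_end = socket.socketpair()
    count = forward_lines(io.StringIO("first\nsecond\n"), node_end)
    node_end.close()
    print(count, logger_end.recv(1024))
    logger_end.close()
```

In the real node, `stream` would be `sys.stdin` and `sock` the connected socket `s`.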
On the logger side, a new thread is started every time a node connects.
Logger code:
BUFF_SIZE = 10240
NUM_NODES = 10
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((IP_ADDRESS, port))
s.listen(NUM_NODES)
while True:
    conn, addr = s.accept()
    node_name = (conn.recv(NODE_NAME_BUFF_SIZE)).decode()
    _thread.start_new_thread(new_connection_thread, (conn, addr, node_name))
The new_connection_thread function:
while True:
    try:
        data = conn.recv(BUFF_SIZE).decode()
        if data:
            # Do some stuff
    except Exception as e:
        print(str(time.time()) + f" - {node_name} disconnected")
        conn.close()
        _thread.exit()
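One thing worth ruling out in this loop: TCP is a byte stream, so a single `recv()` can return several messages glued together or only part of one, and `recv()` returns `b""` once the peer has closed. Below is a minimal sketch of a receive loop that handles both cases. `iter_messages` is a hypothetical helper, and it assumes each message ends with a newline, which the node code does not currently send because it calls `strip()`.

```python
import socket


def iter_messages(conn, buff_size=10240):
    """Yield newline-delimited messages from `conn` until the peer closes.

    Hypothetical helper; assumes the sender ends every message with a newline.
    """
    buffer = b""
    while True:
        chunk = conn.recv(buff_size)
        if not chunk:            # b"" means the peer closed the connection
            break
        buffer += chunk
        # One recv() may hold several messages, or only part of one.
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line:
                # Decode only complete lines, never a partial chunk.
                yield line.decode()
```

Inside the thread function this could be used as `for data in iter_messages(conn, BUFF_SIZE): ...`, with `conn.close()` after the loop.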
When handling about 5-10 messages per second across 3 nodes, the code works fine. But when I scale to about 40 messages per second across 8 nodes, some nodes randomly start disconnecting with the error message shown below.
For information: inside the "do some stuff" section I parse the string, add it to a data structure, and then print it to the logger's stdout.
Error on the node side:
  File "./node.py", line 26, in <module>
    main()
  File "./node.py", line 23, in main
    s.sendall(event.strip().encode())
BrokenPipeError: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "generator.py", line 20, in <module>
    print("%s %s" % (time.time(), sha256(urandom(20)).hexdigest()))
BrokenPipeError: [Errno 32] Broken pipe
It seems to happen about 100 seconds after all 8 nodes have connected.
What could be causing this? Am I using sendall incorrectly, or is there an error in my socket setup or threading?
I tried wrapping
s.sendall(event.strip().encode())
in a try/except that retries the send, but that somehow made more nodes disconnect, and sooner.
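For context on why a plain retry does not help: once `sendall` raises `BrokenPipeError`, the peer has closed the connection, so sending again on the same socket fails again. A retry only has a chance if it opens a new connection first. Here is a minimal sketch of that idea. `send_with_reconnect` and its `reconnect` argument are hypothetical names, not part of my code, and `reconnect` is assumed to return a freshly connected socket (in my setup it would also need to resend the node name).

```python
import socket


def send_with_reconnect(sock, payload, reconnect):
    """Send `payload`; on failure, replace the socket once and resend.

    `reconnect` is a hypothetical zero-argument callable that returns a
    freshly connected socket. Returns the socket that was actually used.
    """
    try:
        sock.sendall(payload)
        return sock
    except OSError:                  # BrokenPipeError is a subclass of OSError
        sock.close()                 # the old socket cannot be reused
        new_sock = reconnect()
        new_sock.sendall(payload)    # a second failure propagates to the caller
        return new_sock
```

Note that `sendall` may have delivered part of the payload before failing, so the receiver could see a fragment followed by the full resend.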