I've written a simple pub/sub sketch in python. Full source code follows, but the issue I'm having is that when the broker broadcasts a message via send()
, the client doesn't recv()
anything. Everything else seems to be in order: listen()
and accept()
are working, and when the client calls send()
, the broker's recv()
gets the message.
Here's a sequence diagram of the interactions between two clients and the broker:
Broker Client 62863 Client 62867
------------------------------------------------------------
start()
start()
(62863) joined.
start()
(62867) joined.
Hello from 62863
(62863): Hello from 62863
(62863) => (62867)
In that last step, the Broker calls send('Hello from 62863')
, but Client 62867's recv()
function doesn't receive it.
Any suggestions?
Here's the complete Broker code:
import socket
import threading
class Broker(object):
def __init__(self, host='', port=5000):
self._host = host
self._port = port
self._subscribers = []
def start(self):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.bind((self._host, self._port))
while True:
"""
Wait for a client to request a connection and spawn a thread to
receive and forward messages from that client.
"""
self._socket.listen()
subscriber, addr = self._socket.accept()
print(f'{addr} joined.', flush=True)
self._subscribers.append(subscriber)
threading.Thread(target=self.listen_thread, args=(subscriber,)).start()
def listen_thread(self, publisher):
"""
Wait for a message to arrive from a publisher, broadcast to all other
subscribers.
"""
while True:
msg = publisher.recv(1024).decode()
if msg is not None:
print(f'{publisher.getpeername()} published: {msg}', end='', flush=True)
self.broadcast(publisher, msg)
else:
print(f'{publisher.getpeername()} has disconnected')
return
def broadcast(self, publisher, msg):
for subscriber in self._subscribers:
if publisher != subscriber: # don't send to yourself
print(f'{publisher.getpeername()} => {subscriber.getpeername()}', flush=True)
try:
subscriber.send(msg) # NOTE: See solution below!!!
except:
# broken socket, remove from subscriber list
self._subscribers.remove(subscriber)
if __name__ == "__main__":
Broker().start()
And here's the corresponding Client code:
import socket
import threading
import sys
class StdioClient(object):
"""
A simple pub/sub client:
Anything received on stdin is published to the broker.
Concurrently, anything broadcast by the broker is printed on stdout.
"""
def __init__(self, host='localhost', port=5000):
self._host = host
self._port = port
def start(self):
self._sock = socket.socket()
self._sock.connect((self._host, self._port))
threading.Thread(target=self.stdin_to_sock).start()
threading.Thread(target=self.sock_to_stdout).start()
def stdin_to_sock(self):
"""
Send anything received on stdin to the broker.
"""
for msg in sys.stdin:
self._sock.send(bytes(msg, 'utf-8'))
def sock_to_stdout(self):
"""
Print anything received from the broker on stdout.
"""
while True:
msg = self._sock.recv(1024) # <<<= This never gets a message
print(msg.decode('utf-8'), eol='', flush=True)
if __name__ == '__main__':
StdioClient().start()