
I've written a simple pub/sub sketch in Python. Full source code follows, but the issue I'm having is that when the broker broadcasts a message via send(), the client doesn't recv() anything. Everything else seems to be in order: listen() and accept() are working, and when the client calls send(), the broker's recv() gets the message.

Here's a sequence diagram of the interactions between two clients and the broker:

Broker                      Client 62863        Client 62867
------------------------------------------------------------
start()
                            start()
(62863) joined.
                                                start()
(62867) joined.
                            Hello from 62863
(62863): Hello from 62863
(62863) => (62867)

In that last step, the Broker calls send('Hello from 62863'), but Client 62867's recv() function doesn't receive it.

Any suggestions?

Here's the complete Broker code:

import socket
import threading

class Broker(object):

    def __init__(self, host='', port=5000):
        self._host = host
        self._port = port
        self._subscribers = []

    def start(self):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.bind((self._host, self._port))

        self._socket.listen()  # listen() only needs to be called once

        while True:
            # Wait for a client to request a connection and spawn a thread to
            # receive and forward messages from that client.
            subscriber, addr = self._socket.accept()
            print(f'{addr} joined.', flush=True)
            self._subscribers.append(subscriber)
            threading.Thread(target=self.listen_thread, args=(subscriber,)).start()

    def listen_thread(self, publisher):
        """
        Wait for a message to arrive from a publisher, broadcast to all other
        subscribers.
        """
        while True:
            msg = publisher.recv(1024).decode()
            if msg:  # recv() returns b'' (never None) once the peer disconnects
                print(f'{publisher.getpeername()} published: {msg}', end='', flush=True)
                self.broadcast(publisher, msg)
            else:
                print(f'{publisher.getpeername()} has disconnected')
                return

    def broadcast(self, publisher, msg):
        for subscriber in list(self._subscribers):  # iterate over a copy, since the loop may remove entries
            if publisher != subscriber:        # don't send to yourself
                print(f'{publisher.getpeername()} => {subscriber.getpeername()}', flush=True)
                try:
                    subscriber.send(msg) # NOTE: See solution below!!!
                except:
                    # broken socket, remove from subscriber list
                    self._subscribers.remove(subscriber)


if __name__ == "__main__":
    Broker().start()

And here's the corresponding Client code:

import socket
import threading
import sys

class StdioClient(object):
    """
    A simple pub/sub client:
    Anything received on stdin is published to the broker.
    Concurrently, anything broadcast by the broker is printed on stdout.
    """

    def __init__(self, host='localhost', port=5000):
        self._host = host
        self._port = port

    def start(self):
        self._sock = socket.socket()
        self._sock.connect((self._host, self._port))
        threading.Thread(target=self.stdin_to_sock).start()
        threading.Thread(target=self.sock_to_stdout).start()

    def stdin_to_sock(self):
        """
        Send anything received on stdin to the broker.
        """
        for msg in sys.stdin:
            self._sock.send(bytes(msg, 'utf-8'))

    def sock_to_stdout(self):
        """
        Print anything received from the broker on stdout.
        """
        while True:
            msg = self._sock.recv(1024) # <<<= This never gets a message
            if not msg:                 # b'' means the broker closed the connection
                break
            print(msg.decode('utf-8'), end='', flush=True)

if __name__ == '__main__':
    StdioClient().start()
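A related detail that affects both receive loops (`listen_thread` in the Broker and `sock_to_stdout` in the client): `recv()` never returns `None`. When the other end closes the connection it returns an empty bytes object `b''` (and `b''.decode()` is `''`), so a disconnect has to be detected by testing for emptiness. A quick check, using `socket.socketpair()` as a stand-in for the real broker/client TCP connection:

```python
import socket

# socket.socketpair() returns two already-connected sockets; here it stands in
# for the broker/client TCP connection.
a, b = socket.socketpair()

a.sendall(b'hello\n')
first = b.recv(1024)    # the bytes that were sent
a.close()               # the peer closes its end
second = b.recv(1024)   # end of stream
b.close()

print(first)            # b'hello\n'
print(second)           # b''
print(second is None)   # False: a closed connection is signalled by b'', not None
```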
fearless_fool
  • I'm not yet sure that this is important, but I think the `eol=''` in the StdioClient print statement should be `end=''` – President James K. Polk May 22 '23 at 15:34
  • @PresidentJamesK.Polk Right you are. I'll change it -- thanks. – fearless_fool May 22 '23 at 16:04
  • There is also a deeper problem that often doesn't show up during testing but is still a bug. The statement `msg = publisher.recv(1024).decode()` illustrates this. It makes two invalid assumptions: 1) that all the bytes sent in a single `send()` will be received in a single `recv()`, and 2) that a given `recv()` will return bytes that begin with the first byte of an encoded (in this case UTF-8) character and end with the last byte of an encoded character. [Here is an answer](https://stackoverflow.com/a/43420503/238704) discussing the first problem. – President James K. Polk May 22 '23 at 21:26
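One common way to address both of those assumptions is sketched below. It is not from the original code; it assumes (hypothetically) that every message ends with a newline, which holds for lines read from `sys.stdin` in `StdioClient`, and uses an incremental UTF-8 decoder from the standard `codecs` module so that a character split across two `recv()` calls is held back until it is complete. The helper name `iter_lines` is made up for illustration.

```python
import codecs
import socket


def iter_lines(sock, bufsize=1024):
    """Yield complete newline-terminated text messages from a stream socket.

    Assumes each message ends with a newline. Partial lines and multi-byte
    UTF-8 characters split across recv() calls are buffered until complete.
    """
    decoder = codecs.getincrementaldecoder('utf-8')()
    pending = ''
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:  # b'': the peer closed the connection
            # final=True raises UnicodeDecodeError if the stream ended mid-character
            pending += decoder.decode(b'', final=True)
            if pending:
                yield pending  # trailing text with no newline
            return
        pending += decoder.decode(chunk)  # an incomplete character is held back
        *lines, pending = pending.split('\n')
        for line in lines:
            yield line + '\n'


# Demo: read 'héllo\nwörld\n' back one byte per recv(), so the two-byte
# UTF-8 encodings of 'é' and 'ö' are split across calls.
a, b = socket.socketpair()
a.sendall('héllo\nwörld\n'.encode('utf-8'))
a.close()
result = list(iter_lines(b, bufsize=1))
b.close()
print(result)  # ['héllo\n', 'wörld\n']
```

Newline framing is only one choice; a length prefix per message is the usual alternative when messages may themselves contain newlines.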

1 Answer


Solved. And it was a stupid, preventable, "I should have known better" error. Right here:

                try:
                    subscriber.send(msg)
                except:
                    self._subscribers.remove(subscriber)

Never ever use a blanket except: to catch errors without at least logging them (thank you @user207421). In this case, send() was raising a TypeError because msg was a str, not bytes, but the blanket except: swallowed it, so there was no visible error. Worse, the except: branch then removed the subscriber from the list, so that client was silently dropped from every later broadcast as well.
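The failure is easy to reproduce in isolation. This sketch uses `socket.socketpair()` in place of the broker/client connection and shows the same `send()` call once behind a bare `except:` (nothing visible happens) and once behind `except TypeError` (the cause is printed):

```python
import socket

a, b = socket.socketpair()
msg = 'Hello from 62863\n'  # a str, just as broadcast() received it from listen_thread()

# What the original broadcast() effectively did: the TypeError vanishes.
try:
    a.send(msg)
except:
    pass

# Catching the specific exception (and reporting it) makes the cause visible.
caught = None
try:
    a.send(msg)
except TypeError as exc:
    caught = exc
    print(f'send() failed: {exc}')

# Encoding to bytes first is the fix from this answer.
a.send(msg.encode('utf-8'))
received = b.recv(1024)
print(received)  # b'Hello from 62863\n'
a.close()
b.close()
```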

Moral: Always qualify your except: clauses.

(In case you're curious, the fix is this):

                    subscriber.send(bytes(msg, 'utf-8'))
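Putting the pieces together, a broadcast loop with a qualified except: might look like the sketch below. It is written as a standalone function (a hypothetical stand-in for `Broker.broadcast`, with the subscriber list passed in) so it can be run on its own. Catching only `OSError` means a programming error such as the str/bytes `TypeError` still propagates, while genuinely broken sockets are logged and dropped; `sendall()` replaces `send()` so a partial send cannot truncate a message.

```python
import logging
import socket

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('broker')


def broadcast(subscribers, publisher, msg):
    """Send msg (a str) to every subscriber except the publisher.

    `subscribers` is a plain list of connected sockets standing in for
    Broker._subscribers; this is a sketch, not the original method.
    """
    data = msg.encode('utf-8')               # same bytes as bytes(msg, 'utf-8')
    for subscriber in list(subscribers):     # iterate over a copy so removal is safe
        if subscriber is publisher:          # don't send to yourself
            continue
        try:
            subscriber.sendall(data)         # sendall() keeps sending until all bytes are out
        except OSError:                      # socket failures only; a TypeError would still propagate
            log.exception('dropping subscriber %r', subscriber)
            subscribers.remove(subscriber)
            subscriber.close()


# Demo with in-process socket pairs: each pair is (broker side, client side).
pub, pub_client = socket.socketpair()
sub, sub_client = socket.socketpair()
dead, dead_client = socket.socketpair()
dead.close()                                 # any send on it now raises OSError

subs = [pub, sub, dead]
broadcast(subs, pub, 'Hello from 62863\n')
received = sub_client.recv(1024)
print(received)                              # b'Hello from 62863\n'
print(len(subs))                             # 2: the broken socket was logged and dropped

for s in (pub, pub_client, sub, sub_client, dead_client):
    s.close()
```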
fearless_fool
  • Nothing wrong with blanket `except:`, but you should always log exceptions. That was the real problem. – user207421 May 23 '23 at 00:49
  • @user207421 You are correct -- logging would have caught it. In environments where logging is not available, more caution is required! :) – fearless_fool May 24 '23 at 12:05
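The alternative suggested in the comments (keep the catch broad, but never silent) can be sketched with the standard `logging` module. `send_or_drop` is a hypothetical helper, not part of the posted code; it uses `except Exception` rather than a bare `except:` so that `KeyboardInterrupt` and `SystemExit` still propagate. With the original str message, `log.exception()` writes the `TypeError` and its traceback to stderr, which is exactly the clue that was missing:

```python
import logging
import socket

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger('broker')


def send_or_drop(subscribers, subscriber, msg):
    """Hypothetical helper: one send attempt; on any error, log it and drop the subscriber."""
    try:
        subscriber.send(msg)
        return True
    except Exception:  # broad on purpose, but never silent
        # log.exception() records the message plus the full traceback
        log.exception('send to %r failed; dropping subscriber', subscriber)
        subscribers.remove(subscriber)
        return False


a, b = socket.socketpair()
subs = [a]
ok = send_or_drop(subs, a, 'Hello from 62863\n')  # a str: TypeError, now logged
print(ok, subs)                                  # False []
a.close()
b.close()
```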