I have the following setup:

I'm running a CherryPy 10.2.1 web server under Linux x64 with Python 3.4. It uses a ws4py 0.4.2 broadcast handler that logs each received message (which contains a timestamp) together with the message's runtime, i.e. the time between the timestamp and its arrival at the server.

Then there are 60 client threads (I also tried 10, 20, 50 and 70 threads with similar results) that connect to the server using ws4py. Each thread sends a timestamp message every 0.1 seconds.

After around 48500 lines in the server logfile, the messages only arrive after 10-second gaps, although the client threads keep sending at the original rate. It seems that the messages are sent, get buffered somewhere, and are only released after 10 seconds.

If I terminate the client threads, the messages aren't lost: all the blocked messages get released and appear in the broadcast handler (with their respective runtime, which depends on how long the threads were running; it can be 15 minutes if you let the client threads send that long).

If I close the client's socket and reconnect after 850 sends per client (i.e. after 51000 messages in total), the messages get blocked at first, but the reconnect "flushes" them shortly afterwards and they appear in the server log.

Where are those messages blocked? The ws4py send() function uses socket.sendall(); is the problem in that call? Or are the messages blocked on the server side (since they are still delivered if I kill the clients)?

Has anybody seen this kind of message blocking before?

Server:

# ws4py
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
from ws4py import configure_logger

# general
import cherrypy
import logging
import datetime
import json

# settings
host_ip = '127.0.0.1'
host_port = 20000
filepath = '/path/to/server.log'
loglevel = logging.INFO

# logger
logger = configure_logger(stdout=True, filepath=filepath, level=loglevel)

# cherrypy
cherrypy.config.update({'server.socket_host': host_ip, 'server.socket_port': host_port})
WebSocketPlugin(cherrypy.engine).subscribe()
cherrypy.tools.websocket = WebSocketTool()


class BroadcastWebSocketHandler(WebSocket):

    def opened(self):
        logger.info("BroadcastWebSocketHandler - opened")

    def received_message(self, message):
        msg = {}
        try:
            msg = json.loads(str(message))
        except Exception as e:
            logger.critical(repr(e))

        if 'timestamp' in msg:
            format_str = '%Y-%m-%d %H:%M:%S.%f'
            dt_now = datetime.datetime.utcnow()
            dt = datetime.datetime.strptime(msg['timestamp'], format_str)
            delta = dt_now - dt
            logger.info("BroadcastWebSocketHandler - received message: {} - runtime: {}".format(str(message), str(delta)))
        else:
            logger.info("BroadcastWebSocketHandler - received message: {}".format(str(message)))

        cherrypy.engine.publish('websocket-broadcast', str(message))


class Root(object):

    @cherrypy.expose
    def index(self):
        return 'Text.'

    @cherrypy.expose
    def ws(self):
        handler = cherrypy.request.ws_handler


if __name__ == '__main__':
    cherrypy.quickstart(Root(), '/', config={'/ws': {'tools.websocket.on': True,
                                                     'tools.websocket.handler_cls': BroadcastWebSocketHandler}})

Client:

# ws4py
from ws4py.client.threadedclient import WebSocketClient, WebSocketBaseClient
from ws4py import format_addresses

# general
import logging
import json
import time
import datetime
import threading

# settings
host_ip = '127.0.0.1'
host_port = 20000
sleeptime = 0.1
n_clients = 60
filename = '/path/to/client.log'
loglevel = logging.INFO

# logging
log_format = '[%(asctime)-15s] %(message)s'
logging.basicConfig(level=loglevel, filename=filename, format=log_format)

# global
keep_sending = True


def send_messages():
    client = WebSocketBaseClient('ws://{}:{}/ws'.format(host_ip, host_port))
    logging.info('Connecting')
    client.connect()
    while keep_sending:
        time.sleep(sleeptime)
        client.send(json.dumps({'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')}))
    logging.info('Exiting')


if __name__ == '__main__':

    threads = []
    for i in range(n_clients):
        t = threading.Thread(target=send_messages)
        threads.append(t)
        t.daemon = True
        t.start()

    for t in threads:
        try:
            t.join()
        except KeyboardInterrupt:
            keep_sending = False
            logging.info('KeyboardInterrupt')

Update 2018-Feb-07:

The reason for the "blocking behaviour" is a timeout in the self.sock.sendall() call in the websocket.py file of the ws4py package.
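
To illustrate what such a timeout looks like in isolation, here is a minimal sketch that uses only the standard library, not ws4py. It assumes a local socket pair whose receiving side never reads; the function name, chunk size and timeout value are made up for the example.

```python
import socket


def send_until_timeout(timeout=0.2, chunk=b"x" * 65536, max_chunks=10000):
    """Send to a peer that never reads.

    Returns (chunks_sent, timed_out), where chunks_sent counts only the
    chunks that sendall() accepted completely.
    """
    sender, receiver = socket.socketpair()
    sender.settimeout(timeout)
    chunks_sent = 0
    timed_out = False
    try:
        for _ in range(max_chunks):
            # sendall() waits while the kernel buffers are full and
            # raises socket.timeout once the timeout has elapsed.
            sender.sendall(chunk)
            chunks_sent += 1
    except socket.timeout:
        timed_out = True
    finally:
        sender.close()
        receiver.close()
    return chunks_sent, timed_out


if __name__ == "__main__":
    print(send_until_timeout())
```

The first sends succeed because the kernel buffers them; the timeout only shows up once those buffers are full, which resembles the delayed onset described above.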

You can edit the broadcast() function of the class WebSocketManager (manager.py) to log the exception.
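
A minimal sketch of that kind of logging, written as a standalone function rather than as a patch to ws4py. The names broadcast_logging_errors and FakeSocket are hypothetical; the function only assumes objects with a send() method and a terminated attribute.

```python
import logging

logger = logging.getLogger("broadcast-debug")


class FakeSocket:
    """Hypothetical stand-in for a websocket; not part of ws4py."""

    def __init__(self, fail=False, terminated=False):
        self.fail = fail
        self.terminated = terminated
        self.sent = []

    def send(self, message, binary=False):
        if self.fail:
            raise TimeoutError("timed out")
        self.sent.append(message)


def broadcast_logging_errors(websockets, message, binary=False):
    """Send message to every non-terminated websocket.

    Exceptions are logged with a traceback instead of being dropped.
    Returns the number of sends that raised.
    """
    failures = 0
    for ws in websockets:
        if ws.terminated:
            continue
        try:
            ws.send(message, binary)
        except Exception:
            failures += 1
            logger.exception("send failed for %r", ws)
    return failures


if __name__ == "__main__":
    sockets = [FakeSocket(), FakeSocket(fail=True)]
    print(broadcast_logging_errors(sockets, "hello"))
```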

But I still don't know why this example produces those timeouts. One thing I noticed: the problem doesn't seem to occur when using the threaded client WebSocketClient instead of WebSocketBaseClient.
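
One difference that may matter, and this is an assumption rather than something confirmed here, is whether anything reads from the client socket while the server broadcasts to it. The sketch below uses only the standard library (no ws4py) and a hypothetical function name; it shows that sendall() with a timeout completes when the peer drains its socket in a background thread.

```python
import socket
import threading


def send_with_draining_peer(total_chunks=50, chunk=b"x" * 65536, timeout=2.0):
    """Send chunks to a peer that keeps reading in a background thread.

    Returns the number of chunks that sendall() accepted completely.
    """
    sender, receiver = socket.socketpair()
    sender.settimeout(timeout)

    def drain():
        # recv() returns b"" once the sender has closed its end.
        while receiver.recv(65536):
            pass

    reader = threading.Thread(target=drain, daemon=True)
    reader.start()
    sent = 0
    try:
        for _ in range(total_chunks):
            sender.sendall(chunk)
            sent += 1
    finally:
        sender.close()
        reader.join(timeout=5)
        receiver.close()
    return sent


if __name__ == "__main__":
    print(send_with_draining_peer())
```

Whether this is what actually distinguishes the two ws4py clients in the setup above would still need to be verified.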
