
I have the following situation: I'm using Python 3.6 and Tornado 5.1 to receive client requests over WebSocket. Some of these requests require invoking an external processing routine, which returns a queue and then periodically deposits results into it. These results are transmitted to the clients via the WebSocket.

The external processing is NOT a coroutine, so I invoke it using run_in_executor.

My problem: when the response time of the external processing is very long, run_in_executor reaches its maximum number of workers (default: number of processors x 5)!
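For context, Python 3.6's `ThreadPoolExecutor` defaults to `(os.cpu_count() or 1) * 5` workers, and each open connection here parks one worker inside a blocking `q.get()`. A minimal stdlib sketch of the saturation effect (the pool size and sleep times are illustrative, not from the question):

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=2)  # tiny pool to make saturation visible

start = time.time()
# 4 blocking jobs but only 2 workers: the last 2 must wait for a free worker
futures = [pool.submit(time.sleep, 0.2) for _ in range(4)]
for f in futures:
    f.result()
elapsed = time.time() - start
# the jobs run in two waves, so total time is roughly 0.4 s, not 0.2 s
```

This is exactly what happens to the server once more clients connect than there are pool workers: new `run_in_executor` calls simply queue up behind the blocked ones.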

Is it safe to increase the maximum number of workers? Or is another solution recommended?

Simplified code below.

Thanks in advance!

#########################
##    SERVER CODE      ##
#########################


from random import randint
import tornado.httpserver
import tornado.websocket
import tornado.ioloop
import tornado.web
import threading
import asyncio
import queue
import time


class WSHandler(tornado.websocket.WebSocketHandler):
    """entry point for all WS request"""

    def open(self):
        print('new connection. Request: ' + str(self.request))

    async def on_message(self, message):

        # Emulates the subscription to an external object
        # that returns a queue to listen
        producer = Producer()
        q = producer.q

        while True:
            rta = await tornado.ioloop.IOLoop.current().run_in_executor(None, self.loop_on_q, q)
            if rta is not None:
                await self.write_message(str(rta))
            else:
                break


    def on_close(self):
        print('connection closed. Request: ' + str(self.request) +
              '. close_reason: ' + str(self.close_reason) +
              '. close_code: ' + str(self.close_code) +
              '. get_status: ' + str(self.get_status()))


    def loop_on_q(self, q):
        rta = q.get()
        return rta

class Producer:

    def __init__(self):
        self.q = queue.Queue()
        t = threading.Thread(target=self.start)
        t.daemon = True
        t.start()

    def start(self):
        count = 1
        while True:
            # time.sleep(randint(1,5))
            if count < 100:
                self.q.put(count)
            else:
                self.q.put(None)
                break
            time.sleep(50)
            count += 1



application = tornado.web.Application([
    (r'/ws', WSHandler),
])

if __name__ == "__main__":
    asyncio.set_event_loop(asyncio.new_event_loop())
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print('SRV START')
    tornado.ioloop.IOLoop.current().start()


#########################
##    CLIENT CODE      ##
#########################

# If you run it more than 20 times in less than 50 seconds, it blocks
# (number of processors x 5; I have 4 cores)
from websocket import create_connection

def connect():
    url = 'ws://localhost:8888/ws'
    ws = create_connection(url)
    print('Connecting')
    return ws

print('Connecting to srv')
con_ws = connect()
print('Established connection. Sending msg ...')
msg = '{"type":"Socket"}'
con_ws.send(msg)
print('Package sent. Waiting answer...')

while True:
    result = con_ws.recv()
    print('Answer: ' + str(result))
  • Take a look at the [uvloop blog post](https://magic.io/blog/uvloop-blazing-fast-python-networking/) and its benchmarks vs `tornado`. [uvloop on GitHub](https://github.com/MagicStack/uvloop) – Benyamin Jafari Nov 04 '18 at 13:15

1 Answer


*Is it safe to increase the maximum number of workers?* Yes, up to a certain fixed amount, which can be determined with load testing.
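If you go that route, `run_in_executor` accepts an explicitly sized `ThreadPoolExecutor` as its first argument (both Tornado's `IOLoop.run_in_executor` and asyncio's take the same executor parameter). A stdlib sketch, where the pool size of 50 is a placeholder to be chosen by load testing:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# size of 50 is an assumption; pick the real value via load testing
blocking_pool = ThreadPoolExecutor(max_workers=50)

loop = asyncio.new_event_loop()
# in the question's handler this corresponds to replacing the None first
# argument: IOLoop.current().run_in_executor(blocking_pool, self.loop_on_q, q)
result = loop.run_until_complete(
    loop.run_in_executor(blocking_pool, sum, [1, 2, 3])
)
loop.close()
```

Sharing one module-level pool across handlers keeps the total thread count bounded, instead of relying on the per-CPU default.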

*Or is another solution recommended?* If you reach the worker limit, you can move the workers to multiple separate servers (this approach is called horizontal scaling) and pass jobs to them through a message queue. See Celery as a batteries-included solution, or RabbitMQ, Kafka, etc. if you prefer to build it yourself.
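The hand-off pattern behind that suggestion can be sketched with stdlib threads standing in for remote worker servers (a real deployment would use Celery or a broker like RabbitMQ/Kafka; the doubling "job" is a placeholder for the external processing):

```python
import threading
import queue

def worker(jobs, results):
    # stand-in for a remote worker process consuming from a message broker
    for job in iter(jobs.get, None):  # loop until the None sentinel arrives
        results.put(job * 2)          # placeholder for the external processing

jobs, results = queue.Queue(), queue.Queue()
t = threading.Thread(target=worker, args=(jobs, results), daemon=True)
t.start()

for i in range(3):
    jobs.put(i)
jobs.put(None)  # sentinel: no more work

out = [results.get() for _ in range(3)]
```

The web process only enqueues jobs and awaits results, so its thread pool is no longer tied up for the full duration of each external computation.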
