Good evening everyone. I'm not new to this place, but I finally decided to register and ask for help. I am developing a web application using the Quart framework (an asynchronous Flask). Now that the application has become bigger and more complex, I have decided to move different procedures to separate server instances, mostly because I want to keep the web server clean, more abstract and free of computational load.
So I plan to use one web server with a few (if needed) identical procedure servers. All servers are based on the Quart framework, for now just for simplicity of development. I decided to use the Crossbar.io router and Autobahn to connect all the servers together.

And here the problem occurred. I followed these posts:

Running several ApplicationSessions non-blockingly using autbahn.asyncio.wamp

How can I implement an interactive websocket client with autobahn asyncio?

How I can integrate crossbar client (python3,asyncio) with tkinter

How to send Autobahn/Twisted WAMP message from outside of protocol?

It seems I have tried every possible approach to implementing an Autobahn websocket client in my Quart application. I can't get both working at once: either the Quart app works and the Autobahn WS client does not, or vice versa.

Simplified, my Quart app looks like this:

from quart import Quart, request, current_app
from config import Config
# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

import concurrent.futures

class Component(ApplicationSession):
    """
    An application component registering RPC endpoints using decorators.
    """

    async def onJoin(self, details):

        # register all methods on this object decorated with "@wamp.register"
        # as RPC endpoints
        ##
        results = await self.register(self)
        for res in results:
            if isinstance(res, wamp.protocol.Registration):
                # res is a Registration instance
                print("Ok, registered procedure with registration ID {}".format(res.id))
            else:
                # res is a Failure instance
                print("Failed to register procedure: {}".format(res))

    @wamp.register(u'com.mathservice.add2')
    def add2(self, x, y):
        return x + y


def create_app(config_class=Config):

    app = Quart(__name__)
    app.config.from_object(config_class)

    # Blueprint registration
    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    print ("before autobahn start")
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm1')
        future = executor.submit(runner.run(Component))
    print ("after autobahn started")

    return app


from app import models

In this case the application gets stuck in the runner loop and the whole application does not work (it cannot serve requests). It starts serving only after I interrupt the runner's (Autobahn's) loop with Ctrl-C.
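A likely reason for the blocking, sketched with the standard library only: `executor.submit(runner.run(Component))` first calls `runner.run(...)` in the calling thread and submits only its return value, so nothing is handed to the pool until the call returns. The names `blocking_job` and `calls` below are hypothetical stand-ins and not part of the app.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

main_name = threading.current_thread().name
calls = []  # records which thread actually executed the job


def blocking_job():
    # Hypothetical stand-in for runner.run(Component)
    calls.append(threading.current_thread().name)
    return "done"


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as executor:
    # Pitfall: blocking_job() is evaluated right here, in the calling thread.
    # Only its return value (a string, not a callable) reaches the pool.
    wrong = executor.submit(blocking_job())

    # Passing the callable itself lets a worker thread execute it.
    right = executor.submit(blocking_job)

    wrong_error = wrong.exception()  # the worker failed to call a str
    right_result = right.result()

print(calls)
print(type(wrong_error).__name__, right_result)
```

Note also that leaving the `with` block waits for all submitted work to finish, so a job that never returns would still hold up `create_app` even when it is submitted correctly.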

CMD after start:

(quart-app) user@car:~/quart-app$ hypercorn --debug --error-log - --access-log - -b 0.0.0.0:8001 tengine:app
Running on 0.0.0.0:8001 over http (CTRL + C to quit)
before autobahn start
Ok, registered procedure with registration ID 4605315769796303

After pressing Ctrl-C:

...
^Cafter autobahn started
2019-03-29T01:06:52 <Server sockets=[<socket.socket fd=11, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8001)>]> is serving

How can I make the Quart application and the Autobahn client work together in a non-blocking fashion, so that Autobahn opens and keeps a websocket connection to the Crossbar router and silently listens in the background?

Andrew K

1 Answer

Well, after many sleepless nights I finally found a good approach to solve this conundrum.

Thanks to this post: C-Python asyncio: running discord.py in a thread

So I rewrote my code like this and was able to run my Quart app with the Autobahn client inside, with both actively working in a non-blocking fashion. The whole __init__.py looks like this:

from quart import Quart, request, current_app
from config import Config


def create_app(config_class=Config):

    app = Quart(__name__)
    app.config.from_object(config_class)

    # Blueprint registration
    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    return app


# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
import threading


class Component(ApplicationSession):
    """
    An application component registering RPC endpoints using decorators.
    """

    async def onJoin(self, details):

        # register all methods on this object decorated with "@wamp.register"
        # as RPC endpoints
        ##
        results = await self.register(self)
        for res in results:
            if isinstance(res, wamp.protocol.Registration):
                # res is a Registration instance
                print("Ok, registered procedure with registration ID {}".format(res.id))
            else:
                # res is a Failure instance
                print("Failed to register procedure: {}".format(res))


    def onDisconnect(self):
        print('Autobahn disconnected')

    @wamp.register(u'com.mathservice.add2')
    def add2(self, x, y):
        return x + y


async def start():
    runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm1')
    await runner.run(Component)

def run_it_forever(loop):
    loop.run_forever()

asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
loop = asyncio.get_event_loop()
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")


from app import models

In this scenario we create a task from Autobahn's runner.run, attach it to the current loop, and then run this loop forever in a new thread.
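The general pattern (an event loop running forever in its own thread, with work handed to it from outside) can be sketched with the standard library alone. This is a minimal sketch under assumptions, not the code from my app: it uses a dedicated loop from `asyncio.new_event_loop()` rather than the current loop, and the coroutine `add2` here is a hypothetical stand-in for whatever should run on the background loop.

```python
import asyncio
import threading

# A dedicated loop for the background thread (assumption: the code above
# reuses the loop returned by asyncio.get_event_loop() instead).
loop = asyncio.new_event_loop()


def run_loop_forever(loop):
    # Bind the loop to this thread, then block here until loop.stop() is called.
    asyncio.set_event_loop(loop)
    loop.run_forever()


thread = threading.Thread(target=run_loop_forever, args=(loop,), daemon=True)
thread.start()


async def add2(x, y):
    # Hypothetical stand-in for a coroutine that must run on the background loop
    await asyncio.sleep(0)
    return x + y


# From any other thread, schedule the coroutine on the background loop
# and wait for its result through a concurrent.futures.Future.
future = asyncio.run_coroutine_threadsafe(add2(2, 3), loop)
result = future.result(timeout=5)
print(result)

# Clean shutdown: ask the loop to stop from outside its thread, then join.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

`asyncio.run_coroutine_threadsafe` is the thread-safe way to submit a coroutine to a loop owned by another thread; calling `loop.create_task` from a different thread while the loop is running is not thread-safe.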

I was quite satisfied with this solution... but then I found out that it has some drawbacks that were crucial for me, for example reconnecting when the connection drops (i.e. the Crossbar router becomes unavailable). With this approach, if the connection fails to initialize or drops after a while, it will not try to reconnect. Additionally, it wasn't obvious to me how to use the ApplicationSession API, i.e. how to register/call RPCs from the code in my Quart app.
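To make the missing piece concrete, here is roughly what a hand-written reconnect loop would have to do. This is only an illustrative sketch with the standard library: `run_with_reconnect`, `flaky_connect` and the delay values are hypothetical names and numbers, and nothing here is part of Autobahn's API.

```python
import asyncio


async def run_with_reconnect(connect_once, max_attempts=5,
                             initial_delay=0.01, max_delay=1.0):
    """Call connect_once() until it succeeds, backing off between attempts."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect_once()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)  # exponential backoff, capped


attempts = []


async def flaky_connect():
    # Hypothetical connection attempt: fails twice, then succeeds.
    attempts.append(len(attempts) + 1)
    if len(attempts) < 3:
        raise ConnectionError("router unavailable")
    return "joined"


outcome = asyncio.run(run_with_reconnect(flaky_connect))
print(outcome, attempts)
```

A real client would also have to notice a connection that drops after joining and re-register its procedures, which is a good reason to prefer an API that already handles this.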

Luckily I spotted another, newer component API that Autobahn uses in its documentation: https://autobahn.readthedocs.io/en/latest/wamp/programming.html#registering-procedures https://github.com/crossbario/autobahn-python/blob/master/examples/asyncio/wamp/component/backend.py

It has an auto-reconnect feature, and it's easy to register functions for RPC using the decorator @component.register('com.something.do'); you just need to import component first.

So here is the final version of the __init__.py solution:

from quart import Quart, request, current_app
from config import Config

def create_app(config_class=Config):
    ...
    return app

from autobahn.asyncio.component import Component, run
from autobahn.wamp.types import RegisterOptions
import asyncio
import ssl
import threading


component = Component(
    transports=[
        {
            "type": "websocket",
            "url": u"ws://localhost:8080/ws",
            "endpoint": {
                "type": "tcp",
                "host": "localhost",
                "port": 8080,
            },
            "options": {
                "open_handshake_timeout": 100,
            }
        },
    ],
    realm=u"realm1",
)

@component.on_join
def join(session, details):
    print("joined {}".format(details))

async def start():
    await component.start()  # component.start() is used instead of run([component]) because it is an async function

def run_it_forever(loop):
    loop.run_forever()

loop = asyncio.get_event_loop()
#asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
asyncio.get_child_watcher().attach_loop(loop)
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")


from app import models

I hope it will help somebody. Cheers!

Andrew K