
I'm trying to implement a websocket/wamp client using autobahn|python and asyncio, and while it's somewhat working, there are parts that have eluded me.

What I'm really trying to do is implement WAMP in qt5/QML, but this seemed like an easier path for the moment.

This simplified client, mostly copied from online examples, does work. It calls the time service when onJoin fires.

What I'd like to do is trigger this read from an external source.

The convoluted approach I've taken is to run the asyncio event loop in a thread, and then to send a command over a socket to trigger the read. So far I have been unable to figure out where to put the routine/coroutine so that it can be reached from the reader routine.

I suspect there's a simpler way to go about this but I haven't found it yet. Suggestions are welcome.

#!/usr/bin/python3
try:
    import asyncio
except ImportError:
    ## Trollius >= 0.3 was renamed
    import trollius as asyncio

from autobahn.asyncio import wamp, websocket
import threading
import time
from socket import socketpair

rsock, wsock = socketpair()

def reader() :
    data = rsock.recv(100)
    print("Received:", data.decode())

class MyFrontendComponent(wamp.ApplicationSession):
    def onConnect(self):
        self.join(u"realm1")



    @asyncio.coroutine
    def onJoin(self, details):
        print('joined')
        ## call a remote procedure
        ##
        try:
           now = yield from self.call(u'com.timeservice.now')
        except Exception as e:
           print("Error: {}".format(e))
        else:
           print("Current time from time service: {}".format(now))



    def onLeave(self, details):
        self.disconnect()

    def onDisconnect(self):
        asyncio.get_event_loop().stop()



def start_aloop() :
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    transport_factory = websocket.WampWebSocketClientFactory(session_factory,
                    debug = False,
                    debug_wamp = False)
    coro = loop.create_connection(transport_factory, '127.0.0.1', 8080)
    loop.add_reader(rsock,reader)
    loop.run_until_complete(coro)
    loop.run_forever()
    loop.close()

if __name__ == '__main__':
    session_factory = wamp.ApplicationSessionFactory()
    session_factory.session = MyFrontendComponent

    ## 4) now enter the asyncio event loop
    print('starting thread')
    thread = threading.Thread(target=start_aloop)
    thread.start()
    time.sleep(5)
    print("IN MAIN")
    # emulate an outside call
    wsock.send(b'a byte string')
dano
morris
  • So, you want to be able to trigger the client to make an RPC call to the timeservice by some external means? – dano Nov 03 '14 at 02:26

2 Answers


You can listen on a socket asynchronously inside the event loop, using loop.sock_accept. Just schedule a coroutine that sets up the socket from inside onConnect or onJoin:

try:
    import asyncio
except ImportError:
    ## Trollius >= 0.3 was renamed
    import trollius as asyncio

from autobahn.asyncio import wamp, websocket
import socket

class MyFrontendComponent(wamp.ApplicationSession):
    def onConnect(self):
        self.join(u"realm1")

    @asyncio.coroutine
    def setup_socket(self):
        # Create a non-blocking socket
        self.sock = socket.socket()
        self.sock.setblocking(0)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(('localhost', 8889))
        self.sock.listen(5)
        loop = asyncio.get_event_loop()
        # Wait for connections to come in. When one arrives,
        # call the time service and disconnect immediately.
        while True:
            conn, address = yield from loop.sock_accept(self.sock)
            yield from self.call_timeservice()
            conn.close()

    @asyncio.coroutine
    def onJoin(self, details):
        print('joined')
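        # Set up our socket server. asyncio.ensure_future() replaces
        # asyncio.async(), which is a syntax error on Python 3.7+.
        asyncio.ensure_future(self.setup_socket())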

        ## call a remote procedure
        ##
        yield from self.call_timeservice()

    @asyncio.coroutine
    def call_timeservice(self):
        try:
           now = yield from self.call(u'com.timeservice.now')
        except Exception as e:
           print("Error: {}".format(e))
        else:
           print("Current time from time service: {}".format(now))

    ... # The rest is the same
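
The accept-loop pattern above can be tried without a WAMP router. What follows is a minimal sketch, not the answer's code: it assumes Python 3.7+ (`async`/`await`, `asyncio.run`), and `accept_triggers`, `fake_rpc`, and `demo` are hypothetical names, with `fake_rpc` standing in for `self.call(u'com.timeservice.now')`. Port 0 asks the OS for a free port instead of the answer's 8889.

```python
import asyncio
import socket


async def accept_triggers(sock, on_trigger, max_triggers):
    """Accept connections on `sock`; run `on_trigger()` once per connection."""
    loop = asyncio.get_running_loop()
    for _ in range(max_triggers):
        conn, _addr = await loop.sock_accept(sock)
        await on_trigger()
        conn.close()


async def demo():
    # Non-blocking listening socket; port 0 lets the OS pick a free port.
    server = socket.socket()
    server.setblocking(False)
    server.bind(("127.0.0.1", 0))
    server.listen(5)
    port = server.getsockname()[1]

    calls = []

    async def fake_rpc():
        # Hypothetical stand-in for the WAMP call made by the session.
        calls.append("com.timeservice.now")

    task = asyncio.ensure_future(accept_triggers(server, fake_rpc, 2))

    # Emulate two external clients: connecting is the whole trigger.
    for _ in range(2):
        _reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.close()
        await writer.wait_closed()

    await task
    server.close()
    return calls


result = asyncio.run(demo())
print(result)
```

Against the answer's server, any process could act as the trigger by opening and closing a connection, for example with `socket.create_connection(("localhost", 8889)).close()`.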
dano

Thanks for the response, dano. It's not quite the solution I needed, but it pointed me in the right direction. Yes, I want the client to make remote RPC calls in response to an external trigger.

I came up with the following, which lets me pass a string naming the specific procedure to call (though only one is implemented right now). I'm not sure how elegant it is.

import asyncio
from autobahn.asyncio import wamp, websocket
import threading
import time
import socket


rsock, wsock = socket.socketpair()

class MyFrontendComponent(wamp.ApplicationSession):
    def onConnect(self):
        self.join(u"realm1")

    @asyncio.coroutine
    def setup_socket(self):
        # Reuse the read end of the module-level socketpair, in non-blocking mode
        self.sock = rsock
        self.sock.setblocking(0)
        loop = asyncio.get_event_loop()
        # Wait for a command to arrive on the socket. When one does,
        # treat it as a procedure URI and call it.
        while True:
            rcmd = yield from loop.sock_recv(rsock,80)
            yield from self.call_service(rcmd.decode())

    @asyncio.coroutine
    def onJoin(self, details):
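        # Start the socket command reader. asyncio.ensure_future() replaces
        # asyncio.async(), which is a syntax error on Python 3.7+.
        asyncio.ensure_future(self.setup_socket())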


    @asyncio.coroutine
    def call_service(self,rcmd):
        print(rcmd)
        try:
           now = yield from self.call(rcmd)
        except Exception as e:
           print("Error: {}".format(e))
        else:
           print("Current time from time service: {}".format(now))



    def onLeave(self, details):
        self.disconnect()

    def onDisconnect(self):
        asyncio.get_event_loop().stop()



def start_aloop() :
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    transport_factory = websocket.WampWebSocketClientFactory(session_factory,
                    debug = False,
                    debug_wamp = False)
    coro = loop.create_connection(transport_factory, '127.0.0.1', 8080)
    loop.run_until_complete(coro)
    loop.run_forever()
    loop.close()

if __name__ == '__main__':
    session_factory = wamp.ApplicationSessionFactory()
    session_factory.session = MyFrontendComponent

    ## 4) now enter the asyncio event loop
    print('starting thread')
    thread = threading.Thread(target=start_aloop)
    thread.start()
    time.sleep(5)
    wsock.send(b'com.timeservice.now')
    time.sleep(5)
    wsock.send(b'com.timeservice.now')
    time.sleep(5)
    wsock.send(b'com.timeservice.now')
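
For the same background-thread layout, the standard library also offers `asyncio.run_coroutine_threadsafe` (Python 3.5.1+), which hands a coroutine to a loop running in another thread and returns a `concurrent.futures.Future`. This is a different technique from the socketpair used above, shown only as a hedged sketch: `call_service` here is a hypothetical stand-in for the session method of the same name, and no WAMP connection is made.

```python
import asyncio
import threading


async def call_service(procedure):
    # Hypothetical stand-in for `yield from self.call(procedure)`.
    await asyncio.sleep(0.01)
    return "result of {}".format(procedure)


def start_loop(loop):
    # Body of the background thread: own the loop and run it until stopped.
    asyncio.set_event_loop(loop)
    loop.run_forever()


loop = asyncio.new_event_loop()
thread = threading.Thread(target=start_loop, args=(loop,), daemon=True)
thread.start()

# Foreground thread: schedule the coroutine on the background loop and
# block (with a timeout) until its result is available.
future = asyncio.run_coroutine_threadsafe(
    call_service("com.timeservice.now"), loop
)
result = future.result(timeout=5)
print(result)

# Stop the loop from the foreground thread and wait for the thread to exit.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

Unlike the socketpair, this returns the call's result (or raises its exception) directly in the calling thread, which may suit foreground functions that need the value back.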
morris
  • You could implement the `wsock` stuff in a more `asyncio`-friendly way, using `asyncio.sleep` and `loop.sock_sendall`. That way you wouldn't need a background thread. Instead, you'd just schedule a coroutine using `asyncio.async` that called `yield from asyncio.sleep(5)` followed by `yield from loop.sock_sendall(wsock, b'com.timeservice.now')`. – dano Nov 03 '14 at 19:30
  • Thanks, I'll continue looking at this, but I've only been working with it for a few days and am not yet fully up to speed. I've placed the loop in the background because this code is used inside pyotherside and the foreground functions are called by the QML code. There's probably a better way to do it and I suspect it will become clearer in time. – morris Nov 03 '14 at 20:23
  • Ah, if you're intending to integrate this code inside of an application that can't be plugged into the `asyncio` event loop, then what you've got here is the right idea. – dano Nov 03 '14 at 20:26
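
The single-threaded variant suggested in the first comment (`asyncio.sleep` plus `loop.sock_sendall`, no background thread) might look roughly like this. It is a sketch under assumptions: Python 3.7+ syntax instead of `yield from`, a shorter delay than the 5 seconds in the thread, and hypothetical names `command_reader`, `command_writer`, and `handle`, where `handle` stands in for the WAMP call.

```python
import asyncio
import socket


async def command_reader(rsock, handle, count):
    """Read `count` commands from `rsock` and pass each to `handle`."""
    loop = asyncio.get_running_loop()
    for _ in range(count):
        rcmd = await loop.sock_recv(rsock, 80)
        await handle(rcmd.decode())


async def command_writer(wsock, commands, delay):
    """Send each command after `delay` seconds, without blocking the loop."""
    loop = asyncio.get_running_loop()
    for cmd in commands:
        await asyncio.sleep(delay)
        await loop.sock_sendall(wsock, cmd)


async def main():
    rsock, wsock = socket.socketpair()
    rsock.setblocking(False)
    wsock.setblocking(False)

    seen = []

    async def handle(procedure):
        # Hypothetical stand-in for `self.call(procedure)`.
        seen.append(procedure)

    commands = [b"com.timeservice.now"] * 3
    # Reader and writer share one event loop; no thread is involved.
    await asyncio.gather(
        command_reader(rsock, handle, len(commands)),
        command_writer(wsock, commands, delay=0.05),
    )
    rsock.close()
    wsock.close()
    return seen


seen = asyncio.run(main())
print(seen)
```

As the last comment notes, this only applies when the triggering code can itself run inside the `asyncio` loop; with pyotherside calling in from QML, the threaded version remains the fit.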