3

I have not found any solution for my problem. I need to create a python application with two thread, each of which is connected to a WAMP Router using autobahn library.

Follow I write the code of my experiment:

wampAddress = 'ws://172.17.3.139:8181/ws'
wampRealm = 's4t'

from threading import Thread
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks


class AutobahnMRS(ApplicationSession):
    @inlineCallbacks
    def onJoin(self, details):
        print("Sessio attached [Connect to WAMP Router]")

        def onMessage(*args):
            print args
        try:
            yield self.subscribe(onMessage, 'test')
            print ("Subscribed to topic: test")

        except Exception as e:
            print("Exception:" +e)

class AutobahnIM(ApplicationSession):
    @inlineCallbacks
    def onJoin(self, details):
        print("Sessio attached [Connect to WAMP Router]")

        try:
            yield self.publish('test','YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO')
            print ("Subscribed to topic: test")

        except Exception as e:
            print("Exception:" +e)

class ManageRemoteSystem:
    def __init__(self):
        self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

    def start(self):
        self.runner.run(AutobahnMRS);


class InternalMessages:
    def __init__(self):
        self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

    def start(self):
        self.runner.run(AutobahnIM);

#class S4tServer:

if __name__ == '__main__':
    server = ManageRemoteSystem()
    sendMessage = InternalMessages()

    thread1 = Thread(target = server.start())
    thread1.start()
    thread1.join()

    thread2 = Thread(target = sendMessage.start())
    thread2.start()
    thread2.join()

When I launch this python application only the thread1 is started and later when I kill the application (ctrl-c) the following error messages are shown:

Sessio attached [Connect to WAMP Router]
Subscribed to topic: test
^CTraceback (most recent call last):
  File "test_pub.py", line 71, in <module>
    p2 = multiprocessing.Process(target = server.start())
  File "test_pub.py", line 50, in start
    self.runner.run(AutobahnMRS);
  File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/wamp.py", line 175, in run
    reactor.run()
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1191, in run
    self.startRunning(installSignalHandlers=installSignalHandlers)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1171, in startRunning
    ReactorBase.startRunning(self)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 683, in startRunning
    raise error.ReactorNotRestartable()
twisted.internet.error.ReactorNotRestartable

I need to implement in one application that has its functionalities and also it must has a system for communicate to a WAMP router with autobahn python library.

In other words, I need a solution able to communicate with a WAMP router, but in the same time this application doesn't must be blocked with the autobahn part (I think that the solution is to start two thread, one thread manages some functions and second thread manages the autobahn part).

With the schema that I proposed before, there is another problem, the need to send message, in a specific topic on the WAMP router, from the application part in the 'no autobahn thread', this functionality should be called through with a specific function without blocking the other functionalities.

I hope I have given all the details.

Thanks a lot for any response

--------------------------------EDIT---------------------------------

After some research I have implemented what I need for websocket protocol, the code is the follow:

# ----- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
    def __init__(self, factory):
        self.factory = factory

    def onOpen(self):
        #log.debug("Client connected")
        self.factory.protocol_instance = self
        self.factory.base_client._connected_event.set()

class _WebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, *args, **kwargs):
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None

    def buildProtocol(self, addr):
        return _WebSocketClientProtocol(self)
# ------ end twisted -------
lass BaseWBClient(object):

    def __init__(self, websocket_settings):
        #self.settings = websocket_settings
        # instance to be set by the own factory
        self.factory = None
        # this event will be triggered on onOpen()
        self._connected_event = threading.Event()
        # queue to hold not yet dispatched messages
        self._send_queue = Queue.Queue()
        self._reactor_thread = None

    def connect(self):

        log.msg("Connecting to host:port")
        self.factory = _WebSocketClientFactory(
                                "ws://host:port",
                                debug=True)
        self.factory.base_client = self

        c = connectWS(self.factory)

        self._reactor_thread = threading.Thread(target=reactor.run,
                                               args=(False,))
        self._reactor_thread.daemon = True
        self._reactor_thread.start()

    def send_message(self, body):
        if not self._check_connection():
            return
        log.msg("Queing send")
        self._send_queue.put(body)
        reactor.callFromThread(self._dispatch)

    def _check_connection(self):
        if not self._connected_event.wait(timeout=10):
            log.err("Unable to connect to server")
            self.close()
            return False
        return True

    def _dispatch(self):
        log.msg("Dispatching")
        while True:
            try:
                body = self._send_queue.get(block=False)
            except Queue.Empty:
                break
            self.factory.protocol_instance.sendMessage(body)

    def close(self):
        reactor.callFromThread(reactor.stop)

import time
def Ppippo(coda):
        while True:
            coda.send_message('YOOOOOOOO')
            time.sleep(5)

if __name__ == '__main__':

    ws_setting = {'host':'', 'port':}

    client = BaseWBClient(ws_setting)

    t1 = threading.Thread(client.connect())
    t11 = threading.Thread(Ppippo(client))
    t11.start()
    t1.start()

The previous code work fine, but I need to translate this to operate on WAMP protocol insted websocket.

Does anyone know how I solve ?

Emile Cormier
  • 28,391
  • 15
  • 94
  • 122
alotronto
  • 99
  • 2
  • 8
  • Move `thread1.join()` down to be with `thread2.join()`. In its current location, it will tell the main thread to wait until the thread1 dies. Since you have no way of killing the thread (without killing the entire process with Ctrl-C), the second thread will never get created. – Aaron D Feb 09 '15 at 16:51
  • Also, your threads should do their work in the thread's `.run()` function. You `join()` a thread at the end of your main function to allow time for the threads to finish their execution before the main application quits. So you need to have a way for the threads to finish their task. – Aaron D Feb 09 '15 at 16:56
  • Do you need 2 app sessions, or really 2 threads? If the former, are both sessions to the same router/realm or different ones? If the latter, why do you need threads in the first place? If you need to do e.g. CPU intensive stuff and want to use multi-core, pls let us know. Need more "why" and "what" ... – oberstet Feb 09 '15 at 19:54
  • oberstet thanks for your response. I have modified the question with more details. – alotronto Feb 10 '15 at 11:12

1 Answers1

5

The bad news is that Autobahn is using the Twisted main loop, so you can't run it in two threads at once.

The good news is that you don't need to run it in two threads to run two things, and two threads would be more complicated anyway.

The API to get started with multiple applications is a bit confusing, because you have two ApplicationRunner objects, and it looks at first glance that the way you run an application in autobahn is to call ApplicationRunner.run.

However, ApplicationRunner is simply a convenience that wraps up the stuff that sets up the application and the stuff that runs the main loop; the real meat of the work happens in WampWebSocketClientFactory.

In order to achieve what you want, you just need to get rid of the threads, and run the main loop yourself, making the ApplicationRunner instances simply set up their applications.

In order to achieve this, you'll need to change the last part of your program to do this:

class ManageRemoteSystem:
    def __init__(self):
        self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm)

    def start(self):
        # Pass start_reactor=False to all runner.run() calls
        self.runner.run(AutobahnMRS, start_reactor=False)


class InternalMessages:
    def __init__(self):
        self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm)

    def start(self):
        # Same as above
        self.runner.run(AutobahnIM, start_reactor=False)


if __name__ == '__main__':
    server = ManageRemoteSystem()
    sendMessage = InternalMessages()
    server.start()
    sendMessage.start()

    from twisted.internet import reactor
    reactor.run()
Glyph
  • 31,152
  • 11
  • 87
  • 129
  • 1
    The API sucks a little, yep, in particular for this use case. We have unpublished stuff in another repo that allows to fire up multiple sessions with a single call that returns a `DeferredList` (which resolves with the individual WAMP app sessions.) Probably this should be in Autobahn .. – oberstet Feb 09 '15 at 19:55
  • Sorry, oberstet, can you illustrate this with more accurately – alotronto Feb 23 '15 at 15:30
  • 1
    Thanks @Glyph!!! I've been searching for ages for that `start_reactor` argument, but seems there is no mention of it in the docs... or how is otherwise one supposed to add autobahn to an existing Twisted application? – jjmontes Sep 10 '15 at 21:37
  • @jjmontes - I am pretty sure that's the only way. Bug oberstet for updates ;-). – Glyph Sep 11 '15 at 02:06