I previously asked how to run several autobahn.ApplicationSession instances from within the same Python process without having them block.

That issue was resolved, but I've run into a new one.

Terminating these mp.Process instances is proving difficult. I know the code in ApplicationRunner.run() exits upon a KeyboardInterrupt, but I've been unable to get this to trigger correctly.

Sample Code:

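import multiprocessing as mp
import signal
import time
from asyncio import coroutine, get_event_loop

import requests
from autobahn.asyncio.wamp import ApplicationRunner, ApplicationSession

# WSSAPI is my own base class; its import is not shown here.


class PoloniexSession(ApplicationSession):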

    @coroutine
    def onJoin(self, *args, **kwargs):
        channel = self.config.extra['channel']

        def onTicker(*args, **kwargs):
            self.config.extra['queue'].put((channel, (args, kwargs, time.time())))

        try:
            yield from self.subscribe(onTicker, self.config.extra['channel'])

        except Exception as e:
            raise


class PlnxEndpoint(mp.Process):
    def __init__(self, endpoint, q, **kwargs):
        super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' %
                                                endpoint, **kwargs)
        self.endpoint = endpoint
        self.q = q

    def run(self):
        self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1',
                                   extra={'channel': self.endpoint, 
                                          'queue': self.q})
        self.runner.run(PoloniexSession)

    def join(self, *args, **kwargs):
        def sig_handler(x, y):
            pass
        signal.signal(signal.SIGINT, sig_handler)
        super(PlnxEndpoint, self).join(*args, **kwargs)


class PoloniexWSS(WSSAPI):
    def __init__(self, endpoints=None):
        super(PoloniexWSS, self).__init__(None, 'Poloniex')
        self.data_q = mp.Queue()
        self.connections = {}
        if endpoints:
            self.endpoints = endpoints
        else:
            r = requests.get('https://poloniex.com/public?command=returnTicker')
            self.endpoints = list(r.json().keys())
            self.endpoints.append('ticker')

        for endpoint in self.endpoints:
            self.connections[endpoint] = PlnxEndpoint(endpoint, self.data_q)

    def start(self):
        super(PoloniexWSS, self).start()
        for conn in self.connections:
            self.connections[conn].start()

    def stop(self):
        for conn in self.connections:
            self.connections[conn].join()
        super(PoloniexWSS, self).stop()

While this fills the queue as expected, I still receive an error when my subprocesses are stopped:

RuntimeError: Event loop stopped before Future completed.
Traceback (most recent call last):
  File "/home/nils/anaconda3/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/nils/git/tools/bitexwss/bitexws//api/poloniex.py", line 46, in run
    self.runner.run(PoloniexSession)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 172, in run
    loop.run_until_complete(protocol._session.leave())
  File "/home/nils/anaconda3/lib/python3.5/asyncio/base_events.py", line 335, in run_until_complete
    raise RuntimeError('Event loop stopped before Future completed.')

This leads me to believe that my SIGINT isn't being delivered where I want it to be.

According to the source code of ApplicationRunner.run(), a SIGINT / KeyboardInterrupt should gracefully end the loop.run_forever() call.
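
The error message itself comes from asyncio rather than autobahn: run_until_complete() raises it whenever the loop is stopped before the future it is waiting on has finished. Below is a minimal sketch that reproduces it without autobahn. `slow_leave` and `reproduce` are hypothetical names, with `slow_leave` standing in for a shutdown coroutine such as session.leave(). It uses `async def`, which is equivalent to the `@coroutine` / `yield from` style for this purpose.

```python
import asyncio


async def slow_leave():
    # Hypothetical stand-in for a shutdown coroutine that needs some time.
    await asyncio.sleep(1)


def reproduce():
    """Stop the loop before the task finishes and return the error message."""
    loop = asyncio.new_event_loop()
    task = loop.create_task(slow_leave())
    # Stop the loop shortly after it starts, long before the task is done.
    loop.call_later(0.01, loop.stop)
    message = None
    try:
        loop.run_until_complete(task)
    except RuntimeError as exc:
        message = str(exc)
    # Cancel the unfinished task so the loop can be closed cleanly.
    task.cancel()
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
    loop.close()
    return message
```

Calling `reproduce()` returns the same message as in the traceback, which suggests that something stops the loop while the leave() call is still pending.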

Manually stopping the asyncio event loop results in the above error as well:

class PlnxEndpoint(mp.Process):
#...
    def join(self, *args, **kwargs):
        loop = get_event_loop()
        loop.stop()
        super(PlnxEndpoint, self).join(*args, **kwargs)
#...
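
One thing worth checking here: an overridden join() runs in the process that calls it (normally the parent), while only run() executes in the child. So get_event_loop() or signal.signal() inside join() acts on the parent, not on the loop that ApplicationRunner uses in the child. Below is a minimal sketch showing which process each method runs in. `Worker` and `pids` are hypothetical names, and the explicit "fork" start method is an assumption that requires a POSIX system.

```python
import multiprocessing as mp
import os

# Assumption: POSIX system where the "fork" start method is available.
ctx = mp.get_context("fork")


class Worker(ctx.Process):
    def __init__(self, q, **kwargs):
        super().__init__(**kwargs)
        self.q = q
        self.join_pid = None

    def run(self):
        # run() executes in the child process.
        self.q.put(os.getpid())

    def join(self, *args, **kwargs):
        # join() executes in the calling process, normally the parent.
        self.join_pid = os.getpid()
        super().join(*args, **kwargs)


def pids():
    """Return (parent pid, pid seen in run(), pid seen in join())."""
    q = ctx.Queue()
    worker = Worker(q)
    worker.start()
    run_pid = q.get(timeout=10)
    worker.join()
    return os.getpid(), run_pid, worker.join_pid
```

The pid recorded in join() matches the parent's pid, while the one reported from run() differs.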
  • We should handle this in the previous question; I suggest closing this one. – stovfl Mar 17 '17 at 06:41
  • Hm, I disagree. It's a separate problem, after all: `Running 2 ApplicationRunner().run() methods in separate processes` vs `Closing an ApplicationRunner in a separate process`. They are undoubtedly related, but still two different problems. – deepbrook Mar 17 '17 at 07:05

1 Answer

After all, a little fiddling around yielded a rather simple solution:

Using multiprocessing.Event(), I was able to end my process gracefully.

class PoloniexSession(ApplicationSession):

    @coroutine
    def onJoin(self, *args, **kwargs):
        channel = self.config.extra['channel']

        def onTicker(*args, **kwargs):
            self.config.extra['queue'].put((channel, (args, kwargs, time.time())))

        if self.config.extra['is_killed'].is_set():
            raise KeyboardInterrupt()
        try:
            yield from self.subscribe(onTicker, self.config.extra['channel'])

        except Exception as e:
            raise


class PlnxEndpoint(mp.Process):
    def __init__(self, endpoint, q, **kwargs):
        super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' %
                                                endpoint, **kwargs)
        self.endpoint = endpoint
        self.q = q
        self.is_killed = mp.Event()

    def run(self):
        self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1',
                                   extra={'channel': self.endpoint,
                                          'queue': self.q,
                                          'is_killed': self.is_killed})
        self.runner.run(PoloniexSession)

    def join(self, *args, **kwargs):
        self.is_killed.set()
        super(PlnxEndpoint, self).join(*args, **kwargs)
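
Note that the code above checks the event only once, inside onJoin(). A possible variation (not what I ran above) is to poll the event from the child's event loop and stop the loop once it is set. The following is a rough sketch of that pattern with plain asyncio. `LoopProcess`, `shutdown`, `poll_interval` and `demo` are hypothetical names, the "fork" start method assumes a POSIX system, and how this would be wired into ApplicationRunner is left out and would need checking against autobahn's source.

```python
import asyncio
import multiprocessing as mp

# Assumption: POSIX system where the "fork" start method is available.
ctx = mp.get_context("fork")


class LoopProcess(ctx.Process):
    def __init__(self, poll_interval=0.05, **kwargs):
        super().__init__(**kwargs)
        self.poll_interval = poll_interval
        self.stop_requested = ctx.Event()

    def run(self):
        # Runs in the child: create the loop here, not in __init__.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        def check_stop():
            # Stop the loop once the parent has set the event;
            # otherwise schedule the next check.
            if self.stop_requested.is_set():
                loop.stop()
            else:
                loop.call_later(self.poll_interval, check_stop)

        loop.call_soon(check_stop)
        loop.run_forever()
        loop.close()

    def shutdown(self, timeout=None):
        # Runs in the parent: request the stop, then wait for the child.
        self.stop_requested.set()
        self.join(timeout)


def demo():
    """Start the process, shut it down, and return (was_alive, exitcode)."""
    proc = LoopProcess()
    proc.start()
    was_alive = proc.is_alive()
    proc.shutdown(timeout=10)
    return was_alive, proc.exitcode
```

Using a separate shutdown() method keeps join() with its usual meaning of only waiting for the child to finish.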
  • The `join()` override is now back to the recommended usage, but I still don't see why you intend to stop the `process` within `join()`. – stovfl Mar 17 '17 at 13:54