I have an application which, for convenience (I am reusing existing code), has been split into two threads:

  • one thread running the twisted reactor
  • another thread running an interactive menu

One of the things I want to do from the interactive menu is interact with the reactor: once the user gives a specific command, I want to trigger a Twisted event. Here is a very simplified version of my code:

from twisted.spread import pb
from twisted.internet import reactor
import threading

class TaskGatewaySupport():

    def __init__(self):
        self.object = None
        self.factory = pb.PBClientFactory()
        self.connector = None

    def gotObject(self, object):
        print 'gotObject > %s' % object
        self.object = object
        return object

    def gotData(self, data):
        return data

    def gotNoObject(self, reason):
        print 'gotNoObject > no object: %s' % reason

    def connect(self, task_gateway_host = '127.0.0.1', task_gateway_pb_port = 8889):
        print 'Connecting to %s:%s' % (task_gateway_host, task_gateway_pb_port)
        self.connector=reactor.connectTCP(task_gateway_host, task_gateway_pb_port, self.factory)
        d = self.factory.getRootObject()
        d.addCallbacks(self.gotObject, self.gotNoObject)
        return d

def Menu(task_gateway_support):
    while True:
        print '''

        A) Connect

        '''
        choice = raw_input('Option > ')
        if choice == 'A' : task_gateway_support.connect()
        else             : print "ERR: command not yet supported"

def version1():
    task_gateway_support  = TaskGatewaySupport()
    thread = threading.Thread(target = Menu, args = (task_gateway_support,))
    thread.start()
    reactor.run()

def version2():
    task_gateway_support  = TaskGatewaySupport()
    d = task_gateway_support.connect()
    reactor.run()

if __name__ == '__main__':
    version1()

As you can see, I am showing two different versions:

  • version1 is the one I want to run, but it does not work
  • version2 works, but it has only one thread and is not interactive

Running version2 will give this result:

Connecting to 127.0.0.1:8889
gotObject > <twisted.spread.pb.RemoteReference instance at 0x88e734c>

Which is what I was expecting.

Running version1 will give this:

        A) Connect


Option > A
Connecting to 127.0.0.1:8889


        A) Connect


Option > ^CgotNoObject > no object: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectError'>: An error occurred while connecting: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion: Connection lost.
].
]

What I am doing here is selecting option A, and since nothing happens, I press ^C, which shows the error message.

I think the problem arises because I am sharing an object between two threads and trying to trigger Twisted events from the non-reactor thread. I was hoping that, since the object is shared, the reactor would be aware of anything happening to it.

So my main question is: how can I trigger twisted events from another thread?

blueFast

2 Answers

You should avoid using threads for this. See the question "User interaction in twisted process" for information about how to accept user input in a single thread.

Apart from that, use reactor.callFromThread any time you want to call any Twisted API from a non-reactor thread.
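
As a rough sketch of how that could look for the menu in the question: the menu thread never calls `connect()` itself, it only asks the reactor to call it. The names `handle_choice`, `read_line` and `main` are hypothetical, and the reactor is passed in as a parameter purely so the helpers can be tried without a running reactor. The sketch is written for Python 3 (`input`, `print()`); under Python 2, as in the question, `raw_input` would take the place of `input`.

```python
import threading


def handle_choice(choice, reactor, task_gateway_support):
    """Runs in the menu thread. Returns True if the command was recognised."""
    if choice == 'A':
        # Do not call connect() directly here: it uses reactor.connectTCP,
        # which must only run in the reactor thread. Schedule it instead.
        reactor.callFromThread(task_gateway_support.connect)
        return True
    return False


def menu(reactor, task_gateway_support, read_line=input):
    """Blocking menu loop, meant to run in a non-reactor thread."""
    while True:
        choice = read_line('Option > ')
        if not handle_choice(choice, reactor, task_gateway_support):
            print('ERR: command not yet supported')


def main(task_gateway_support):
    # Imported here so the helpers above can be exercised without Twisted.
    from twisted.internet import reactor
    thread = threading.Thread(target=menu,
                              args=(reactor, task_gateway_support))
    # Assumption: a daemon thread, so the blocked prompt does not keep the
    # process alive after the reactor stops.
    thread.daemon = True
    thread.start()
    reactor.run()
```

Calling `main(TaskGatewaySupport())` would then play the role of `version1()` in the question. Because `connect()` now runs in the reactor thread, its Deferred and the `gotObject` / `gotNoObject` callbacks fire there as well.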

Jean-Paul Calderone

I ran into this problem myself with Twisted. After a lot of googling I came up with the following, which works quite well:

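from twisted.internet import reactor, threads

def my_function(s):
    # Placeholder: handle one line of user input here.
    print 'received: %r' % s

class GetCommands():

    def start(self, handler):
        self.handler = handler
        self.startReceiving()

    def startReceiving(self, s = ''):
        # Runs in the reactor thread with each line the user entered.
        self.handler(s)
        if s != 'exit':
            # raw_input blocks, so run it in the reactor's thread pool; the
            # callback fires back in the reactor thread with the entered line.
            threads.deferToThread(raw_input, ' >>> ').addCallback(self.startReceiving)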

Then, in the main module:

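getCmds = GetCommands()
# Start prompting only once the reactor is running.
reactor.callWhenRunning(getCmds.start, my_function)

# PORT and factory are the application's own port number and protocol factory.
reactor.listenTCP(PORT, factory)
reactor.run()
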
Milean
  • See Jean-Paul's answer for a less problematic way to get user input. This strategy has a problem: if `raw_input` is running in a thread, there's no safe way to stop it while the reactor is running; you have to wait for the user to enter a final command. – Glyph Jun 28 '13 at 23:18
  • You're right, I did run into that problem of needing the user to enter a final command before exiting. Annoying, but only a minor issue. I guess you could just abandon the thread with a timeout on the join method, but that would be a duct-tape fix. – Milean Jul 31 '13 at 21:07
  • Jean-Paul's answer uses the stdio that comes with Twisted. This is a work-around using raw_input() and doesn't work on Windows. So if you're only running this on UNIX systems that is a fine way to go. – Milean Jul 31 '13 at 21:10