I'm having trouble using twisted together with doctest
. I'm trying to open a file like so:
from __future__ import print_function
from twisted.internet.defer import Deferred
from twisted.internet.fdesc import readFromFD, setNonBlocking
from twisted.internet.task import react
def asyncFunc(filename):
"""Description.
Examples:
>>> def run(reactor, filename):
... d = asyncFunc(filename)
... return d.addCallback(print)
>>>
>>> try:
... react(run, ['hello.txt'])
... except SystemExit:
... pass
hello world
<BLANKLINE>
"""
with open(filename) as f:
d = Deferred()
fd = f.fileno()
setNonBlocking(fd)
readFromFD(fd, d.callback)
return d
It works fine with just one test, but for more than one I get a ReactorNotRestartable
error.
def anotherAsyncFunc(filename):
"""Description.
Examples:
>>> def run(reactor, filename):
... d = anotherAsyncFunc(filename)
... return d.addCallback(print)
>>>
>>> try:
... react(run, ['hello.txt'])
... except SystemExit:
... pass
hello world
<BLANKLINE>
"""
with open(filename) as f:
d = Deferred()
fd = f.fileno()
setNonBlocking(fd)
readFromFD(fd, d.callback)
return d
I then read about MemoryReactor
and tried this:
def anotherAsyncFunc(filename):
"""Description.
Examples:
>>> from twisted.test.proto_helpers import MemoryReactor
>>> reactor = MemoryReactor()
>>>
>>> def run(reactor, filename):
... d = anotherAsyncFunc(filename)
... return d.addCallback(print)
>>>
>>> try:
... react(run, ['hello.txt'], reactor)
... except SystemExit:
... pass
hello world
<BLANKLINE>
"""
with open(filename) as f:
d = Deferred()
fd = f.fileno()
setNonBlocking(fd)
readFromFD(fd, d.callback)
return d
But that gives me an 'MemoryReactor' object has no attribute 'addSystemEventTrigger'
AttributeError
. Any idea on the proper way to do this?