In my code, I have two hypothetical tasks: one pulls URLs from a generator and batch-downloads them using Twisted's Cooperator, and the other takes a downloaded page source and parses it asynchronously. I'm trying to encapsulate all of the fetch and parse work in a single Deferred that fires once all pages are downloaded and all sources are parsed.
I've come up with the following solution:
from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage

BATCH_SIZE = 5

# `get_urls` and `parse` are the hypothetical URL generator and
# asynchronous parser described above.

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True
        # Guard against the race where every parse has already
        # completed by this point; without this check, `result`
        # would never fire in that case.
        if state['count'] == 0 and not result.called:
            result.callback(True)

    deferreds = []
    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)
    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`.
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)
The code works, but I feel as if I'm either missing something blatantly obvious, or am unaware of a simple Twisted pattern that would make this much simpler. Is there a better way to return a single Deferred that fires when all fetching and parsing is finished?
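For context, stripped of Twisted entirely, the bookkeeping my `on_parse_finish`/`on_finish` callbacks implement is just a countdown latch: fire once the producer has finished handing out tasks *and* every outstanding task has completed. Here is a minimal pure-Python sketch of that state machine (`Latch`, `on_all_done`, and the method names are made up for illustration, not from any library); it also shows the edge case that has to be handled when the producer finishes after every task has already completed:

```python
class Latch(object):
    """Fires `on_all_done` once, when the producer is done and no tasks remain."""

    def __init__(self, on_all_done):
        self.count = 0            # outstanding tasks (state['count'] above)
        self.done = False         # producer finished? (state['done'] above)
        self.fired = False
        self.on_all_done = on_all_done

    def task_started(self):
        self.count += 1

    def task_finished(self):
        self.count -= 1
        self._maybe_fire()

    def producer_done(self):
        self.done = True
        # Also check here, covering the case where every task
        # already finished before the producer reported done.
        self._maybe_fire()

    def _maybe_fire(self):
        if self.done and self.count == 0 and not self.fired:
            self.fired = True
            self.on_all_done()
```

In Twisted terms, `task_started`/`task_finished` correspond to `process`/`on_parse_finish`, and `producer_done` to `on_finish`; the question is whether Twisted gives me this fan-in for free so I can drop the manual counting.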