
In my code, I have two hypothetical tasks: one gets URLs from a generator and batch-downloads them using Twisted's Cooperator, and the other takes a downloaded source and asynchronously parses it. I'm trying to encapsulate all of the fetch and parse tasks into a single Deferred object that calls back when all pages are downloaded and all sources are parsed.

I've come up with the following solution:

from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage

# Assumed defined elsewhere: get_urls() yields URL strings, and
# parse(source) returns a Deferred that fires when parsing is done.

BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        # Count each parse as outstanding until its Deferred fires.
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            # Yielding the Deferred makes coiterate wait for this fetch
            # before pulling the next URL from the generator.
            yield deferred

    def on_finish(r):
        state['done'] = True
        # If every parse already finished, fire the result now;
        # otherwise on_parse_finish fires it when the count drains.
        if state['count'] == 0:
            result.callback(True)

    deferreds = []

    coop = task.Cooperator()
    urls = fetch_urls()
    # Each coiterate() call consumes the shared generator concurrently,
    # so at most BATCH_SIZE fetches are in flight at once.
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)

    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)
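
For concreteness, here's a minimal sketch of how that might be driven, assuming the reactor runs in the main thread and the blocking call is made from a worker thread (the worker function here is hypothetical, not part of the code above):

import threading

from twisted.internet import reactor, threads

def worker():
    # Blocks this non-reactor thread until main_task's Deferred fires.
    threads.blockingCallFromThread(reactor, main_task)
    print 'all fetch/parse tasks completed'
    reactor.callFromThread(reactor.stop)

threading.Thread(target=worker).start()
reactor.run()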

The code works, but I feel as if I'm either missing something blatantly obvious, or ignorant of a simple Twisted pattern that would make this a lot simpler. Is there a better way to return a single Deferred that calls back when all fetching and parsing is finished?

enderskill
  • `Undefined name 'parse'`, `Undefined name 'get_urls'`, `Undefined name 'task_finished'`. It's a lot easier to make sure that answers to a question are correct if the sample code in the question actually runs :). – Glyph Dec 04 '13 at 12:19

1 Answer


As currently written, it looks to me like this code will have a limited number of parallel downloads, but an unlimited number of parallel parse jobs. Is that intentional? I'm going to assume "no", since if your network happens to be fast and your parser happens to be slow, as the number of URLs approaches infinity, so does your memory usage :).

So here's a version that has limited parallelism, but carries out each parse in sequence with its download instead:

from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

# As in the question, get_urls(), parse(), and task_finished() are
# assumed to be defined elsewhere.

def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            # Chaining parse as a callback means each Deferred yielded
            # here fires only after both the fetch and the parse finish.
            yield getPage(url).addCallback(parse)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                                for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))

task.react(main_task)

This works because, since parse (apparently) returns a Deferred, adding it as a callback to the one returned by getPage results in a Deferred that won't fire the callback added by coiterate until parse has done its business.
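
As an aside, here's a minimal standalone sketch of that chaining behavior, with names made up purely for illustration:

from twisted.internet import defer

def show(result):
    print 'got %s' % (result,)

d = defer.Deferred()
inner = defer.Deferred()
d.addCallback(lambda _: inner)  # returning a Deferred pauses the chain
d.addCallback(show)             # ...so this callback waits for `inner`
d.callback(None)                # nothing is printed yet
inner.callback('parsed')        # now show() runs, printing 'got parsed'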

Since you were asking about idiomatic Twisted code, I've also taken the liberty of modernizing it a bit (using task.react rather than running the reactor manually, inlining expressions to make things briefer and so on).

If you really do want to have more parallel parses than parallel fetches, then something like this might work better:

from twisted.internet import defer, task
from twisted.web.client import getPage

PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10

# Again, get_urls(), parse(), and task_finished() are assumed to be
# defined elsewhere.

def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)

    def parseWhenReady(r):
        def parallelParse(_):
            def releaseAndReturn(result):
                # release() returns None, so release the token first,
                # then pass the original result down parse's chain.
                parseSemaphore.release()
                return result
            parse(r).addBoth(releaseAndReturn)
        return parseSemaphore.acquire().addCallback(parallelParse)

    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                                for _ in xrange(PARALLEL_FETCHES)])
            # After all fetches finish, drain the semaphore: acquiring
            # every token guarantees every outstanding parse has released.
            .addCallback(lambda done:
                         defer.DeferredList(
                             [parseSemaphore.acquire()
                              for _ in xrange(PARALLEL_PARSES)]
                         ))
            .addCallback(task_finished))

task.react(main_task)

You can see that parseWhenReady returns the Deferred returned from acquire, so parallel fetching will continue as soon as the parallel parse can begin, and therefore you won't continue fetching indiscriminately even when the parser is overloaded. However, parallelParse carefully abstains from returning the Deferred returned by parse, since fetching should be able to continue while those parses are proceeding.
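
To make that distinction concrete, here's a minimal standalone sketch (the work argument is hypothetical) of a semaphore-guarded callback that deliberately discards the inner Deferred:

from twisted.internet import defer

sem = defer.DeferredSemaphore(2)

def guarded(work):
    def start(_):
        def releaseAndReturn(result):
            sem.release()
            return result
        work().addBoth(releaseAndReturn)
        # Deliberately not returning work()'s Deferred: the Deferred
        # handed back to the caller fires as soon as a token is
        # acquired. Returning it instead would make the caller wait
        # for work() to finish as well.
    return sem.acquire().addCallback(start)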

(Please note that since your initial example was not runnable, I haven't tested either of these at all. Hopefully the intent is clear even if there are bugs.)

Glyph
  • I'm sorry about the code not being able to run and my lack of clarity in the question. Because the question is purely theoretical, I'm fine with unlimited concurrency for `parse` for now. Since `main_task` is going to be run with `blockingCallFromThread`, it needs to return a `Deferred` that calls back when all pages are downloaded and all sources are parsed without using the hackish state variable. I edited the question to include this. – enderskill Dec 06 '13 at 05:37
  • 2
    If you specifically want to remove the limitation on concurrency of `parse`, just use the first example but with `yield getPage(url). addCallback(lambda x: parse(x) and None)` instead of `yield getPage(url).addCallback(parse)`, so that the `Deferred` from `parse` is explicitly discarded instead of chained. Does this answer then address your use-case? – Glyph Dec 08 '13 at 09:33
  • This answers the question. Also, thanks for your work on Twisted. It's the only reason I use Python for network programming. :] – enderskill Dec 08 '13 at 22:52
  • +1 for showing a simple example how `DeferredSemaphore` can be useful. Is it correct that if enough `parse()`s take too long then *all url fetches stop* i.e., everything hangs on `parseSemaphore.acquire()` waiting for at least one `parse()` to complete? – jfs Dec 12 '13 at 22:32