
I'm trying to write a web crawler and want to make HTTP requests as quickly as possible. Tornado's AsyncHTTPClient seems like a good choice, but all the example code I've seen (e.g. https://stackoverflow.com/a/25549675/1650177) basically calls AsyncHTTPClient.fetch on a huge list of URLs and lets tornado queue them up and eventually make the requests.
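The pattern I mean looks roughly like this (my paraphrase, not the linked answer's exact code; `handle_response` is just a placeholder):

from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

http_client = AsyncHTTPClient()
pending = 0

def handle_response(response):
    global pending
    # ... do something with the response ...
    pending -= 1
    if pending == 0:
        IOLoop.current().stop()

with open("urls.txt") as f:
    for line in f:
        pending += 1
        # Every URL is handed to fetch() up front; anything beyond
        # max_clients just sits in AsyncHTTPClient's internal queue.
        http_client.fetch(line.strip(), callback=handle_response)

IOLoop.current().start()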

But what if I want to process an indefinitely long (or just a really big) list of URLs from a file or the network? I don't want to load all the URLs into memory.

I've googled around but can't seem to find a way to feed AsyncHTTPClient.fetch from an iterator. I did, however, find a way to do what I want using gevent: http://gevent.org/gevent.threadpool.html#gevent.threadpool.ThreadPool.imap. Is there a way to do something similar in tornado?
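For comparison, here's roughly what I mean in gevent (`fetch` is just a placeholder that uses the requests library for a blocking GET):

from gevent.threadpool import ThreadPool
import requests  # any blocking HTTP library would do here

def fetch(url):
    return requests.get(url)

def urls_from_file(path):
    # Generator: yields one URL at a time, never loads the whole file.
    with open(path) as f:
        for line in f:
            yield line.strip()

pool = ThreadPool(10)
# imap consumes the generator lazily, so only a handful of URLs are
# in flight (or even in memory) at any given moment.
for response in pool.imap(fetch, urls_from_file("urls.txt")):
    print(response.status_code, response.url)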

One solution I've thought of is to only queue up so many URLs initially and then add logic to queue up more as fetch operations complete, but I'm hoping there's a cleaner solution.
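Roughly what I have in mind (untested sketch; error handling and stopping the IOLoop when the iterator is exhausted are omitted):

from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

MAX_IN_FLIGHT = 10
http_client = AsyncHTTPClient()
urls = (line.strip() for line in open("urls.txt"))  # any iterator of URLs

def fetch_next():
    try:
        url = next(urls)
    except StopIteration:
        return
    http_client.fetch(url, callback=handle_response)

def handle_response(response):
    # With the callback interface, errors show up as response.error
    # rather than being raised.
    print(response.request.url, response.error or response.code)
    fetch_next()  # refill: start another request as this one finishes

if __name__ == '__main__':
    # Prime the pump with MAX_IN_FLIGHT requests, then keep refilling.
    for _ in range(MAX_IN_FLIGHT):
        fetch_next()
    IOLoop.current().start()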

Any help or recommendations would be appreciated!

user1650177
  • The example question you linked to doesn't load the entire list of urls into memory - it just reads one line at a time from the file. Are you just worried about having too many http connections open at once? If so, I'm not sure how making the calls inside of a generator function would help you. Can you clarify exactly what you're looking for? – dano Aug 05 '15 at 15:30
  • The example question I linked to calls `fetch` on every URL in `urls.txt` one at a time, yes, but that's internally queueing an HTTP request for every URL in the list. I'm not worried about having too many HTTP connections open but rather having too many HTTP requests queued up. – user1650177 Aug 05 '15 at 15:35
  • Found what I'd like to do with tornado in gevent, if it helps clarify things: http://gevent.org/gevent.threadpool.html#gevent.threadpool.ThreadPool.imap. I'd still like to know how you do something similar in tornado, though! – user1650177 Aug 05 '15 at 15:43
  • `AsyncHttpClient` is asynchronous - the requests aren't being queued, they're all being executed in parallel using non-blocking I/O. When a request completes, the `handle_request` callback gets executed. The only thing that would happen synchronously is the execution of the callbacks once the response is retrieved. – dano Aug 05 '15 at 15:53
  • Maybe I'm missing something here but I'm pretty sure they are. Straight from the docs (http://tornado.readthedocs.org/en/latest/httpclient.html#tornado.simple_httpclient.SimpleAsyncHTTPClient.initialize): `max_clients is the number of concurrent requests that can be in progress; when this limit is reached additional requests will be queued.` `max_clients` is set to 10 by default and I wouldn't like to set it to `len(list(iter))`. – user1650177 Aug 05 '15 at 16:03
  • Ah - I hadn't noticed that in the docs before. Interestingly, the built-in `multiprocessing.Pool.imap` doesn't do what you want here, since it immediately consumes your entire iterable and queues it internally, but the `gevent` version does behave the way you want. You would probably need to manually write something similar, maybe using [`tornado.locks.Semaphore`](http://tornado.readthedocs.org/en/latest/locks.html#semaphore). I might try it myself and add an answer if I get some free time later. – dano Aug 05 '15 at 16:39
  • Aw dang. Guess I'll try looking into that. Do share if you come up with something! – user1650177 Aug 05 '15 at 18:31
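Update: a rough sketch of the `tornado.locks.Semaphore` idea from the comments, in case it helps anyone (needs tornado 4.2+ for `tornado.locks`; untested):

from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
from tornado.locks import Semaphore

CONCURRENCY = 10
sem = Semaphore(CONCURRENCY)
http_client = AsyncHTTPClient()

@gen.coroutine
def fetch_one(url):
    try:
        response = yield http_client.fetch(url)
        print('fetched', url, response.code)
    except Exception as e:
        print('failed', url, e)
    finally:
        sem.release()

@gen.coroutine
def main():
    with open("urls.txt") as f:
        for line in f:
            # Pause here once CONCURRENCY fetches are in flight, so we
            # never read ahead of the requests.
            yield sem.acquire()
            IOLoop.current().spawn_callback(fetch_one, line.strip())
    # Drain: reacquire every slot to wait for the remaining fetches.
    for _ in range(CONCURRENCY):
        yield sem.acquire()

if __name__ == '__main__':
    IOLoop.current().run_sync(main)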

1 Answer


I would do this with a Queue and multiple workers, in a variation on https://github.com/tornadoweb/tornado/blob/master/demos/webspider/webspider.py

import tornado.queues
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

NUM_WORKERS = 10
QUEUE_SIZE = 100
q = tornado.queues.Queue(QUEUE_SIZE)
AsyncHTTPClient.configure(None, max_clients=NUM_WORKERS)  # one in-flight request per worker
http_client = AsyncHTTPClient()

# Each worker pulls URLs off the queue one at a time and fetches them.
@gen.coroutine
def worker():
    while True:
        url = yield q.get()
        try:
            response = yield http_client.fetch(url)
            print('got response from', url)
        except Exception:
            print('failed to fetch', url)
        finally:
            q.task_done()

@gen.coroutine
def main():
    # Start the workers; they run concurrently on the IOLoop.
    for i in range(NUM_WORKERS):
        IOLoop.current().spawn_callback(worker)
    with open("urls.txt") as f:
        for line in f:
            url = line.strip()
            # When the queue fills up, stop here to wait instead
            # of reading more from the file.
            yield q.put(url)
    yield q.join()  # wait until every enqueued URL has been processed

if __name__ == '__main__':
    IOLoop.current().run_sync(main)
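
Because the queue is bounded at QUEUE_SIZE, the `yield q.put(url)` in `main()` pauses whenever the queue fills up, so the file is only read as fast as the workers drain it and you never hold more than roughly QUEUE_SIZE URLs in memory. `q.join()` then waits until every queued URL has been marked done before `run_sync` returns.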
Ben Darnell