5

I am writing a web crawler that runs parallel fetches for many different domains. I want to limit the requests-per-second made to each individual domain, but I do not care about the total number of open connections or the total requests-per-second made across all domains. In other words, I want to maximize the number of open connections and overall requests-per-second, while limiting the requests-per-second made to any individual domain.

All of the existing examples I can find either (1) limit the number of open connections or (2) limit the total requests-per-second made in the fetch loop. Examples include:

Neither of them does what I am asking, which is to limit requests-per-second on a per-domain basis. The first question only answers how to limit requests-per-second overall. The second one doesn't even have answers to the actual question (the OP asks about requests-per-second, and the answers all talk about limiting the number of connections).

Here is the code I tried, using a simple rate limiter I made for a synchronous version; it doesn't work when the DomainTimer code is run in an async event loop:

from collections import defaultdict
from datetime import datetime, timedelta
import asyncio
import async_timeout
import aiohttp
from urllib.parse import urlparse
from queue import Queue, Empty

from HTMLProcessing import processHTML
import URLFilters

SEED_URLS = ['http://www.bbc.co.uk', 'http://www.news.google.com']
url_queue = Queue()
for u in SEED_URLS:
    url_queue.put(u)

# number of pages to download per run of crawlConcurrent()
BATCH_SIZE = 100
DELAY = timedelta(seconds = 1.0) # delay between requests from single domain, in seconds

HTTP_HEADERS = {'Referer': 'http://www.google.com', 
                'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:59.0) Gecko/20100101 Firefox/59.0'}


class DomainTimer():
    def __init__(self):
        self.timer = None

    def resetTimer(self):
        self.timer = datetime.now()

    def delayExceeded(self, delay):
        if not self.timer: #We haven't fetched this before
            return True
        if (datetime.now() - self.timer) >= delay:
            return True
        else:
            return False


crawl_history = defaultdict(dict) # given a URL, when is last time crawled?
domain_timers = defaultdict(DomainTimer)

async def fetch(session, url):
    domain = urlparse(url).netloc
    print('here fetching ' + url + "\n")
    dt = domain_timers[domain]

    if dt.delayExceeded(DELAY) or not dt:
        with async_timeout.timeout(10):
            try:
                dt.resetTimer() # reset domain timer
                async with session.get(url, headers=HTTP_HEADERS) as response:
                    if response.status == 200:
                        crawl_history[url] = datetime.now()
                        html = await response.text()
                        return {'url': url, 'html': html}
                    else:
                        # log HTTP response, put into crawl_history so
                        # we don't attempt to fetch again
                        print(url + " failed with response: " + str(response.status) + "\n")
                        return {'url': url, 'http_status': response.status}

            except aiohttp.ClientConnectionError as e:
                print("Connection failed " + str(e))

            except aiohttp.ClientPayloadError as e: 
                print("Recieved bad data from server @ " + url + "\n")

    else: # Delay hasn't passed yet: skip for now & put @ end of q
        url_queue.put(url)
        return None


async def fetch_all(urls):
    """Launch requests for all web pages."""
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(session, url))
            tasks.append(task) # create list of tasks
        return await asyncio.gather(*tasks) # gather task responses


def batch_crawl():
    """Launch requests for all web pages."""
    start_time = datetime.now()

    # Here we build the list of URLs to crawl for this batch
    urls = []
    for i in range(BATCH_SIZE):
        try:
            next_url = url_queue.get_nowait() # get next URL from queue
            urls.append(next_url)
        except Empty:
            print("Processed all items in URL queue.\n")
            break

    loop = asyncio.get_event_loop()
    asyncio.set_event_loop(loop)  
    pages = loop.run_until_complete(fetch_all(urls))
    crawl_time = (datetime.now() - start_time).seconds
    print("Crawl completed. Fetched " + str(len(pages)) + " pages in " + str(crawl_time) + " seconds.\n")  
    return pages


def parse_html(pages):
    """ Parse the HTML for each page downloaded in this batch"""
    start_time = datetime.now()
    results = {}

    for p in pages:
        if not p or not p.get('html'):
            print("Received empty page")
            continue
        else:
            url, html = p['url'], p['html']
            results[url] = processHTML(html)

    processing_time = (datetime.now() - start_time).seconds
    print("HTML processing finished. Processed " + str(len(results)) + " pages in " + str(processing_time) + " seconds.\n")  
    return results


def extract_new_links(results):
    """Extract links from """
    # later we could track where links were from here, anchor text, etc, 
    # and weight queue priority  based on that
    links = []
    for k in results.keys():
        new_urls = [l['href'] for l in results[k]['links']]
        for u in new_urls:
            if u not in crawl_history.keys():
                links.append(u)
    return links

def filterURLs(urls):
    urls = URLFilters.filterDuplicates(urls)
    urls = URLFilters.filterBlacklistedDomains(urls)
    return urls

def run_batch():
    pages = batch_crawl()
    results = parse_html(pages)
    links = extract_new_links(results)
    for l in filterURLs(links):
        url_queue.put(l)

    return results

There are no errors or exceptions thrown, and the rate-limiting code works fine for synchronous fetches, but the DomainTimer has no apparent effect when run in the async event loop. The delay of one request-per-second per domain is not upheld...

How would I modify this synchronous rate limiting code to work within the async event loop? Thanks!

J. Taylor
  • What do you mean by “doesn’t work”? Are you getting an exception? If so, what’s the traceback? – dirn Apr 07 '18 at 15:05
  • I just mean that it doesn't actually limit the rate. No errors. The code works fine synchronously. It just has no (apparent) effect when in the async code. I modified the question to clarify this. Thanks. – J. Taylor Apr 07 '18 at 15:22
  • 1
    The call to `dt.resetTimer()` looks incorrect. It will reset the timer each time a 200 response is obtained, thus defeating the rate limiting as long as you get 200 responses. Is that intended? – user4815162342 Apr 07 '18 at 15:58
  • Oh, thanks. That wasn't intended. I'll fix it and see if that's all that was wrong. But based on how quickly the requests were completing, I think there is something else going on too. – J. Taylor Apr 07 '18 at 16:13
  • @user4815162342 - I fixed the bug you pointed out, but the original issue still occurs: in a url_queue with 50+ urls from the same site, all of them are requested within ~3 seconds. So the delay is still being ignored. Any ideas why this won't work? – J. Taylor Apr 07 '18 at 17:54
  • What steps have you taken to debug the issue? I would try adding some `print`s or debugger breakpoints to see what's going wrong. Maybe the `netloc` somehow ends up being different for URLs from the same domain, etc. – user4815162342 Apr 08 '18 at 07:06
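
For reference, a quick standalone check of what `urlparse` reports as the domain for the queued URLs (an illustrative snippet along the lines of the debugging suggestion above, not part of the crawler itself):

from urllib.parse import urlparse

for u in ['http://www.bbc.co.uk', 'http://www.bbc.co.uk/news', 'http://news.google.com']:
    print(urlparse(u).netloc)  # 'www.bbc.co.uk', 'www.bbc.co.uk', 'news.google.com'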

2 Answers

6

It's hard to debug your code since it contains a lot of unrelated stuff; it's easier to show the idea with a new, simple example.

Main idea:

  • write your own Semaphore-like class using __aenter__ and __aexit__ that accepts a url (domain)
  • use a domain-specific Lock to prevent multiple simultaneous requests to the same domain
  • sleep before allowing the next request, based on the domain's last request time and the allowed RPS
  • track the time of the last request for each domain

Code:

import asyncio
import aiohttp
from urllib.parse import urlparse
from collections import defaultdict


class Limiter:
    # domain -> req/sec:
    _limits = {
        'httpbin.org': 4,
        'eu.httpbin.org': 1,
    }

    # domain -> its lock:
    _locks = defaultdict(lambda: asyncio.Lock())

    # domain -> its last request time:
    _times = defaultdict(lambda: 0)

    def __init__(self, url):
        self._host = urlparse(url).hostname

    async def __aenter__(self):
        await self._lock.acquire()

        to_wait = self._to_wait_before_request()
        print(f'Wait {to_wait} sec before next request to {self._host}')
        await asyncio.sleep(to_wait)

    async def __aexit__(self, *args):        
        print(f'Request to {self._host} just finished')

        self._update_request_time()
        self._lock.release()

    @property
    def _lock(self):
        """Lock that prevents multiple requests to same host."""
        return self._locks[self._host]

    def _to_wait_before_request(self):
        """What time we need to wait before request to host."""
        request_time = self._times[self._host]
        request_delay = 1 / self._limits[self._host]
        now = asyncio.get_event_loop().time()
        to_wait = request_time + request_delay - now
        to_wait = max(0, to_wait)
        return to_wait

    def _update_request_time(self):
        now = asyncio.get_event_loop().time()
        self._times[self._host] = now


# request that uses Limiter instead of Semaphore:
async def get(url):
    async with Limiter(url):
        async with aiohttp.ClientSession() as session:  # TODO reuse session for different requests.
            async with session.get(url) as resp:
                return await resp.text()


# main:
async def main():
    coros = [
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
        get('http://eu.httpbin.org/get'),
    ]

    await asyncio.gather(*coros)


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
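
Regarding the `# TODO reuse session` note in `get()` above, here is a minimal sketch of sharing a single `ClientSession` across all requests (assuming the same `Limiter` class and imports as in the code above):

async def get(session, url):
    # one shared ClientSession for every request; the class-level per-domain
    # locks and timestamps inside Limiter still throttle each host separately
    async with Limiter(url):
        async with session.get(url) as resp:
            return await resp.text()


async def main():
    urls = ['http://httpbin.org/get'] * 5 + ['http://eu.httpbin.org/get'] * 5
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(get(session, url) for url in urls))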
Mikhail Gerasimov
  • Thanks - this is very helpful, and is actually enforcing the delays. But now the problem is that it's almost as slow as synchronous fetch because it sleeps whenever it encounters the same domain multiple times in the queue before the delay has passed. I need it to skip these domains and push them to the end of the queue instead, as I tried to do in my code. But I can't figure out how to do that with this, because it is in the "with" statement, whereas before I had a conditional to skip fetches until delay had passed (if delayExceeded ... else, url_queue.put(u)). How would I do this? – J. Taylor Apr 08 '18 at 20:01
  • So if I am fetching 100 urls per batch, and my queue has 2000 urls and the first 100 are from 'http://www.bbc.co.uk', I want it to fetch the first bbc.co.uk url, and then push the others it encounters to the end of the queue until the delay has passed, fetching pages from other domains in the meantime. That is, I never want to be waiting to download from a given domain. I just want it to push to end of the queue and fetch something else. I don't care about fetching in the order they were put in the queue. – J. Taylor Apr 08 '18 at 20:04
  • @J.Taylor why do you want to run the process in separate batches? What stops you from fetching all urls at once and limiting the simultaneous amount using a `Semaphore`? It'll be more efficient than waiting for one batch to finish before starting another. – Mikhail Gerasimov Apr 08 '18 at 20:47
  • Because in between batches I am running url/domain filters on the queue to remove urls (spam, blacklisted sites, duplicates, etc), and then running it through another set of functions to prioritize and resort the queue as it grows beyond the batch size. I am converting the queue to a list between batches, filtering/resorting it, and then creating a new queue from the list. – J. Taylor Apr 08 '18 at 21:14
  • The batch size is set very low right now for testing, by the way - I would normally be downloading something more on the order of 10,000 - 50,000 at a time, and then processing all of it at once in a process pool. Anyways, I think your code is pretty close to what I need, and I'm going to try to modify it to push to queue and grab next url instead of sleeping, and see if that works. – J. Taylor Apr 08 '18 at 21:57
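
A minimal sketch of the skip-and-requeue behaviour discussed in the comments above (hypothetical names, not part of the answer: it keeps its own per-domain timestamps and pushes not-yet-ready URLs back to the end of an asyncio.Queue instead of sleeping inside the limiter):

import asyncio
from collections import defaultdict
from urllib.parse import urlparse

DOMAIN_RPS = defaultdict(lambda: 1.0)     # domain -> allowed requests per second
_last_request = defaultdict(lambda: 0.0)  # domain -> loop time of last request


def domain_ready(url):
    """True once the per-domain delay for this URL's domain has elapsed."""
    host = urlparse(url).hostname
    now = asyncio.get_event_loop().time()
    return now - _last_request[host] >= 1.0 / DOMAIN_RPS[host]


async def crawl(url_queue, fetch):
    """Drain an asyncio.Queue: fetch URLs whose domain is ready, re-queue the rest."""
    tasks = []
    while not url_queue.empty():
        url = url_queue.get_nowait()
        if domain_ready(url):
            _last_request[urlparse(url).hostname] = asyncio.get_event_loop().time()
            tasks.append(asyncio.ensure_future(fetch(url)))  # fire off, don't wait here
        else:
            url_queue.put_nowait(url)  # delay not elapsed: push to end of queue
            await asyncio.sleep(0)     # yield; a real crawler might sleep a bit longer
                                       # here to avoid spinning when no domain is ready
    return await asyncio.gather(*tasks)

The batch then only blocks at the final gather, rather than sleeping once per request to a busy domain.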
0

I developed a library named octopus-api (https://pypi.org/project/octopus-api/) that enables you to rate-limit and set the number of connections to the endpoint, using aiohttp under the hood. Its goal is to simplify all the aiohttp setup needed.

Here is an example of how to use it, where get_ethereum is the user-defined request function. It could just as well have been a web-crawler request function or whatever fits:

from octopus_api import TentacleSession, OctopusApi
from typing import Dict, List

if __name__ == '__main__':
    async def get_ethereum(session: TentacleSession, request: Dict):
        async with session.get(url=request["url"], params=request["params"]) as response:
            body = await response.json()
            return body

    client = OctopusApi(rate=50, resolution="sec", connections=6)
    result: List = client.execute(requests_list=[{
        "url": "https://api.pro.coinbase.com/products/ETH-EUR/candles?granularity=900&start=2021-12-04T00:00:00Z&end=2021-12-04T00:00:00Z",
        "params": {}}] * 1000, func=get_ethereum)
    print(result)

TentacleSession works the same way as aiohttp.ClientSession for writing POST, GET, PUT, and PATCH requests.

Let me know if it helps with your issue related to rate limits and connections for crawling.

Filip