
I want to download and process a lot of files from a website. The site's terms of service restrict the number of files you're permitted to download per second.

The time it takes to process the files is actually the bottleneck, so I'd like to be able to process multiple files in parallel. But I don't want the different processes to combine to violate the download limit, so I need something that limits the overall request rate. I was thinking of something like the following, but I'm not exactly an expert with the multiprocessing module.

import multiprocessing
from multiprocessing.managers import BaseManager
import time

class DownloadLimiter(object):

    def __init__(self, time):
        self.time = time
        self.lock = multiprocessing.Lock()

    def get(self, url):
        # Only one process at a time can hold the lock, so download starts
        # are spaced at least self.time seconds apart across all workers.
        self.lock.acquire()
        time.sleep(self.time)
        self.lock.release()
        return url


class DownloadManager(BaseManager):
    pass

DownloadManager.register('downloader', DownloadLimiter)


class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")

Then, somewhere else, I run the downloads with

manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()

urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()

This seems to do the job on a small scale, but I'm a little wary about whether the locking is really being done correctly.

Also, if there's a better pattern for achieving the same goal, I'd love to hear it.

Batman
  • What do you use to download files? If you use `requests` here is a ready-to-use library solving this problem: https://github.com/SerpentAI/requests-respectful – Andrey Semakin Mar 24 '19 at 17:26
  • By "process the files" you really mean more than just writing them to disk like in your example? – Darkonaut Mar 25 '19 at 02:17
  • @Darkonaut Exactly. – Batman Mar 25 '19 at 02:44
  • I see people interpret your question differently. It's unclear what your rate limit is about, exactly: limiting "downloads per second", which I interpreted as download starts within a second, or just the number of _concurrent_ downloads at any time, in which case "per second" would be misleading. – Darkonaut Mar 26 '19 at 15:46
  • By "downloads per second" I mean that globally there are no more than downloads started per second. – Batman Mar 28 '19 at 14:44
  • But that's what my solution provided. I deleted it after I came to think I must have misunderstood the question, because I got no feedback from you for 20 hours and @Robert Nishihara's interpretation of the rate limit sounded more reasonable for downloads. Please care more about your questions; it can take hours to work out an answer in the multiprocessing tag, and people deserve to know, at least, whether the answer they provided is on target. – Darkonaut Mar 28 '19 at 16:27

5 Answers


This can be done cleanly with Ray, which is a library for parallel and distributed Python.

Resources in Ray

When you start Ray, you can tell it what resources are available on that machine. Ray will automatically attempt to determine the number of CPU cores and the number of GPUs, but these can be specified explicitly, and in fact arbitrary user-defined resources can be passed in as well, e.g., by calling

ray.init(num_cpus=4, resources={'Network': 2})

This tells Ray that the machine has 4 CPU cores and 2 of a user-defined resource called Network.

Each Ray "task", which is a schedulable unit of work, has certain resource requirements. By default, a task will require 1 CPU core and nothing else. However, arbitrary resource requirements can be specified by declaring the corresponding function with

@ray.remote(resources={'Network': 1})
def f():
    pass

This tells Ray that in order for f to execute on a "worker" process, there must be 1 CPU core (the default value) and 1 Network resource available.

Since the machine has 2 of the Network resource and 4 CPU cores, at most 2 copies of f can be executed concurrently. On the other hand, if there is another function g declared with

@ray.remote
def g():
    pass

then four copies of g can be executed concurrently or two copies of f and two copies of g can be executed concurrently.

Example

Here's an example, with placeholders for the actual functions used to download the content and process the content.

import ray
import time

max_concurrent_downloads = 2

ray.init(num_cpus=4, resources={'Network': max_concurrent_downloads})

@ray.remote(resources={'Network': 1})
def download_content(url):
    # Download the file.
    time.sleep(1)
    return 'result from ' + url

@ray.remote
def process_result(result):
    # Process the result.
    time.sleep(1)
    return 'processed ' + result

urls = ['url1', 'url2', 'url3', 'url4']

result_ids = [download_content.remote(url) for url in urls]

processed_ids = [process_result.remote(result_id) for result_id in result_ids]

# Wait until the tasks have finished and retrieve the results.
processed_results = ray.get(processed_ids)

Here's a timeline depiction (which you can produce by running `ray timeline` from the command line and opening the resulting JSON file in chrome://tracing in the Chrome web browser).

In the above script, we submit 4 download_content tasks. Those are the ones that we rate limit by specifying that they require the Network resource (in addition to the default 1 CPU resource). Then we submit 4 process_result tasks, which each require the default 1 CPU resource. The tasks are executed in three stages (just look at the blue boxes).

  1. We start by executing 2 download_content tasks, which is as many as can execute at one time (because of the rate limiting). We can't execute any of the process_result tasks yet because they depend on the output of the download_content tasks.
  2. Those finish, so we start executing the remaining two download_content tasks as well as two process_result tasks, because we are not rate limiting the process_result tasks.
  3. We execute the remaining process_result tasks.

Each "row" is one worker process. Time goes from left to right.

[timeline screenshot]

You can check out more about how to do this at the Ray documentation.
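
To sanity-check the resource bookkeeping, Ray can report what the scheduler currently considers free via ray.available_resources(); the exact keys and values in the returned dict are an assumption about its output format:

import ray

ray.init(num_cpus=4, resources={'Network': 2})

# Right after init this should report roughly 4 CPUs and 2 'Network' units;
# while download tasks are running, the 'Network' count drops accordingly.
print(ray.available_resources())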

Robert Nishihara

OK, after the following clarification from the OP

By "downloads per second" I mean that globally there are no more than downloads started per second.

I decided to post another answer, as I think my first one could also be of interest for somebody looking to limit the number of concurrently running processes.

I think there is no need to use additional frameworks to solve this problem. The idea is to use downloading threads, one spawned for each resource link, a resource queue, and a fixed number of processing workers, which are processes, not threads:

#!/usr/bin/env python3
import os
import time
import random
from threading import Thread
from multiprocessing import Process, JoinableQueue


WORKERS = 4
DOWNLOADS_PER_SECOND = 2


def download_resource(url, resource_queue):
    pid = os.getpid()

    t = time.strftime('%H:%M:%S')
    print('Thread {p} is downloading from {u} ({t})'.format(p=pid, u=url, t=t),
          flush=True)
    time.sleep(random.randint(1, 10))

    results = '[resource {}]'.format(url)
    resource_queue.put(results)


def process_resource(resource_queue):
    pid = os.getpid()

    while True:
        res = resource_queue.get()

        print('Process {p} is processing {r}'.format(p=pid, r=res),
              flush=True)
        time.sleep(random.randint(1, 10))

        resource_queue.task_done()


def main():
    resource_queue = JoinableQueue()

    # Start process workers:
    for _ in range(WORKERS):
        worker = Process(target=process_resource,
                         args=(resource_queue,),
                         daemon=True)
        worker.start()

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    while urls:
        target_urls = urls[:DOWNLOADS_PER_SECOND]
        urls = urls[DOWNLOADS_PER_SECOND:]

        # Start downloader threads:
        for url in target_urls:
            downloader = Thread(target=download_resource,
                                args=(url, resource_queue),
                                daemon=True)
            downloader.start()

        time.sleep(1)

    resource_queue.join()


if __name__ == '__main__':
    main()

The results look something like this:

$ ./limit_download_rate.py
Thread 32482 is downloading from https://link/to/resource/0 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/1 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/2 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/3 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/4 (10:14:10)
Thread 32482 is downloading from https://link/to/resource/5 (10:14:10)
Process 32483 is processing [resource https://link/to/resource/2]
Process 32484 is processing [resource https://link/to/resource/0]
Thread 32482 is downloading from https://link/to/resource/6 (10:14:11)
Thread 32482 is downloading from https://link/to/resource/7 (10:14:11)
Process 32485 is processing [resource https://link/to/resource/1]
Process 32486 is processing [resource https://link/to/resource/3]
Thread 32482 is downloading from https://link/to/resource/8 (10:14:12)
Thread 32482 is downloading from https://link/to/resource/9 (10:14:12)
Process 32484 is processing [resource https://link/to/resource/6]
Process 32485 is processing [resource https://link/to/resource/9]
Process 32483 is processing [resource https://link/to/resource/8]
Process 32486 is processing [resource https://link/to/resource/4]
Process 32485 is processing [resource https://link/to/resource/7]
Process 32483 is processing [resource https://link/to/resource/5]

Here, every second, DOWNLOADS_PER_SECOND threads are started (two in this example), which then download resources and put them into the queue. WORKERS is the number of processes that take resources from the queue for further processing. With this setup, you'll be able to limit the number of downloads started per second and have the workers process the obtained resources in parallel.
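
If processing is consistently slower than downloading, the resource queue can keep growing in memory. A minimal adjustment (my own assumption, not part of the answer above) is to bound the queue, so that finished downloads wait for the workers instead of piling up:

# Bound the queue (the value 20 is arbitrary); put() then blocks a
# downloader thread whenever the processing workers fall behind.
resource_queue = JoinableQueue(maxsize=20)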

constt

There is a library exactly for your needs, called `ratelimit`.

Example from their homepage:

This function will not be able to make more than 15 API calls within a 15-minute time period.

from ratelimit import limits

import requests

FIFTEEN_MINUTES = 900

@limits(calls=15, period=FIFTEEN_MINUTES)
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response
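
If you'd rather wait out the limit than have the call fail, the library also provides a sleep_and_retry decorator (going by its documentation; treat the exact behaviour as an assumption), which blocks the decorated function until a call is allowed again:

from ratelimit import limits, sleep_and_retry

import requests

FIFTEEN_MINUTES = 900

# sleep_and_retry makes the decorated function sleep until the current
# period ends instead of raising an exception when the limit is exceeded.
@sleep_and_retry
@limits(calls=15, period=FIFTEEN_MINUTES)
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response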

By the way, for I/O-intensive tasks (such as web crawling) you can use multithreading instead of multiprocessing. With multiprocessing you have to create another process for control and orchestrate everything you do, while with a multithreaded approach all the threads inherently have access to the main process's memory, so signaling becomes much easier (e.g. `e` is shared between the threads):

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


e = threading.Event()
t1 = threading.Thread(name='block', 
                      target=wait_for_event,
                      args=(e,))
t1.start()

t2 = threading.Thread(name='non-block', 
                      target=wait_for_event_timeout, 
                      args=(e, 2))
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')
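
Applied to the original problem, the same "everything shares the main process's memory" property makes a global limiter trivial to share between downloader threads. A rough sketch, where the 0.5-second spacing, the RateLimiter class and the fetch function are my own assumptions rather than part of the answer above:

import threading
import time

class RateLimiter:
    """Allow at most one acquire() per `interval` seconds across all threads."""

    def __init__(self, interval):
        self.interval = interval
        self.lock = threading.Lock()
        self.next_start = 0.0

    def acquire(self):
        with self.lock:
            now = time.monotonic()
            wait = self.next_start - now
            if wait > 0:
                time.sleep(wait)
            self.next_start = max(now, self.next_start) + self.interval

limiter = RateLimiter(0.5)  # ~2 download starts per second (assumed limit)

def fetch(url):
    limiter.acquire()  # block until this thread is allowed to start a download
    print('starting download of', url)
    # ... the actual download, e.g. requests.get(url), would go here ...

threads = [threading.Thread(target=fetch, args=('url%d' % i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
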
igrinis

The simplest approach is to download on the main thread and feed documents to the worker pool.

In my own implementations of this I've gone the route of using celery for processing documents and gevent for downloads, which does the same thing, just with more complexity.

Here's a simple example.

import multiprocessing
from multiprocessing import Pool
import time
import typing

def work(doc: str) -> str:
    # do some processing here....
    return doc + " processed"

def download(url: str) -> str:
    return url  # a hack for demo, use e.g. `requests.get()`

def run_pipeline(
    urls: typing.List[str],
    session_request_limit: int = 10,
    session_length: int = 60,
) -> None:
    """
    Download and process each url in `urls` at a max. rate limit
    given by `session_request_limit / session_length`
    """
    workers = Pool(multiprocessing.cpu_count())
    results = []

    n_requests = 0
    session_start = time.time()

    for url in urls:
        doc = download(url)
        results.append(
            workers.apply_async(work, (doc,))
        )
        n_requests += 1

        if n_requests >= session_request_limit:
            # Sleep for whatever is left of the current session window.
            time_to_next_session = session_length - (time.time() - session_start)
            if time_to_next_session > 0:
                time.sleep(time_to_next_session)

        if time.time() - session_start >= session_length:
            session_start = time.time()
            n_requests = 0

    # Collect results
    for result in results:
        print(result.get())

if __name__ == "__main__":
    urls = ["www.google.com", "www.stackoverflow.com"]
    run_pipeline(urls)
stacksonstacks

It's not really clear what you mean by "rate limit downloads". In case it's the number of concurrent downloads, which is a frequent use case, I think the simple solution is to use a semaphore with a process pool:

#!/usr/bin/env python3
import os
import time
import random
from functools import partial
from multiprocessing import Pool, Manager


CPU_NUM = 4
CONCURRENT_DOWNLOADS = 2


def download(url, semaphore):
    pid = os.getpid()

    with semaphore:
        print('Process {p} is downloading from {u}'.format(p=pid, u=url))
        time.sleep(random.randint(1, 5))

    # Process the obtained resource:
    time.sleep(random.randint(1, 5))

    return 'Successfully processed {}'.format(url)


def main():
    manager = Manager()

    semaphore = manager.Semaphore(CONCURRENT_DOWNLOADS)
    target = partial(download, semaphore=semaphore)

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    with Pool(processes=CPU_NUM) as pool:
        results = pool.map(target, urls)

    print(results)


if __name__ == '__main__':
    main()

As you can see, there are only CONCURRENT_DOWNLOADS processes downloading at a time, while the others are busy processing the obtained resources.
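
If the limit is instead about download starts per second, as the OP later clarified, the same Pool-plus-Manager pattern can space out the starts with a shared lock; a sketch, where the 0.5-second spacing (two starts per second) is an assumption:

def download(url, start_lock):
    pid = os.getpid()

    with start_lock:
        # Holding the lock for 0.5 s means at most ~2 downloads start per second.
        time.sleep(0.5)
    print('Process {p} is downloading from {u}'.format(p=pid, u=url))
    time.sleep(random.randint(1, 5))  # the download itself

    # Process the obtained resource:
    time.sleep(random.randint(1, 5))

    return 'Successfully processed {}'.format(url)

# In main(), pass a shared lock instead of the semaphore:
#     start_lock = manager.Lock()
#     target = partial(download, start_lock=start_lock)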

constt