5

I have to make numerous (thousands of) HTTP GET requests to a great number of websites. This is fairly slow, because some websites may not respond (or take a long time to do so), while others time out. Since I need as many responses as I can get, setting a small timeout (3-5 seconds) is not in my favour.

I have yet to do any kind of multiprocessing or multi-threading in Python, and I've been reading the documentation for a good while. Here's what I have so far:

import requests
from bs4 import BeautifulSoup
from multiprocessing import Process, Pool

errors = 0

def get_site_content(site):
    try:
        # start = time.time()
        response = requests.get(site, allow_redirects=True, timeout=5)
        response.raise_for_status()
        content = response.text
    except Exception as e:
        global errors
        errors += 1
        return ''
    soup = BeautifulSoup(content, "html.parser")
    for script in soup(["script", "style"]):
        script.extract()
    text = soup.get_text()

    return text

sites = ["http://www.example.net", ...]

pool = Pool(processes=5)
results = pool.map(get_site_content, sites)
print(results)

Now, I want the results that are returned to be joined somehow. I can see two variations:

  1. Each process has a local list/queue containing the content it has accumulated, and these local queues are then joined together into a single result containing the content for all sites.

  2. Each process writes to a single global queue as it goes along. This would entail some locking mechanism for concurrency checks.

Would multiprocessing or multithreading be the better choice here? How would I accomplish the above with either of the approaches in Python?


Edit:

I did attempt something like the following:

# global
queue = []
with Pool(processes=5) as pool:
    queue.append(pool.map(get_site_content, sites))

print(queue)

However, this gives me the following error:

with Pool(processes = 4) as pool:
AttributeError: __exit__

I don't quite understand this. I'm having a little trouble understanding what exactly pool.map does, beyond applying the function to every object in the iterable second parameter. Does it return anything? If not, do I append to a global queue from within the function?

filpa
  • Did you read the [introduction on `multiprocessing`](https://docs.python.org/3/library/multiprocessing.html#introduction)? It describes how this can be done. –  Dec 18 '14 at 13:03
  • @LutzHorn Yes, specifically [this section](https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers). I have updated my question with some more specific information pertaining to it, as I found the documentation slightly confusing, unfortunately. – filpa Dec 18 '14 at 13:16
  • The exception occurs because you are using a Python version that does not support context managers for Pool. So either don't use the `with` statement with pools, or switch to the newest version. Other than that, `Pool.map` returns a list with all the results, so with your code you create a list `queue` containing the actual list of results. Otherwise your code seems fine. – phobic Dec 18 '14 at 14:06
  • Ah, okay. In that case though (without using `with`) how do I know when all processes have finished and the objects are safe to access (i.e. no writes are being done)? Basically, is there anything similar to threads' `join()` in this case? Or would it be better to simply stick to the pre-implemented `Queue` class like someone else suggested? – filpa Dec 18 '14 at 14:14
  • On another note, check out `concurrent.futures.ThreadPoolExecutor` (see the sketch after these comments). It basically does the same thing with threads, which means you won't create a new process for each function call. – phobic Dec 18 '14 at 14:14
  • I missed that; just do a `pool.close()` followed by a `pool.join()` before accessing the results. – phobic Dec 18 '14 at 14:18
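
For reference, here is a minimal sketch of the `ThreadPoolExecutor` approach suggested in the comment above. It assumes the `get_site_content` function and the `sites` list from the question, and requires Python 3.2+ (or the `futures` backport on Python 2):

from concurrent.futures import ThreadPoolExecutor

# Threads suit this workload: the workers spend most of their time waiting
# on network I/O, so the GIL is not a bottleneck here.
with ThreadPoolExecutor(max_workers=5) as executor:
    # executor.map returns the results in the same order as `sites`,
    # just like Pool.map does for processes.
    results = list(executor.map(get_site_content, sites))

print(results)

The `with` block only exits once every submitted task has finished, so `results` is safe to read afterwards.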

2 Answers

6

pool.map starts 'n' worker processes that each take the function and run it on an item from the iterable. When such a process finishes and returns, the returned value is stored in the result list at the same position as the corresponding item in the input iterable.

For example, if a function is written to calculate the square of a number, pool.map can be used to run that function over a list of numbers:

from multiprocessing import Pool

def square_this(x):
    square = x**2
    return square

input_iterable = [2, 3, 4]
pool = Pool(processes=2)  # initialize a pool of 2 worker processes
result = pool.map(square_this, input_iterable)  # run the function on each item in the iterable
pool.close()  # no more tasks will be added to the pool
pool.join()  # block until the function has been run on all the items
print(result)

...>>[4, 9, 16]

The Pool.map technique may not be ideal in your case, since it blocks until all the processes finish; i.e. if a website does not respond or takes too long to respond, your program will be stuck waiting for it. Instead, try sub-classing multiprocessing.Process in your own class which polls these websites, and use Queues to access the results. When you have a satisfactory number of responses, you can stop all the processes without having to wait for the remaining requests to finish.
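
A rough sketch of that suggestion follows. The `SiteFetcher` class name, the sentinel handling and the `ENOUGH` threshold are illustrative choices of mine, not something prescribed by this answer:

import multiprocessing as mp
import requests

class SiteFetcher(mp.Process):
    """Worker that pulls URLs from a task queue and pushes page text onto a result queue."""
    def __init__(self, task_queue, result_queue):
        super(SiteFetcher, self).__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            site = self.task_queue.get()
            if site is None:          # sentinel value: no more work
                break
            try:
                response = requests.get(site, timeout=5)
                self.result_queue.put((site, response.text))
            except requests.RequestException:
                self.result_queue.put((site, ''))

if __name__ == '__main__':
    sites = ["http://www.example.net"]   # placeholder URL list
    ENOUGH = 1                           # hypothetical "satisfactory number of responses"

    tasks, results = mp.Queue(), mp.Queue()
    workers = [SiteFetcher(tasks, results) for _ in range(5)]
    for w in workers:
        w.start()
    for site in sites:
        tasks.put(site)
    for _ in workers:
        tasks.put(None)                  # one sentinel per worker

    collected = []
    while len(collected) < ENOUGH:
        collected.append(results.get())  # blocks until a worker delivers a result
    for w in workers:
        w.terminate()                    # stop without waiting for the remaining requests

Note that terminate() is abrupt; in a longer-lived program you would prefer to let the workers drain the task queue and exit via the sentinel.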

gnub
5

I had a similar assignment at university (implementing a multiprocess web crawler) and used the multiprocessing-safe Queue class from Python's multiprocessing library, which does all the magic with locks and concurrency checks for you. The example from the docs:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

However, I had to write a separate process class to make this work the way I wanted, and I didn't use a Pool of processes. Instead, I checked memory usage and kept spawning new processes until a preset memory threshold was reached.
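
As a rough illustration of that spawning loop (the psutil package and the threshold value are my own assumptions, not part of the original answer):

import multiprocessing as mp
import psutil  # third-party package, used here only to read system memory usage

MEMORY_THRESHOLD = 80.0  # illustrative: stop spawning above this percentage of used RAM

def crawl(url, result_queue):
    # placeholder worker body; a real crawler would fetch and parse the page here
    result_queue.put(url)

if __name__ == '__main__':
    result_queue = mp.Queue()
    pending = ["http://www.example.net"]  # placeholder URL list
    workers = []
    for url in pending:
        if psutil.virtual_memory().percent >= MEMORY_THRESHOLD:
            break  # preset memory threshold reached; stop spawning new processes
        p = mp.Process(target=crawl, args=(url, result_queue))
        p.start()
        workers.append(p)

    results = [result_queue.get() for _ in workers]  # one result per spawned worker
    for p in workers:
        p.join()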

ballade4op52