31

I was sure there was something like this in the standard library, but it seems I was wrong.

I have a bunch of urls that I want to urlopen in parallel. I want something like the builtin map function, except the work is done in parallel by a bunch of threads.

Is there a good module that does this?

Martijn Pieters
  • 1,048,767
  • 296
  • 4,058
  • 3,343
Ram Rachum
  • 84,019
  • 84
  • 236
  • 374
  • 1
    Do you mean you want map to start the threads (like pillmuncher's answer, map(urlopen, urls)), or will you manually start the urlopening threads, and want something like map to act on the results of each thread's execution, as they become available? – rbp Jul 25 '10 at 18:36
  • streams.fastmap() from Pyxtension: github.com/asuiu/pyxtension does exactly this - multithreaded map. – asu May 24 '20 at 13:15

5 Answers5

58

There is a map method in multiprocessing.Pool. That does multiple processes.

And if multiple processes aren't your dish, you can use multiprocessing.dummy which uses threads.

import urllib
import multiprocessing.dummy

p = multiprocessing.dummy.Pool(5)
def f(post):
    return urllib.urlopen('http://stackoverflow.com/questions/%u' % post)

print p.map(f, range(3329361, 3329361 + 5))
Scott Robinson
  • 983
  • 2
  • 6
  • 10
  • This is great, but it won't work on python2.6 if run from a thread because of this bug: http://bugs.python.org/issue14881 – gregsabo Nov 13 '12 at 16:52
  • Works great in python 2.79 - currently the latest version of 2x, and quite well at it, too! – FredTheWebGuy Jan 16 '15 at 00:22
  • 3
    Awesome. Worth noting that underneath it's actually `from multiprocessing.pool import ThreadPool` – Jay Oct 23 '18 at 20:00
20

Someone recommended I use the futures package for this. I tried it and it seems to be working.

http://pypi.python.org/pypi/futures

Here's an example:

"Download many URLs in parallel."

import functools
import urllib.request
import futures

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

with futures.ThreadPoolExecutor(50) as executor:
   future_list = executor.run_to_futures(
           [functools.partial(load_url, url, 30) for url in URLS])
Ram Rachum
  • 84,019
  • 84
  • 236
  • 374
5

Here is my implementation of threaded map:

from threading import Thread
from queue import Queue

def thread_map(f, iterable, pool=None):
    """
    Just like [f(x) for x in iterable] but each f(x) in a separate thread.
    :param f: f
    :param iterable: iterable
    :param pool: thread pool, infinite by default
    :return: list if results
    """
    res = {}
    if pool is None:
        def target(arg, num):
            try:
                res[num] = f(arg)
            except:
                res[num] = sys.exc_info()

        threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)]
    else:
        class WorkerThread(Thread):
            def run(self):
                while True:
                    try:
                        num, arg = queue.get(block=False)
                        try:
                            res[num] = f(arg)
                        except:
                            res[num] = sys.exc_info()
                    except Empty:
                        break

        queue = Queue()
        for i, arg in enumerate(iterable):
            queue.put((i, arg))

        threads = [WorkerThread() for _ in range(pool)]

    [t.start() for t in threads]
    [t.join() for t in threads]
    return [res[i] for i in range(len(res))]
Rugnar
  • 2,894
  • 3
  • 25
  • 29
3

The Python module Queue might help you. Use one thread that uses Queue.put() to push all urls into the queue and the worker threads simply get() the urls one by one.

Python Docs: queue — A synchronized queue class

cypheon
  • 1,023
  • 1
  • 13
  • 22
-1

I'd wrap it up in a function (untested):

import itertools
import threading
import urllib2
import Queue

def openurl(url, queue):
    def starter():
        try:
            result = urllib2.urlopen(url)
        except Ecxeption, exc:
            def raiser():
                raise exc
            queue.put((url, raiser))
        else:
            queue.put((url, lambda:result))
    threadind.Thread(target=starter).start()

myurls = ... # the list of urls
myqueue = Queue.Queue()

map(openurl, myurls, itertools.repeat(myqueue))

for each in myurls:
    url, getresult = queue.get()
    try:
        result = getresult()
    except Exception, exc:
        print 'exception raised:' + str(exc)
    else:
        # do stuff with result
pillmuncher
  • 10,094
  • 2
  • 35
  • 33