22

I have a function that is side-effect free. I would like to run it for every element in an array and return an array with all of the results.

Does Python have something to generate all of the values?

APerson
  • 8,140
  • 8
  • 35
  • 49
Sandro
  • 2,219
  • 4
  • 27
  • 41
  • 1
    do you mean map as in map-reduce? can you give an example of an input and an output row? – Simon Apr 01 '10 at 18:44
  • No, I do not mean map reduce. Since I want all of the data from each individual function returned. It's just that each value can be calculated independently of one another. Although, since I think that I want the max of the set, maybe I can use map reduce here... – Sandro Apr 01 '10 at 20:28
  • map() would do what you say, operate on each element independently (with the GIL caveats mentioned below) – Adam Nelson Apr 01 '10 at 22:17
  • streams.fastmap() from Pyxtension: https://github.com/asuiu/pyxtension does exactly this - multithreaded map. – asu May 24 '20 at 13:13
  • pyxtension library can now be found on PyPi also : https://pypi.org/project/pyxtension/ – asu Nov 15 '22 at 22:59

6 Answers6

18

Try the Pool.map function from multiprocessing:

http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers

It's not multithreaded per-se, but that's actually good since multithreading is severely crippled in Python by the GIL.

samtregar
  • 6,888
  • 3
  • 21
  • 17
  • Very cool, that last line sold me. I saw this multiprocessing library before but I thought that it was too heavy weight for my needs. I think that I see the light now :) Thank you. – Sandro Apr 01 '10 at 23:58
  • Is this answer up to date/still relevant in regards to multithreading/GIL? – Moberg Mar 02 '17 at 13:35
  • 1
    Yes, for some values of "severe." I think it's a matter of opinion how bad it is, but I still prefer multiple processes on Linux where processes aren't too heavy. – samtregar Mar 18 '17 at 19:34
9

Try concurrent.futures.ThreadPoolExecutor.map in Python Standard Library (New in version 3.2).

Similar to map(func, *iterables) except:

  • the iterables are collected immediately rather than lazily;
  • func is executed asynchronously and several calls to func may be made concurrently.

A simple example (modified from ThreadPoolExecutor Example):

import concurrent.futures
import urllib.request

URLS = [
  'http://www.foxnews.com/',
  'http://www.cnn.com/',
  'http://europe.wsj.com/',
  'http://www.bbc.co.uk/',
]

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    # Do something here
    # For example
    with urllib.request.urlopen(url, timeout=timeout) as conn:
      try:
        data = conn.read()
      except Exception as e:
        # You may need a better error handler.
        return b''
      else:
        return data

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # map
    l = list(executor.map(lambda url: load_url(url, 60), URLS))

print('Done.')
Community
  • 1
  • 1
Haipeng Li
  • 91
  • 1
  • 2
3

Python now has the concurrent.futures module, which is the simplest way of getting map to work with either multiple threads or multiple processes.

https://docs.python.org/3/library/concurrent.futures.html

Maximilian
  • 7,512
  • 3
  • 50
  • 63
2

You can use the multiprocessing python package (http://docs.python.org/library/multiprocessing.html). The cloud python package, available from PiCloud (http://www.picloud.com), offers a multi-processing map() function as well, which can offload your map to the cloud.

BrainCore
  • 5,214
  • 4
  • 33
  • 38
1

Below is my map_parallel function. It works just like map, except it can run each element in parallel in a separate thread (but see note below). This answer builds upon another SO answer.

import threading
import logging
def map_parallel(f, iter, max_parallel = 10):
    """Just like map(f, iter) but each is done in a separate thread."""
    # Put all of the items in the queue, keep track of order.
    from queue import Queue, Empty
    total_items = 0
    queue = Queue()
    for i, arg in enumerate(iter):
        queue.put((i, arg))
        total_items += 1
    # No point in creating more thread objects than necessary.
    if max_parallel > total_items:
        max_parallel = total_items

    # The worker thread.
    res = {}
    errors = {}
    class Worker(threading.Thread):
        def run(self):
            while not errors:
                try:
                    num, arg = queue.get(block = False)
                    try:
                        res[num] = f(arg)
                    except Exception as e:
                        errors[num] = sys.exc_info()
                except Empty:
                    break

    # Create the threads.
    threads = [Worker() for _ in range(max_parallel)]
    # Start the threads.
    [t.start() for t in threads]
    # Wait for the threads to finish.
    [t.join() for t in threads]

    if errors:
        if len(errors) > 1:
            logging.warning("map_parallel multiple errors: %d:\n%s"%(
                len(errors), errors))
        # Just raise the first one.
        item_i = min(errors.keys())
        type, value, tb = errors[item_i]
        # Print the original traceback
        logging.info("map_parallel exception on item %s/%s:\n%s"%(
            item_i, total_items, "\n".join(traceback.format_tb(tb))))
        raise value
    return [res[i] for i in range(len(res))]

NOTE: One thing to be careful of is Exceptions. Like normal map, the above function raises an exception if one of it's sub-thread raises an exception, and will stop iteration. However, due to the parallel nature, there's no guarantee that the earliest element will raise the first exception.

speedplane
  • 15,673
  • 16
  • 86
  • 138
0

Maybe try the Unladen Swallow Python 3 implementation? That might be a major project, and not guaranteed to be stable, but if you're inclined it could work. Then list or set comprehensions seem like the proper functional structure to use.

Adam Nelson
  • 7,932
  • 11
  • 44
  • 64