
I need a way to use a function within pool.map() that accepts more than one parameter. As I understand it, the target function of pool.map() can only take a single iterable as an argument, but is there a way to pass other parameters in as well? In this case, I need to pass in a few configuration variables, like my Lock() and logging information, to the target function.

I have done some research and I think I may be able to use partial functions to get this to work, but I don't fully understand how they work. Any help would be greatly appreciated! Here is a simple example of what I want to do:

import multiprocessing

def target(items, lock):
    for item in items:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    pool.map(target(PASS PARAMS HERE), iterable)
    pool.close()
    pool.join()
• discussed here: http://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments (I have used J.F. Sebastian's "star" method successfully) – Roberto Aug 28 '14 at 16:49
• Please, whenever you use multiprocessing, use a try/finally clause with close() and join() inside finally to ensure processes are closed if an error happens: http://stackoverflow.com/questions/30506489/python-multiprocessing-leading-to-many-zombie-processes – zeehio Nov 17 '16 at 09:22
  • @zeehio Shouldn't that be automatic? – endolith Sep 07 '19 at 00:29
• @endolith It should, but usually it is not. If the main Python program ends, all its children are killed/reaped, but if the main program keeps running (e.g. because the parallelization component is a minor part of the whole program), you will need something (e.g. try/finally) to ensure all processes are terminated. – zeehio Sep 24 '19 at 11:07
• don't use close(); use the pool in a context manager: with multiprocessing.Pool(3) as my_pool: – adrien Jan 14 '20 at 09:19 (see the sketch after this list)
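
For illustration, a minimal sketch of both cleanup patterns from the comments above (the work function is a hypothetical stand-in):

import multiprocessing

def work(item):
    return item * 2

if __name__ == "__main__":
    # try/finally form: close() and join() always run, even if map() raises.
    pool = multiprocessing.Pool()
    try:
        results = pool.map(work, [1, 2, 3])
    finally:
        pool.close()
        pool.join()

    # Context-manager form (Python 3.3+): the pool is terminated on exit.
    with multiprocessing.Pool(3) as pool:
        results = pool.map(work, [1, 2, 3])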

3 Answers


You can use functools.partial for this (as you suspected):

import multiprocessing
from functools import partial

def target(lock, item):
    # Do cool stuff
    if (... some condition here ...):
        lock.acquire()
        # Write to stdout or logfile, etc.
        lock.release()

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    # A plain multiprocessing.Lock() can't be pickled and sent to the pool's
    # workers, so use a Manager().Lock() proxy, which can be.
    lock = multiprocessing.Manager().Lock()
    func = partial(target, lock)
    pool.map(func, iterable)
    pool.close()
    pool.join()

Example:

def f(a, b, c):
    print("{} {} {}".format(a, b, c))

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    a = "hi"
    b = "there"
    func = partial(f, a, b)
    pool.map(func, iterable)
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

Output:

hi there 1
hi there 2
hi there 3
hi there 4
hi there 5
• Awesome, I think all I needed was a clear example like this. Thanks a ton! – DJMcCarthy12 Aug 28 '14 at 16:53
  • Great example. One question though: why is there a `for item in items:` in the definition of `target`? – Jean-Francois T. Oct 28 '15 at 09:12
  • @Jean-FrancoisT. Copy/paste mistake! Thanks for pointing it out. – dano Oct 29 '15 at 15:49
  • those who have a problem with the lock, check http://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes/25558333#25558333 – icebox19 Jul 09 '16 at 16:33
• What if the variable is in the first position, such as `test(input, p1, p2, p3=None)`, where `p1, p2, p3` are fixed and `input` varies? – Mithril May 26 '17 at 07:05
• @Mithril Write a top-level function like this: `def mytest(p1, p2, input, p3=None): test(input, p1, p2, p3=p3)`. Then do `func = partial(mytest, some_p1, some_p2, p3=some_p3)`, and pass `func` to `pool.map` (see the sketch after this list). – dano May 26 '17 at 14:26
  • I've been struggling with how to handle this all day, thanks for such a clear solution/example – Zach Jun 26 '20 at 12:57
  • seriously, you are my hero right now – ChrisDanger Jun 30 '21 at 18:00
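
A minimal sketch of the wrapper approach from dano's comment above; `test`, `mytest`, and the fixed values are hypothetical placeholders:

from functools import partial
import multiprocessing

def test(input, p1, p2, p3=None):
    return (input, p1, p2, p3)

# Reorder the parameters so the varying one comes last, then bind the
# fixed ones with partial.
def mytest(p1, p2, input, p3=None):
    return test(input, p1, p2, p3=p3)

if __name__ == "__main__":
    func = partial(mytest, "fixed1", "fixed2", p3="fixed3")
    with multiprocessing.Pool() as pool:
        print(pool.map(func, [1, 2, 3]))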

You could use a map function that allows multiple arguments, as does the fork of multiprocessing found in pathos.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> def add_and_subtract(x,y):
...   return x+y, x-y
... 
>>> res = Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))
>>> res
[(-5, 5), (-2, 6), (1, 7), (4, 8), (7, 9), (10, 10), (13, 11), (16, 12), (19, 13), (22, 14)]
>>> Pool().map(add_and_subtract, *zip(*res))
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

pathos enables you to easily nest hierarchical parallel maps with multiple inputs, so we can extend our example to demonstrate that.

>>> from pathos.multiprocessing import ThreadingPool as TPool
>>> 
>>> res = TPool().amap(add_and_subtract, *zip(*Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))))
>>> res.get()
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

Even more fun is to build a nested function that we can pass into the Pool. This is possible because pathos uses dill, which can serialize almost anything in Python.

>>> def build_fun_things(f, g):
...   def do_fun_things(x, y):
...     return f(x,y), g(x,y)
...   return do_fun_things
... 
>>> def add(x,y):
...   return x+y
... 
>>> def sub(x,y):
...   return x-y
... 
>>> neato = build_fun_things(add, sub)
>>> 
>>> res = TPool().imap(neato, *zip(*Pool().map(neato, range(0,20,2), range(-5,5,1))))
>>> list(res)
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

If you are not able to go outside of the standard library, however, you will have to do this another way. Your best bet in that case is to use multiprocessing.Pool.starmap (Python 3.3+), as seen here: Python multiprocessing pool.map for multiple arguments (noted by @Roberto in the comments on the OP's post).
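
For reference, a minimal sketch of that standard-library approach; Pool.starmap unpacks each tuple of arguments for you (reusing the add_and_subtract example from above):

import multiprocessing

def add_and_subtract(x, y):
    return x + y, x - y

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        # Each (x, y) tuple is unpacked into the function's two arguments.
        res = pool.starmap(add_and_subtract, zip(range(0, 20, 2), range(-5, 5, 1)))
    print(res)  # [(-5, 5), (-2, 6), ..., (22, 14)]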

Get pathos here: https://github.com/uqfoundation


In case you don't have access to functools.partial, you could use a wrapper function for this, as well.

import multiprocessing

def target(lock):
    def wrapped_func(item):
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()
    return wrapped_func

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    lck = multiprocessing.Lock()
    pool.map(target(lck), iterable)
    pool.close()
    pool.join()

This makes target() into a function that accepts a lock (or whatever other parameters you want to give it) and returns a function that takes only a single item as input but can still use all your other parameters. That returned function is what is ultimately passed to pool.map(), which should then execute with no problems.

• I'm super late on this, but this code won't work, because nested functions can't be pickled. Calling `target(lck)` returns the nested `wrapped_func` function, which needs to be pickled to be passed to the worker process, and that will always fail. – dano Mar 20 '15 at 20:53
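
For completeness, a minimal sketch of the inheritance-based alternative from the lock-sharing answer linked in the comments on the accepted answer: pass the lock through the Pool's initializer so each worker inherits it instead of receiving it as a pickled argument.

import multiprocessing

lock = None  # set in each worker by init_worker()

def init_worker(shared_lock):
    # Runs once in every worker process; stores the inherited lock globally.
    global lock
    lock = shared_lock

def target(item):
    with lock:
        print(item)  # serialized access to stdout

if __name__ == "__main__":
    l = multiprocessing.Lock()
    pool = multiprocessing.Pool(initializer=init_worker, initargs=(l,))
    try:
        pool.map(target, [1, 2, 3, 4, 5])
    finally:
        pool.close()
        pool.join()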