
Most examples of multiprocessing worker pools execute a single function in different processes, e.g.:

import multiprocessing

def foo(args):
    pass

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=30)
    res = pool.map_async(foo, args)

Is there a way to handle two different and independent functions within the pool? Could you, e.g., assign 15 processes to foo() and 15 processes to bar(), or is a pool bound to a single function? Or do you have to create separate processes for different functions manually, with

p = Process(target=foo, args=(whatever,))
q = Process(target=bar, args=(whatever,))
q.start()
p.start()

and forget about the worker pool?

dorvak

6 Answers


To pass different functions, you can simply call map_async multiple times.

Here is an example to illustrate that:

from multiprocessing import Pool

def square(x):
    return x * x

def cube(y):
    return y * y * y

pool = Pool(processes=20)

result_squares = pool.map_async(square, range(10))
result_cubes = pool.map_async(cube, range(10))

The result will be:

>>> print(result_squares.get(timeout=1))
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

>>> print(result_cubes.get(timeout=1))
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
Ocaj Nires
  • And will they be executed in parallel or "in a row"? – dorvak Aug 08 '11 at 07:41
  • `map_async` returns immediately. As long as there are enough free processes in the pool, new tasks will be run without having to wait. In the example above, they will run in parallel. @mad_scientist – Ocaj Nires Aug 08 '11 at 08:28
  • Thanks! But there is no way to assign a specific number of workers/processes, I guess? – dorvak Aug 08 '11 at 09:51
  • The [multiprocessing Pool API](http://docs.python.org/library/multiprocessing.html#module-multiprocessing.pool) does not provide a mechanism to assign a specific number of workers within the same pool. If you really want a specific number of workers per task, create different pools (see the sketch after these comments). That said, having only a single pool is recommended; it makes sense for the Pool to manage that for you transparently, without you worrying about it. – Ocaj Nires Aug 08 '11 at 09:59
  • Thanks for your answer. Are you positive that adding `map_async()` calls one after the other will run them in parallel? I have actually tried this and, as the answer by @Sam indicates, these seem to be running sequentially. – Zhubarb Jan 22 '20 at 11:55
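
A minimal sketch of the two-pool approach mentioned in the comments (the 15/15 split and the worker functions here are hypothetical placeholders):

from multiprocessing import Pool

def foo(x):
    return x * 2

def bar(x):
    return x + 1

if __name__ == '__main__':
    # Two separate pools, each with its own dedicated worker count.
    foo_pool = Pool(processes=15)
    bar_pool = Pool(processes=15)

    foo_res = foo_pool.map_async(foo, range(10))
    bar_res = bar_pool.map_async(bar, range(10))

    print(foo_res.get(timeout=10))  # [0, 2, 4, ..., 18]
    print(bar_res.get(timeout=10))  # [1, 2, 3, ..., 10]

    for p in (foo_pool, bar_pool):
        p.close()
        p.join()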

You can use a simple wrapper function (edit: a lambda won't work here, because the pool has to pickle the callable it sends to the workers, and lambdas can't be pickled):

import multiprocessing

def smap(f, *args):
    return f(*args)

pool = multiprocessing.Pool(processes=30)
res = pool.starmap(smap, zip(function_list, args_list1, args_list2))

Unlike the built-in map, Pool.map takes only a single iterable of inputs, which is why the functions and their argument lists have to be zipped into tuples that Pool.starmap then unpacks.
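
As a quick check of that pickling restriction (my own sketch, not part of the original answer):

import pickle

def smap(f, *args):
    return f(*args)

pickle.dumps(smap)         # works: module-level functions are pickled by name
pickle.dumps(lambda x: x)  # fails: a lambda has no importable name to pickle by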

Rayamon
  • This should be accepted as the right answer, because the accepted answer runs in a quasi-parallel mode (with an awful scheduler). – ARA1307 Feb 29 '20 at 17:41
  • `pool.map` doesn't take that many arguments. The code would work with Python's built-in `map`, but it doesn't work in the multiprocessing context out of the box, which is why the `starmap`/`zip` form above is needed. – Attila the Fun Feb 14 '23 at 16:35

Here is a working example of the idea shared by @Rayamon:

import functools

from multiprocessing import Pool


def a(param1, param2, param3):
    return param1 + param2 + param3


def b(param1, param2):
    return param1 + param2


def smap(f):
    return f()


if __name__ == '__main__':
    func1 = functools.partial(a, 1, 2, 3)
    func2 = functools.partial(b, 1, 2)

    pool = Pool(processes=2)
    res = pool.map(smap, [func1, func2])
    pool.close()
    pool.join()
    print(res)  # [6, 3]
ARA1307
  • How do I pass a list of values as arguments so that each value is handled individually by a worker? With a single function it works fine, but not with multiple functions. – Madan Raj Jul 23 '21 at 13:02
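
One way to do what the comment asks (a sketch under assumptions of my own, not from the thread): build one partial per value, then map over the combined task list so each value is processed individually by a worker.

import functools
from multiprocessing import Pool

def a(x):
    return x + 1

def b(x):
    return x * 2

def smap(f):
    return f()

if __name__ == '__main__':
    values_for_a = [1, 2, 3]  # hypothetical input lists
    values_for_b = [10, 20]
    tasks = ([functools.partial(a, v) for v in values_for_a]
             + [functools.partial(b, v) for v in values_for_b])
    with Pool(processes=4) as pool:
        print(pool.map(smap, tasks))  # [2, 3, 4, 20, 40]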

In my test, the two map_async calls did not run in parallel. See the following code:

import multiprocessing
from functools import partial

q = None  # placeholder: the original snippet used q without ever defining it

def updater1(q, i):
    print("UPDATER 1:", i)

def updater2(q, i):
    print("UPDATER2:", i)

if __name__ == '__main__':
    a = range(10)
    b = ["abc", "def", "ghi", "jkl", "mno", "pqr", "vas", "dqfq", "grea", "qfwqa", "qwfsa", "qdqs"]

    pool = multiprocessing.Pool()

    func1 = partial(updater1, q)
    func2 = partial(updater2, q)
    pool.map_async(func1, a)
    pool.map_async(func2, b)

    pool.close()
    pool.join()

The above code yields the following printout:

UPDATER 1: 1
UPDATER 1: 0
UPDATER 1: 2
UPDATER 1: 3
UPDATER 1: 4
UPDATER 1: 5
UPDATER 1: 6
UPDATER 1: 7
UPDATER 1: 8
UPDATER 1: 9
UPDATER2: abc
UPDATER2: def
UPDATER2: ghi
UPDATER2: jkl
UPDATER2: mno
UPDATER2: pqr
UPDATER2: vas
UPDATER2: dqfq
UPDATER2: grea
UPDATER2: qfwqa
UPDATER2: qwfsa
UPDATER2: qdqs
Sam
  • This answer is either outdated for Python 3 or confusing things. The code does not run as posted because it includes a mysterious `q` variable. Also, this does not prove the code runs serially: adding a sleep in the updater1 function actually makes the output interleave, proving that there is no implicit blocking between the two lists of tasks. Looking at the CPython source for `_map_async` tells me that the structure of the two lists will not even be preserved: all tasks get put into one task queue. https://github.com/python/cpython/blob/main/Lib/multiprocessing/pool.py – julaine Jun 21 '23 at 08:35
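
A minimal sketch of the point made in that comment (my own test code, not from the answer): give each task a short sleep and the output of the two `map_async` calls interleaves, since all tasks end up in one shared queue.

import time
from multiprocessing import Pool

def updater1(i):
    time.sleep(0.1)  # slow each task down so the workers overlap
    print("UPDATER 1:", i)

def updater2(s):
    time.sleep(0.1)
    print("UPDATER 2:", s)

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map_async(updater1, range(10))
    pool.map_async(updater2, ["abc", "def", "ghi", "jkl"])
    pool.close()
    pool.join()  # the two sets of prints come out interleaved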

Multiple Functions in one Pool

The following example shows how to run the three functions inc, dec, and add in a pool.

from multiprocessing import Pool
import functools

# -------------------------------------

def inc(x):
    return x + 1

def dec(x):
    return x - 1

def add(x, y):
    return x + y

# -------------------------------------

def smap(f):
    return f()

def main():
    f_inc = functools.partial(inc, 4)
    f_dec = functools.partial(dec, 2)
    f_add = functools.partial(add, 3, 4)
    with Pool() as pool:
        res = pool.map(smap, [f_inc, f_dec, f_add])
        print(res)

# -------------------------------------

if __name__ == '__main__':
    main()

We have three functions, which run independently in a pool. We use functools.partial to prepare each function and its parameters before execution.

Source: https://zetcode.com/python/multiprocessing/
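
For comparison, the same three calls can also be submitted individually with apply_async, which needs no wrapper function at all (my own sketch, not part of the quoted source):

from multiprocessing import Pool

def inc(x):
    return x + 1

def dec(x):
    return x - 1

def add(x, y):
    return x + y

if __name__ == '__main__':
    with Pool() as pool:
        # Submit each function with its own arguments and collect the AsyncResults.
        results = [pool.apply_async(inc, (4,)),
                   pool.apply_async(dec, (2,)),
                   pool.apply_async(add, (3, 4))]
        print([r.get() for r in results])  # [5, 1, 7]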

ATH

To further illustrate the answers above, here is an example of:

  1. Running a single function with multiple inputs in parallel using a Pool (the square function). As an interesting side note, watch for the mangled lines "59 81" and " 25" in the output below: the prints "5 25" and "9 81" from two processes interleaved, because concurrent processes share stdout.
  2. Running multiple functions with different inputs (both args and kwargs) and collecting their results using a Pool (the pf1, pf2, and pf3 functions).

import datetime
import multiprocessing
import time
import random

from multiprocessing import Pool

def square(x):
    # calculate the square of the value of x
    print(x, x*x)
    return x*x

def pf1(*args, **kwargs):
    sleep_time = random.randint(3, 6)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf1", args, sleep_time, datetime.datetime.now()))
    print("Keyword Args from pf1: %s" % kwargs)
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf1 done at %s\n" % datetime.datetime.now())
    return (sum(*args), kwargs)

def pf2(*args):
    sleep_time = random.randint(7, 10)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf2", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf2 done at %s\n" % datetime.datetime.now())
    return sum(*args)

def pf3(*args):
    sleep_time = random.randint(0, 3)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf3", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf3 done at %s\n" % datetime.datetime.now())
    return sum(*args)

def smap(f, *arg):
    # Dispatch helper for starmap: each task tuple is either
    # (function, args, kwargs) or (function, args).
    if len(arg) == 2:
        args, kwargs = arg
        return f(list(args), **kwargs)
    elif len(arg) == 1:
        args = arg
        return f(*args)


if __name__ == '__main__':

    # Define the dataset
    dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

    # Output the dataset
    print('Dataset: ' + str(dataset))

    # Run this with a pool of 5 agents and a chunksize of 3 until finished
    agents = 5
    chunksize = 3
    with Pool(processes=agents) as pool:
        result = pool.map(square, dataset, chunksize)
    print("Result of Squares : %s\n\n" % result)

    # Run three different functions, with args and kwargs, in a pool of 3
    with Pool(processes=3) as pool:
        result = pool.starmap(smap, [(pf1, [1, 2, 3], {'a': 123, 'b': 456}),
                                     (pf2, [11, 22, 33]),
                                     (pf3, [111, 222, 333])])

    # Output the result
    print('Result: %s ' % result)


Output:
*******

Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
1 1
2 4
3 9
4 16
6 36
7 49
8 64
59 81
 25
10 100
11 121
12 144
13 169
14 196
Result of Squares : [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]


Process : ForkPoolWorker-6  Function : pf1  Args: ([1, 2, 3],)  sleeping for 3  Time : 2020-07-20 00:51:56.477299

Keyword Args from pf1: {'a': 123, 'b': 456}
Process : ForkPoolWorker-7  Function : pf2  Args: ([11, 22, 33],)   sleeping for 8  Time : 2020-07-20 00:51:56.477371

Process : ForkPoolWorker-8  Function : pf3  Args: ([111, 222, 333],)    sleeping for 1  Time : 2020-07-20 00:51:56.477918

ForkPoolWorker-8    pf3 done at 2020-07-20 00:51:57.478808

ForkPoolWorker-6    pf1 done at 2020-07-20 00:51:59.478877

ForkPoolWorker-7    pf2 done at 2020-07-20 00:52:04.478016

Result: [(6, {'a': 123, 'b': 456}), 66, 666] 

Process finished with exit code 0