5

This is a follow up question to: Python: How can I run python functions in parallel?

Minimal Working Example:

'''
Created on 06.05.2015
https://stackoverflow.com/questions/7207309/python-how-can-i-run-python-functions-in-parallel
'''
from multiprocessing import Process
import time

def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

def func1():
    s=time.time()
    print 'func1: starting', s 
    for i in xrange(1000000000):
        if i==i:
            pass
    e = time.time()
    print 'func1: finishing', e
    print 'duration', e-s

if __name__ == '__main__':
    s =time.time()
    runInParallel(func1, func1, func1, func1, func1)
    print time.time()-s

Which leeds to this (and it's exactly what i want):

func1: starting 1430920678.09

func1: starting 1430920678.53

func1: starting 1430920679.02

func1: starting 1430920679.57

func1: starting 1430920680.55

func1: finishing 1430920729.68

duration 51.1449999809

func1: finishing 1430920729.78

duration 51.6889998913

func1: finishing 1430920730.69

duration 51.1239998341

func1: finishing 1430920748.64

duration 69.6180000305

func1: finishing 1430920749.25

duration 68.7009999752

71.5629999638

However, my function has quite a load of arguments, so i tested it like this:

-> func1(a) now gets an argument passed.

'''
Created on 06.05.2015
https://stackoverflow.com/questions/7207309/python-how-can-i-run-python-functions-in-parallel
'''
from multiprocessing import Process
import time

def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

def func1(a):
    s=time.time()
    print 'func1: starting', s 
    for i in xrange(a):
        if i==i:
            pass
    e = time.time()
    print 'func1: finishing', e
    print 'duration', e-s

if __name__ == '__main__':
    s =time.time()
    g=s
    runInParallel(func1(1000000000), func1(1000000000),
                  func1(1000000000), func1(1000000000),
                  func1(1000000000))
    print time.time()-s

So now this happens:

func1: starting 1430921299.08

func1: finishing 1430921327.84

duration 28.760999918

func1: starting 1430921327.84

func1: finishing 1430921357.68

duration 29.8410000801

func1: starting 1430921357.68

func1: finishing 1430921387.14

duration 29.4619998932

func1: starting 1430921387.14

func1: finishing 1430921416.52

duration 29.3849999905

func1: starting 1430921416.52

func1: finishing 1430921447.39

duration 30.864000082

151.392999887

The process is now sequential and no longer parallel, and i don't get why! What am I missing and doing wrong?

EDIT: Additionally, how would an example look like, whre a few arguments are positional and others which are optional?

Community
  • 1
  • 1
user69453
  • 1,279
  • 1
  • 17
  • 43

3 Answers3

6

You have to pass your arguments to the Process using the argument args. For instance:

def runInParallel(*fns):
    proc = []
    for fn, arg in fns:
        p = Process(target=fn, args=(arg,))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

And then call the function using:

runInParallel((func1, 10**9),
              (func1, 10**9),
              (func1, 10**9))

Also, you might consider using a Pool instead:

from multiprocessing import Pool

pool = Pool()
pool.apply_async(func1, (10**9,))
pool.apply_async(func1, (10**9,))
pool.apply_async(func1, (10**9,))

EDIT:

Process and Pool.apply_asynch work the same way. They take two optional arguments args and kwargs. These are the standard variables for positional arguments and keyword arguments in python:

f(1, 2, a=3, b=4)  # is equivalent to
args, kwargs = (1, 2), {"a":3, "b":4}
f(*args, **kwargs)

Same example with multiprocessing:

args, kwargs = (1, 2), {"a":3, "b":4}
Process(target=f, args=args, kwargs=kwargs).start()
# Or
pool = Pool()
args, kwargs = (1, 2), {"a":3, "b":4}
pool.apply_async(f, args, kwargs)
Vincent
  • 12,919
  • 1
  • 42
  • 64
  • You lost the `l` in the part `apply` in `pool.apply_async` – halex May 06 '15 at 14:33
  • Works fine. Would you give me a hint, it might be better to use pool? – user69453 May 06 '15 at 14:33
  • @Vincent: would you mind expanding your answer for an example with 4 arguments, where 2 are positional and 2 are optional? I would update my question according to that. so i would not have to start a new question, which is basicly of the same sort. – user69453 May 06 '15 at 15:01
2

Issue

I think your issue come from the fact you're giving a function handler in the 1st example and evaluate directly the function in the 2nd example.

i.e.

func1

is not equivalent to

func1 ()

Solution

According to s://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process you have to give your argument separatly like

p = Process(target=fn, args=(10000000,))

Hope this helped

Alvein
  • 141
  • 6
1

If you don't mind using a fork of multiprocessing, you can do something pretty cool with multiple arguments for your target of the parallel map. Here, I build a function that requires 2 arguments, but also has one optional argument as well as takes *args and **kwds. I'll build a list of inputs that have a random length, and run those in parallel.

>>> from pathos.multiprocessing import ProcessingPool as PPool
>>> pmap = PPool().map
>>> from pathos.multiprocessing import ThreadingPool as TPool
>>> tmap = TPool().map
>>> import numpy
>>>
>>> # build a function with multiple arguments, some optional
>>> def do_it(x,y,z=1,*args,**kwds):
...   import time
...   import random
...   s = time.time()
...   print 'starting', s
...   time.sleep(random.random())
...   res = sum([x,y,z]+list(args)+kwds.values())
...   e = time.time()
...   print 'finishing', e
...   print 'duration', e-s
...   return res
... 
>>> # create a bunch of random-length arrays as input for do_it
>>> input = map(numpy.random.random, tmap(numpy.random.randint, [2]*5, [6]*5))
>>> input
[array([ 0.25178071,  0.68871176,  0.92305523,  0.47103722]), array([ 0.14214278,  0.16747431,  0.59177496,  0.79984192]), array([ 0.20061353,  0.94339813,  0.67396539,  0.99919187]), array([ 0.63974882,  0.46868301,  0.59963679,  0.97704561]), array([ 0.14515633,  0.97824495,  0.57832663,  0.34167116])] 

Now, let's get our results...

>>> # call do_it in parallel, with random-length inputs
>>> result = pmap(do_it, *input)
starting 1431039902.85
starting 1431039902.85
starting 1431039902.85
starting 1431039902.85
finishing 1431039903.21
finishing 1431039903.21
duration 0.358909130096
duration 0.35973405838
finishing 1431039903.21
finishing 1431039903.21
duration 0.359538078308
duration 0.358761072159
>>> result
[1.379442164896775, 3.2465121635066176, 3.3667590048477187, 3.5887877829029042]

Of course, if you wanted to be tricky, you could run a triple-nested map all in one line.

>>> # do it, all in one line
>>> result = pmap(do_it, *map(numpy.random.random, tmap(numpy.random.randint, [2]*5, [6]*5)))
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
finishing 1431040673.73
finishing 1431040673.73
duration 0.110394001007
duration 0.111043930054
finishing 1431040673.73
duration 0.110962152481
finishing 1431040673.73
duration 0.110266923904
finishing 1431040673.74
duration 0.110939025879
>>> result
[1.9904591398425764, 1.932317817954369, 2.6365732054048432, 2.5168248011900047, 2.0410734229587968]

And, you could probably not use a blocking or serial map at all, and things would be really fast (I'm ignoring numpy random seeding here).

>>> # get a non-blocking thread map and an asynchronous processing map
>>> itmap = TPool().imap
>>> apmap = Pool().amap
>>>
>>> # do it!
>>> result = apmap(do_it, *itmap(numpy.random.random, itmap(numpy.random.randint, [2]*5, [6]*5)))
starting 1431041250.33
starting 1431041250.33
starting 1431041250.33
finishing 1431041250.44
duration 0.110985040665
finishing 1431041250.44
duration 0.110254049301
finishing 1431041250.45
duration 0.110941886902
>>> result.get()
[3.6386644432719697, 0.43038222983159957, 3.6220901279963318]

Get pathos here: https://github.com/uqfoundation

Mike McKerns
  • 33,715
  • 8
  • 119
  • 139