
I am new to Python and I am wondering how to implement the following in a syntactically cleaner way. I have functions f1, f2, ... fN. These functions are wrappers which spawn new processes (with targets _f1, _f2, ... _fN), pass their arguments (arg1, arg2, ...) to the child processes, and receive the return values.
With code like this I want the module's functionality to execute in a different process than the caller's (the user of the module).
The functions f1, f2, ... fN (respectively _f1, _f2, ... _fN) may have different prototypes.

In a module:

def _f1(arg1, arg2, ... argn,  connection):
    ...
    connection.send(return_value)
    connection.close()
def f1(arg1, arg2, ... argn):
    parent_conn, child_conn = Pipe()
    p = Process(target=_f1, args=(arg1, arg2, ... argn, child_conn))
    p.start()
    p.join() 
    return parent_conn.recv()


def _f2(arg1, arg2, ... argm,  connection):
    ...
    connection.send(return_value)
    connection.close()    
def f2(arg1, arg2, ... argm):
    parent_conn, child_conn = Pipe()
    p = Process(target=_f2, args=(arg1, arg2, ... argm, child_conn))
    p.start()
    p.join() 
    return parent_conn.recv()

...

def _fN(arg1, arg2, ... argk,  connection):
    ...
    connection.send(return_value)
    connection.close()    
def fN(arg1, arg2, ... argk):
    parent_conn, child_conn = Pipe()
    p = Process(target=_fN, args=(arg1, arg2, ... argk, child_conn))
    p.start()
    p.join() 
    return parent_conn.recv()

It is clear that the wrapper functions f1, f2, ... fN are essentially the same. Can I implement them as a single wrapper function? I also want the execution to be non-blocking: the user of the module should be able to execute f1 and f2 concurrently, for example.

I hope I have managed to explain my question.

Here is a concrete example with two functions, sum() and sin() (note that these names shadow the built-in sum() and math.sin()):

import math
from multiprocessing import Process, Pipe

def _sum(a, b,  connection):
   return_value=a+b
   connection.send(return_value)
   connection.close()
def sum(a, b):
   parent_conn, child_conn = Pipe()
   p = Process(target=_sum, args=(a, b, child_conn))
   p.start()
   p.join() 
   return parent_conn.recv()

def _sin(x,  connection):
   return_value=math.sin(x)   # math.sin(), since a bare sin(x) would recurse into the wrapper below
   connection.send(return_value)
   connection.close()    
def sin(x):
   parent_conn, child_conn = Pipe()
   p = Process(target=_sin, args=(x, child_conn))
   p.start()
   p.join() 
   return parent_conn.recv() 

Taking srj's idea of using a decorator, I came to the solution posted below. I tried to expand it even further to also decorate the connection.send(return_value) and connection.close() calls, but that doesn't work for me. The code is below; in the comments I mark what is working and what (in my opinion equivalent) is not. Any help?

from multiprocessing import Process, Pipe

def process_wrapper1(func):
    def wrapper(*args):
        parent_conn, child_conn = Pipe()
        f_args = args + (child_conn,)
        p = Process(target=func, args=f_args)
        p.start()
        p.join() 
        return parent_conn.recv()
    return wrapper

def process_wrapper2(func):
    def wrapper(*args):
        res=func(*args[:-1])
        args[-1].send(res)
        args[-1].close()
    return wrapper



#def _sum(a, b,  connection):            # working
#   return_value=a+b
#   connection.send(return_value)
#   connection.close()
def __sum(a, b):                       # doesn't work, see the error below
    return(a+b)    
_sum=process_wrapper2(__sum)

sum=process_wrapper1(_sum) 

Running the above code in the Pyzo IPython shell produces the following:

In [3]: import test1
In [4]: test1.sum(2,3)
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<ipython-input-4-8c542dc5e11a> in <module>()
----> 1 test1.sum(2,3)

C:\projects\PYnGUInLib\test1.py in wrapper(*args)
     11         f_args = (child_conn,) + args
     12         p = Process(target=func, args=f_args)
---> 13         p.start()
     14         p.join()
     15         return parent_conn.recv()

C:\pyzo2014a_64b\lib\multiprocessing\process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         _children.add(self)

C:\pyzo2014a_64b\lib\multiprocessing\context.py in _Popen(process_obj)
    210     @staticmethod
    211     def _Popen(process_obj):
--> 212         return _default_context.get_context().Process._Popen(process_obj)
    213 
    214 class DefaultContext(BaseContext):

C:\pyzo2014a_64b\lib\multiprocessing\context.py in _Popen(process_obj)
    311         def _Popen(process_obj):
    312             from .popen_spawn_win32 import Popen
--> 313             return Popen(process_obj)
    314 
    315     class SpawnContext(BaseContext):

C:\pyzo2014a_64b\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     64             try:
     65                 reduction.dump(prep_data, to_child)
---> 66                 reduction.dump(process_obj, to_child)
     67             finally:
     68                 context.set_spawning_popen(None)

C:\pyzo2014a_64b\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     57 def dump(obj, file, protocol=None):
     58     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 59     ForkingPickler(file, protocol).dump(obj)
     60 
     61 #

PicklingError: Can't pickle <function process_wrapper2.<locals>.wrapper at 0x0000000005541048>: attribute lookup wrapper on test1 failed
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\pyzo2014a_64b\lib\multiprocessing\spawn.py", line 106, in spawn_main
   exitcode = _main(fd)
  File "C:\pyzo2014a_64b\lib\multiprocessing\spawn.py", line 116, in _main
   self = pickle.load(from_parent)
EOFError: Ran out of input

In [5]: 
Dimitar Penev

2 Answers


You could use a decorator to wrap each function with the boilerplate of creating the process and executing it.

def process_wrapper(func):
    def wrapper(*args):
        parent_conn, child_conn = Pipe()
        #attach the connection to the arguments
        f_args = args + (child_conn,)
        p = Process(target=func, args=f_args)
        p.start()
        p.join() 
        return parent_conn.recv()
    return wrapper

and define the function as

@process_wrapper
def _f2(arg1, arg2, ... argm,  connection):
    ...
    connection.send(return_value)
    connection.close()

Explanation: the process_wrapper function takes a function of N positional arguments, the last of which is always a pipe connection, and returns a function of N-1 arguments with the connection filled in.

In the case of your concrete functions,

@process_wrapper
def sin(x,  connection):
   return_value=math.sin(x)   # requires `import math`; a bare sin(x) here would recurse into this wrapper
   connection.send(return_value)
   connection.close()  

@process_wrapper
def sum(a, b,  connection):
   return_value=a+b
   connection.send(return_value)
   connection.close()

You could then call the function as

sum(a,b)

More references on Python decorators: http://www.jeffknupp.com/blog/2013/11/29/improve-your-python-decorators-explained/

srj
  • Hi srj, I think I learned what a decorator is, but I don't understand how exactly I can use one in my particular case. In the original question I have added a concrete example with two functions. Can you post the equivalent code using decorators which optimizes the two wrappers sum() and sin()? – Dimitar Penev Oct 19 '14 at 19:29
  • I have made edits to the answer, please take a look at it if that is what you are looking for. – srj Oct 20 '14 at 05:42
  • Hi srj, I get an error testing your code: Can't pickle : it's not the same object as test1.sum ... EOFError: Ran out of input. I am not sure what the problem is. I think it is because the original function is no longer available; in your case we have only the decorated version. Do you still think that what I want can be achieved with decorators? `def _sum(a, b, connection): return_value=a+b connection.send(return_value) connection.close()` – Dimitar Penev Oct 20 '14 at 08:19
  • instead of decorating the function, you could do sum = process_wrapper(_sum). – srj Oct 20 '14 at 09:48
  • Yes, this is what I tried. Please see the code I have put at the end of my original question. The first decoration works fine, but when I tried to use another decoration it failed with `PicklingError: Can't pickle .wrapper at 0x0000000005962A60>: attribute lookup wrapper on test1 failed`. I think the code is close to being useful. Thank you srj! – Dimitar Penev Oct 20 '14 at 10:50
  • Dimitar, when do you get the pickling error? could you provide the line which is throwing the error ? are you using pickle.load/dump ? the whole idea behind the decorator is that you need not define a decorator per function. I have edited my response to explain what the decorator does. – srj Oct 21 '14 at 05:46
  • see [this](http://stackoverflow.com/questions/4677012/python-cant-pickle-type-x-attribute-lookup-failed) S.O question and see if it resolves the pickling problem. – srj Oct 21 '14 at 06:38
  • I have posted the whole error dump I am getting in the original question. Yes, I got the idea of using a single decorator, but in the sum example I just don't know how to put all the 'multiprocessing' code in a single decorator, so I plan to use two decorators instead, which is not a problem for me. Any idea why I am getting this issue? – Dimitar Penev Oct 21 '14 at 07:12
  • Objects are passed back and forth to python processes using the pickle module. One of the requirements of the pickling is that the functions must be defined at the module level. Since a decorator essentially hides the actual function definition from the module level, you will run into this issue. – Joel Cornett Oct 21 '14 at 07:23
  • @Joel, to my understanding everything is defined at my module level. In addition, I don't use the decorators in their '@' form, so the original function definitions remain. – Dimitar Penev Oct 21 '14 at 08:33
  • @DimitarPenev: Based on your code, your decorators contain an inner function named (in your case) `wrapper()`. `wrapper()` is not defined at the module level. – Joel Cornett Oct 21 '14 at 12:42
  • OK, so you Gents confirm that what I want can not be done with decorators? – Dimitar Penev Oct 21 '14 at 17:26
  • I am still not sure why the shell you are using doesn't let you define inner functions inside a function, as I cannot reproduce it locally. Could you try decorating the `wrapper` function with `@functools.wraps(func)` and check whether it works? – srj Oct 22 '14 at 10:34

You should use multiprocessing.Pool. Here is an example:

import multiprocessing

def f1(*args):
    rv = do_calculations()
    return rv 

def f2(*args):
    ...

...
def fN(*args):
    ...

def worker(args):
    fn = args[0]
    return fn(*args[1:])

inputs = [
    [f1, f1_args],
    [f2, f2_args],
    ...
    [fN, fN_args]
]

pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
results = pool.map(worker, inputs)
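Since the question also asks for non-blocking execution: `Pool.apply_async` submits a job and returns an `AsyncResult` immediately, so several functions can run concurrently. A small sketch (`add` is an illustrative stand-in for f1 ... fN):

```python
import multiprocessing

def add(a, b):
    return a + b

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        r1 = pool.apply_async(add, (2, 3))    # returns immediately
        r2 = pool.apply_async(add, (10, 20))  # runs concurrently with r1
        # .get() blocks only when the result is actually needed
        print(r1.get(), r2.get())  # prints: 5 30
```

As with Process targets, the functions passed to apply_async must be picklable (i.e. defined at module level), which is why this pattern tends to fail for functions typed directly into an interactive shell.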
Joel Cornett
  • Hi Joel, pools seem to be a very good tool, thanks! For my purpose pool.apply_async() is actually what I need. I was not able to use it in the interactive shell, however, which is a strong requirement for my project. Do you think this can be solved? – Dimitar Penev Oct 21 '14 at 08:18
  • Check out the [`code` module](https://docs.python.org/2.7/library/code.html?highlight=code#module-code). It provides facilities for implementing interactive interpreters within a non-interactive script. – Joel Cornett Oct 21 '14 at 12:40
  • Hi Joel, what I want is to be able to use my library from the ipython_qtconsole integrated in the IEP IDE. I don't want to use interactive interpreters within a script. Or did I misunderstand? – Dimitar Penev Oct 21 '14 at 19:10
  • Well, it looks like IPython has its own multiprocessing tools bundled with it. Check out http://nbviewer.ipython.org/github/vals/scilife-python-course/blob/master/parallel%20python.ipynb – Joel Cornett Oct 21 '14 at 21:47