
I'm trying to derive a subclass from multiprocessing.Process which will have a queue for each instance, so that the queue can be used to catch the return value of the target. The problem is that multiprocessing.Process.start() uses a Popen helper (https://github.com/python/cpython/blob/master/Lib/multiprocessing/process.py) to create a process and run the target inside it. Is there a way to override this without redefining the entire Process class?

This is what I'm trying to do:

import multiprocessing
import Queue

class Mprocessor(multiprocessing.Process):
    def __init__(self, **kwargs):
        multiprocessing.Process.__init__(self, **kwargs)
        self._ret = Queue.Queue()

    def run(self):
        # intended to capture the target's return value on the queue
        self._ret.put(multiprocessing.Process.run(self))

    def getReturn(self):
        if self._ret.empty():
            return None
        return self._ret.get()

Here I try to create a queue inside the class. I override the run method so that, when it is executed, the return value(s) of the target are put into the queue. I also have a getReturn method, which is called in the main function through the Mprocessor object; this method should only be called once Mprocessor.is_alive() (which is defined for multiprocessing.Process) returns False. But this mechanism does not work, because when I call Mprocessor.start() it creates a subprocess which runs the target in its own environment.

I want to know if there's a way to use the queue inside the start method to get the return value, without the target needing a queue argument to communicate. I want to generalize this module: I don't want my methods to be defined with a queue just to hand back a return value. The module should be applicable to any function, because I am planning to have a manager method which takes a dict["process_name/ID" : target], a dict["process_name/ID" : [argument_list]], creates a process for each of these targets, and returns a dict["process_name/ID" : (return values,)]. Any ideas are welcome.
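
To make the intended interface concrete, here is a minimal sketch of how such a manager would be called, assuming the Processor_call function from the edit below (the names add, funcs and args are illustrative, and the exact return shape is my reading of the description above):

def add(a, b):                          # an ordinary function, no queue in its signature
    return a + b

funcs = {'p1': add, 'p2': add}          # dict["process_name/ID" : target]
args  = {'p1': [1, 2], 'p2': [3, 4]}    # dict["process_name/ID" : [argument_list]]

result = Processor_call(funcs, args)    # expected: {'p1': 3, 'p2': 7}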

EDIT: Manager function:

def Processor_call(func = None, func_args = None):
    if sorted(func.keys()) != sorted(func_args.keys()):
        print "Names in func dict and args dict don't match"
        return None

    # a plain thread-safe queue is enough here; Process objects
    # cannot be pickled into a multiprocessing.Queue
    process_list = Queue.Queue()
    for i in func.keys():
        p = Mprocessor(name = i, target = func[i], args = tuple(func_args[i]))
        process_list.put(p)
        p.start()

    return_dict = {}
    while not process_list.empty():
        process_wait = process_list.get()
        if not process_wait.is_alive():
            process_wait.join()
            if process_wait.exitcode == 0:
                return_dict[process_wait.name] = process_wait.getReturn()
            else:
                print "Error in process %s, status not available" % process_wait.name
        else:
            process_list.put(process_wait)  # still running, re-queue and poll again later
    return return_dict

EDIT: The target function should look like this.

def sum(a, b):
    return a + b

I don't want to pass a queue into the function and return results through it; the target should return normally with the return statement, and I need to capture that return value. I want to make a common module so that any existing method can use multiprocessing without any change to its definition, so the interfaces with other modules are maintained. I don't want a function to be designed only to run as a process; other modules should be able to call it as a normal method, without having to read its return value from a queue.

Pavan

1 Answer


Comment: "... so that I'll get the return value from the process started from the start method"

This works for me, for instance:

class Mprocessor

import multiprocessing
import time
import sys

class Mprocessor(multiprocessing.Process):
    def __init__(self, queue, **kwargs):
        multiprocessing.Process.__init__(self, **kwargs)
        self._ret = queue

    def run(self):
        # run() executes in the child process; the shared queue carries
        # the return value back to the parent
        return_value = self._target(*self._args)
        self._ret.put((self.name, return_value))
        time.sleep(0.25)
        sys.exit(0)
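
The key point is that run() executes inside the child process, while the managed queue is shared with the parent, so whatever run() puts on it is still available after the child exits. Tagging each item with self.name lets the parent match return values to processes regardless of the order in which they finish.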

Start processes and wait for return values

def Processor_call(func=None, func_args=None):
    print('func=%s, func_args=%s' % (func, func_args))

    ret_q = multiprocessing.Manager().Queue()  # managed queue, shared between parent and all children
    process_list = []
    for i in func.keys():
        p = Mprocessor(name=i, target=func[i], args=(func_args[i],), queue=ret_q)
        p.start()
        process_list.append(p)
        time.sleep(0.1)

    print('Block __main__ until all processes have terminated')
    for p in process_list:
        p.join()

    print('Aggregate all return values')
    return_dict = {}
    while not ret_q.empty():
        p_name, value = ret_q.get()
        return_dict[p_name] = value

    return return_dict
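
Note: f1 is not defined in the answer. A minimal stand-in consistent with the sample output below (my assumption, not part of the original answer) could be:

import os
import time

def f1(n):
    # emit the 'start / running / Terminate' lines seen in the output
    print('pid:%s start %s' % (os.getpid(), n))
    for _ in range(3):
        time.sleep(0.5)
        print('pid:%s running' % os.getpid())
    print('pid:%s Terminate' % os.getpid())
    return n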

__main__

if __name__ == '__main__':
    rd = Processor_call({'f1': f1, 'f2': f1}, {'f1': 1, 'f2': 2})
    print('rd=%s' % rd)

Output:
func={'f1': <function f1 at 0x...>, 'f2': <function f1 at 0x...>}, func_args={'f1': 1, 'f2': 2}
pid:4501 start 2
pid:4501 running
pid:4500 start 1
pid:4500 running
Block __main__ until all processes have terminated
pid:4501 running
pid:4500 running
pid:4501 running
pid:4500 running
pid:4501 Terminate
pid:4500 Terminate
Aggregate all return values
rd={'f1': 1, 'f2': 2}

Tested with Python 3.4.2 and 2.7.9


Question: Is it possible to inherit from multiprocessing.Process to communicate with the main process?

Yes, it's possible. But not by using a class attribute, as your process uses its own copy of the class object.
You have to use a global Queue object and pass it to your process, as sketched below.
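
A minimal sketch of that pattern (hypothetical names; note that here the target itself takes the queue, which is exactly what the question wants to avoid):

import multiprocessing

def worker(q, n):
    q.put(n * n)          # the target writes its result to the queue itself

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q, 3))
    p.start()
    p.join()
    print(q.get())        # 9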

stovfl
  • Having a global and passing it to my process won't do it if I don't put the return values into the queue, which involves changing the implementation of the target function to use that global queue. – Pavan May 02 '17 at 16:01
  • Hi, the queue I declare inside the Mprocessor class is visible to the main process as long as I have the Mprocessor object, so having a global queue doesn't provide the answer to my question. When I do Mprocessor.start(), the start method opens a separate process which has its own process memory and runs the target inside it. What I want is a way to overload the start method so that I'll get the return value from the process started from the start method. I have tried the approach you suggested, and it doesn't solve the problem I have. – Pavan May 03 '17 at 05:14
  • And also, with return_dict[process_wait.name] = global_Queue.get() it is possible that the code gets values from global_Queue.get() which are not from the process_wait.name process, because I wouldn't know the order in which the other processes finished, nor the order of the return values put into the queue. This is why I wanted a separate queue associated with each process, so I can access them through the Mprocessor object itself. – Pavan May 03 '17 at 05:18
  • Hi, sending a queue as a parameter and putting the return value into the queue is what I'm trying to avoid. The function/target should return normally using the return statement, and I need to capture that return value. – Pavan May 03 '17 at 11:24
  • @Pavan: That is also possible; I changed my answer to do so. – stovfl May 03 '17 at 12:53
  • Hi, the answer is working. I made one change: instead of args = (func_args[i],) I did args = tuple(func_args[i]), because the first form was creating a tuple whose single element was the list. Thank you very much for your time. :) – Pavan May 04 '17 at 05:13
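
For reference, the difference described in the last comment (a small illustration, not part of the original thread):

func_args = {'f1': [1, 2]}
(func_args['f1'],)        # ([1, 2],) -> target called with one argument, the list itself
tuple(func_args['f1'])    # (1, 2)    -> target called with two positional arguments, 1 and 2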