
How do I call a method from a different class (in a different module) using a multiprocessing pool in Python?

My aim is to start a process that keeps running until a task is provided; once the task is completed, it goes back to waiting mode.

Below is the code, which has three modules. The Reader class is my runtime task; I hand execution of its reader method to ProcessExecutor. ProcessExecutor runs in the process pool and loops until a task is provided to it.

Main module which initiates everything.

Module 1

class Reader(object):
    def __init__(self, message):
        self.message = message

    def reader(self):
        print self.message

Module 2


class ProcessExecutor():
    def run(self, queue):
        print 'Before while loop'
        while True:
            print 'Reached Run'
            try:
                pair = queue.get()
                print 'Running process'
                print pair
                func = pair.get('target')
                arguments = pair.get('args', None)
                if arguments is None:
                    func()
                else:
                    func(arguments)
                queue.task_done()
            except Exception:
                print Exception.message



Main module

from process_helper import ProcessExecutor
from reader import Reader
import multiprocessing
import Queue

if __name__=='__main__':
    queue = Queue.Queue()
    myReader = Reader('Hi')
    ps = ProcessExecutor()
    pool = multiprocessing.Pool(2)
    pool.apply_async(ps.run, args=(queue, ))
    param = {'target': myReader.reader}
    queue.put(param)

Code executed without any error: C:\Python27\python.exe C:/Users/PycharmProjects/untitled1/main/main.py

Process finished with exit code 0

The code executes, but it never reaches the run method. I am not sure whether it is possible to call a method of a different class using multiprocessing or not.

I tried apply_async, map, and apply, but none of them work. All the examples I found online call the target method from the script where the main method is implemented. I am using Python 2.7. Please help.

Soni
    It is possible to call a method from another class. But the code you've posted here has so many problems that are probably irrelevant to your real code (indentation errors, syntax errors, and name errors all over the place) that it's impossible to debug what might be wrong in your real code. Please read [mcve] in the help to see how to create a question that can be answered. – abarnert Apr 07 '18 at 02:43
  • This is my first question here; would you please help point out how I can use the method from a different class? – Soni Apr 07 '18 at 02:47
  • Your code still has missing imports, indentation errors, and… probably more errors beyond that but I haven't bothered to try to fix it again. It's also not complete, and not minimal. Nobody can debug this. – abarnert Apr 07 '18 at 02:59
  • I have added runnable code; can you please help? My aim is to start a process loop that continually looks for a task; once a task is provided, it works on that task, and after finishing it goes back to waiting mode. – Soni Apr 07 '18 at 03:48
  • OK, I'm not sure if this counts as a dup of [this question](https://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-multiprocessing-pool-map) or not, because you have two other problems you need to fix before you even get to that problem. I'll write a short answer, but see that question for more details on the big one. – abarnert Apr 07 '18 at 04:09
  • Thank you so much, I really appreciate – Soni Apr 07 '18 at 04:10

1 Answer


Your first problem is that you just exit without waiting on anything. You have a Pool, a Queue, and an AsyncResult, but you just ignore all of them and exit as soon as you've created them. You should be able to get away with only waiting on the AsyncResult (after that, there's no more work to do, so who cares what you abandon), except for the fact that you're trying to use Queue.task_done, which doesn't make any sense without a Queue.join on the other side, so you need to wait on that as well.
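To illustrate what waiting on the AsyncResult looks like, here is a minimal sketch (the `square` function is illustrative, not from your code): `apply_async` returns immediately, and it is the `get()` call that blocks until the worker has actually run the task.

```python
import multiprocessing

def square(n):
    return n * n

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    res = pool.apply_async(square, (7,))
    value = res.get()  # blocks until the worker has finished the task
    print(value)
    pool.close()
    pool.join()
```

Without the `get()` (or a `pool.close()`/`pool.join()` pair), the main process can exit before the worker ever runs, which is exactly the "exit code 0, nothing happened" behavior you saw.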

Your second problem is that you're using the Queue from the Queue module, instead of the one from the multiprocessing module. The Queue module only works across threads in the same process.
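To see the difference, here is a minimal sketch (the `worker` function is illustrative): an item put on a `multiprocessing.Queue` in a child process is visible in the parent, which is not true of a `Queue.Queue`.

```python
import multiprocessing

def worker(q):
    # Runs in a child process; a multiprocessing.Queue crosses the
    # process boundary, where a Queue.Queue would not.
    q.put('hello from child')

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    msg = q.get()  # receives the item put by the child process
    p.join()
    print(msg)
```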

Also, you can't call task_done on a plain Queue; that's only a method for the JoinableQueue subclass.
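A quick sketch of the `task_done`/`join` pairing on a `JoinableQueue` (single-process here just to show the protocol): every `get()` must eventually be matched by a `task_done()`, and `join()` returns only once they all have been.

```python
import multiprocessing

if __name__ == '__main__':
    jq = multiprocessing.JoinableQueue()
    jq.put('job')
    item = jq.get()
    jq.task_done()  # only JoinableQueue has this method
    jq.join()       # returns once every get() is matched by a task_done()
    print(item)
```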

Once you've gotten to the point where the pool tries to actually run a task, you will get the problem that bound methods can't be pickled unless you write a pickler for them. Doing that is a pain, even though it's the right way. The traditional workaround—hacky and cheesy, but everyone did it, and it works—is to wrap each method you want to call in a top-level function. The modern solution is to use the third-party dill or cloudpickle libraries, which know how to pickle bound methods, and how to hook into multiprocessing. You should definitely look into them. But, to keep things simple, I'll show you the workaround.
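The reason the wrapper trick works is that a plain module-level function pickles by qualified name, so any worker process that can import the module can reconstruct and call it. A minimal sketch (the `call_target` name is illustrative); under Python 2, trying `pickle.dumps` on a bound method here would instead raise a `PicklingError`:

```python
import pickle

def call_target(x):
    # Module-level functions are pickled by reference to their
    # module and name, not by serializing their code.
    return x * 2

if __name__ == '__main__':
    data = pickle.dumps(call_target)
    restored = pickle.loads(data)
    print(restored(21))  # 42
```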

Notice that, because you've created an extra queue to pass methods onto, in addition to the one built into the pool, you'll need the workaround for both targets.

With these problems fixed, your code looks like this:

from process_helper import ProcessExecutor
from reader import Reader
import multiprocessing

def call_run(ps):
    ps.run(queue)

def call_reader(reader):
    return reader.reader()

if __name__=='__main__':
    queue = multiprocessing.JoinableQueue()
    myReader = Reader('Hi')
    ps = ProcessExecutor()
    pool = multiprocessing.Pool(2)
    res = pool.apply_async(call_run, args=(ps,))
    param = {'target': call_reader, 'args': myReader}
    queue.put(param)
    print res.get()
    queue.join()

You have additional bugs beyond this in your ProcessExecutor, but I'm not going to debug everything for you. This gets you past the initial hurdles, and shows the answer to the specific question you were asking about. Also, I'm not sure what the point of all that code is. You seem to be trying to replace what Pool already does on top of Pool, only in a more complicated but less powerful way, but I'm not entirely sure.

Meanwhile, here's a program that does what I think you want, with no problems, by just throwing away that ProcessExecutor and everything that goes with it:

from reader import Reader
import multiprocessing

def call_reader(reader):
    return reader.reader()

if __name__=='__main__':
    myReader = Reader('Hi')
    pool = multiprocessing.Pool(2)
    res = pool.apply_async(call_reader, args=(myReader,))
    print res.get()
abarnert
    Thank you for your quick response, but still my code is not doing anything, it enters to infinite loop. I added print statement in call_run method and it is not even going there. – Soni Apr 07 '18 at 05:17
  • @Soni As I said, there are also a bunch of bugs in your `ProcessExecutor` class, and I'm not even sure what exactly it's supposed to be doing, so I can't fix them. As I showed at the end, the easiest way to get what I think you want is to just _scrap_ that class. – abarnert Apr 07 '18 at 05:18
  • You are correct, my approach is derived from multi threading, and I was not accepting actual implementation of Pool. I was trying to implement Pool in threading manner. – Soni Apr 07 '18 at 05:57