
I want to encapsulate a multiprocessing task in a class. Both the control and worker functions are members of the class. The workers are run using Pool.map_async(), so the results can be processed while other workers are still running. The results to be processed are stored in a multiprocessing.Queue. When the Queue is an instance variable it doesn't work, whereas as a global variable or a class variable it works.

Example:

```python
import multiprocessing


class A():
    # Queue as instance variable
    def __init__(self):
        self.qout = multiprocessing.Queue()

    def worker(self, x):
        self.qout.put(x*x)

    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker, values)
            while (not self.qout.empty() or
                   not res.ready()):
                val = self.qout.get()
                print(val)


qoutB = multiprocessing.Queue()


class B():
    # Queue as global variable
    def __init__(self):
        pass

    def worker(self, x):
        qoutB.put(x*x)

    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker, values)
            while (not qoutB.empty() or
                   not res.ready()):
                val = qoutB.get()
                print(val)


class C():
    # Queue as class variable
    qout = multiprocessing.Queue()

    def __init__(self):
        pass

    def worker(self, x):
        self.qout.put(x*x)

    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker, values)
            while (not self.qout.empty() or
                   not res.ready()):
                val = self.qout.get()
                print(val)
```

Now, when the classes are used as follows (placed below the class definitions),

```python
a = A()
a.process()
```

does not work (it probably blocks waiting on `self.qout.get()`), but

```python
a = B()
a.process()
```

and

```python
a = C()
a.process()
```

work (they print the results). Why?

I haven't found any relevant information in the Python documentation. I haven't tried passing the queue as an argument, because it is a detail that should be hidden from the user.

Option B is out of the question, and C is not ideal, as the queue would be shared between all instances of the class.

Note: This is tested on Linux (Debian, Python 3.5 from repository).

  • In class `A`, the queue is an instance attribute, not a class variable, which is something else, so you're using the wrong terminology. You should try making an actual class variable and see what happens. – martineau Dec 02 '18 at 09:45
  • Sorry for the wrong terminology, I will fix the question. However, if the queue were a class variable, it would be shared between all instances, wouldn't it? That is not the desired behaviour. – Hamiltonian Dec 02 '18 at 10:06
  • I understand. If nothing else you should try it just to see if there's the same problem (even if you can't use the technique). – martineau Dec 02 '18 at 10:16
  • OK, so it works if the queue is a class variable... – Hamiltonian Dec 02 '18 at 10:48
  • The code in your question is incomplete, so I can't tell how you're setting things up. The problem may have something to do with that not being done correctly. Also, how are things getting put in a class `A` instance's queue for it to process? You need to post a runnable example others can use to reproduce and possibly fix or work around the problem. See [How to create a Minimal, Complete, and Verifiable Example](https://stackoverflow.com/help/mcve). – martineau Dec 02 '18 at 11:01
  • What is incomplete about it? It is copied and pasted from my testing (Python 3.5). You just need to add one of the 3 snippets below to test the classes. – Hamiltonian Dec 02 '18 at 12:34
  • Values are stored in the queue by the Queue.put() call (`self.qout.put(x*x)`) in this case. – Hamiltonian Dec 02 '18 at 12:38
  • Is this on Windows? Also, as always, what does “does not work” mean? – Davis Herring Dec 02 '18 at 17:45
  • I can't get any of the classes to work (on Windows), so conclude you've left something out. Where's the required `if __name__ == '__main__':` that's needed in the main process? – martineau Dec 02 '18 at 17:49
  • OK, I hoped that Python was portable enough in this respect. Apparently not. I have tested this on Linux (Debian, Python 3.5 from the repository). At least on Linux, `if __name__ == '__main__':` is not needed; it is just a script running from top to bottom. – Hamiltonian Dec 02 '18 at 19:24
  • @DavisHerring Works: prints out numbers. Doesn't work: prints nothing; judging by the KeyboardInterrupt traceback, it stops on `self.qout.get()`. – Hamiltonian Dec 02 '18 at 19:30
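One way to see what is actually happening in class A, sketched under the assumption that the tasks never reach a worker at all: instead of polling the queue, wait on the AsyncResult that map_async returns. If sending the tasks to the pool failed, `res.get()` re-raises the recorded exception rather than blocking on an empty queue. The class name `A_Diag` and the 10-second timeout are illustrative choices, not part of the original code.

```python
import multiprocessing


class A_Diag:
    # Same layout as class A: the queue is an instance attribute
    def __init__(self):
        self.qout = multiprocessing.Queue()

    def worker(self, x):
        self.qout.put(x * x)

    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker, values)
            try:
                # Wait on the map itself instead of polling the queue;
                # an error recorded while sending tasks is re-raised here.
                res.get(timeout=10)
            except Exception as exc:
                return exc
        return None


if __name__ == '__main__':
    print(repr(A_Diag().process()))
```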

2 Answers


Again, this isn't an answer to your question. However, I'm posting it because it makes the whole issue moot: you don't really need to explicitly create and use a multiprocessing.Queue to do something like this.

Instead consider using concurrent.futures.ProcessPoolExecutor to accomplish the task.

For example:

```python
import concurrent.futures


class A_Prime():
    def __init__(self):
        pass

    def worker(self, x):
        return x*x

    def process(self):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            classname = type(self).__name__
            print(classname, '- calling executor.map')
            res = list(executor.map(self.worker, range(10)))
            print(classname, '- executor.map finished')
            print('  result:', res)


if __name__ == '__main__':
    test = A_Prime()
    test.process()
    print('done')
```

Output:

```
A_Prime - calling executor.map
A_Prime - executor.map finished
  result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
done
```
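`executor.map` yields its results in input order, so one slow early task holds back all later results. If each result should be handled as soon as its worker finishes (the "process results while other workers are still running" case from the question), a sketch using `submit()` and `concurrent.futures.as_completed()` could look like this. The class name `A_Completed` is hypothetical, and the worker is the same trivial squaring function.

```python
import concurrent.futures


class A_Completed:
    def worker(self, x):
        return x * x

    def process(self, values):
        results = {}
        with concurrent.futures.ProcessPoolExecutor() as executor:
            # submit() returns immediately with one Future per call
            futures = {executor.submit(self.worker, v): v for v in values}
            # as_completed() yields each Future as soon as it finishes,
            # so results can be handled while other workers are still busy
            for future in concurrent.futures.as_completed(futures):
                results[futures[future]] = future.result()
        return results


if __name__ == '__main__':
    for value, square in sorted(A_Completed().process(range(10)).items()):
        print(value, square)
```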
martineau
  • Does this work asynchronously? The reason I tried to fiddle with the queue is that my actual workers take some time and there will be many of them, so I want to do some other work in the meantime. I must test `multiprocessing.pool.AsyncResult` more. – Hamiltonian Dec 02 '18 at 20:39
  • Hamiltonian: Yes, I believe so. The first line of the linked documentation says "The `ProcessPoolExecutor` class is an `Executor` subclass that uses a pool of processes to execute calls **asynchronously**." (emphasis of last word mine) – martineau Dec 02 '18 at 22:28
  • OK, the Futures are interesting indeed. It is a pity they don't come up in a Google search. – Hamiltonian Dec 03 '18 at 09:34
  • OK, I will upvote this one but accept my own answer, as I believe it explains the reasons. Thanks for the tip anyway. – Hamiltonian Dec 03 '18 at 09:36
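For the wish in the comments to do other work while the workers are running, one option that stays within multiprocessing.Pool is sketched below: let the worker return its value and iterate over `imap_unordered()`, which yields results in completion order without any explicit queue. The class name `A_Unordered` is hypothetical, and the `print()` call stands in for whatever processing the real code does.

```python
import multiprocessing


class A_Unordered:
    def worker(self, x):
        # Return the value instead of putting it on a queue
        return x * x

    def process(self, values):
        results = []
        with multiprocessing.Pool() as pool:
            # Results arrive in completion order, not input order
            for value in pool.imap_unordered(self.worker, values):
                print(value)  # placeholder for real per-result work
                results.append(value)
        return results


if __name__ == '__main__':
    A_Unordered().process(range(10))
```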

The SO algorithm gave me some interesting hints that I couldn't find earlier.

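Based on this answer, a multiprocessing.Queue cannot be passed as an argument to tasks that are sent to other processes (such as the function given to map_async), because a queue can only be pickled while a new process is being created. And that is what self.worker implies in general: calling self.worker(x) is equivalent to calling worker(self, x), so sending the bound method to a worker means pickling self, including every instance attribute. In the case of class A, the pool therefore tries to pass the queue to the workers; in B and C it does not, because the queue is a global or class-level object that lives more or less independently of the pickled instance and is inherited by the forked worker processes.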
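The pickling behaviour described above can be checked directly with the standard pickle module. In this small sketch the class and function names are hypothetical stand-ins for A and C: a bound method carries its instance, so pickling `InstanceQueue().worker` also tries to pickle the queue stored in the instance `__dict__`, while `ClassQueue().worker` pickles cleanly because a class-level queue is not part of the instance state.

```python
import multiprocessing
import pickle


class InstanceQueue:
    # Like class A: the queue lives in the instance __dict__
    def __init__(self):
        self.qout = multiprocessing.Queue()

    def worker(self, x):
        self.qout.put(x * x)


class ClassQueue:
    # Like class C: the queue is a class attribute, not instance state
    qout = multiprocessing.Queue()

    def worker(self, x):
        self.qout.put(x * x)


def pickle_error(obj):
    """Return the error message from pickling obj, or None if it pickles."""
    try:
        pickle.dumps(obj)
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == '__main__':
    # Fails: pickling the bound method pickles self, including the queue
    print(pickle_error(InstanceQueue().worker))
    # Succeeds: only the class reference and the empty instance dict are pickled
    print(pickle_error(ClassQueue().worker))
```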

The same conclusion follows from this question and its answers. Needless to say, manager.Queue does not work here either.
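One possible workaround that keeps one queue per instance and hides it from the caller, sketched under assumptions rather than taken from the linked answers: hand the queue to each worker process once, at pool start-up, through the Pool `initializer`/`initargs` parameters (process creation is exactly when a Queue may be shared), and leave it out of what gets pickled with self via `__getstate__`. The names `_init_worker`, `_worker_queue` and `A_Init` are hypothetical. The parent also reads exactly one result per input value instead of polling `empty()` and `ready()`, which assumes every worker call puts exactly one item.

```python
import multiprocessing

# Set in each pool process by _init_worker (hypothetical helper)
_worker_queue = None


def _init_worker(queue):
    # Runs once per worker process; the queue is shared at process creation
    global _worker_queue
    _worker_queue = queue


class A_Init:
    def __init__(self):
        self.qout = multiprocessing.Queue()  # still one queue per instance

    def __getstate__(self):
        # Leave the queue out when self is pickled for the workers
        state = self.__dict__.copy()
        del state['qout']
        return state

    def worker(self, x):
        _worker_queue.put(x * x)

    def process(self):
        values = range(10)
        results = []
        with multiprocessing.Pool(initializer=_init_worker,
                                  initargs=(self.qout,)) as pool:
            res = pool.map_async(self.worker, values)
            # Assumes one put per value, so read exactly that many items
            for _ in values:
                val = self.qout.get()
                print(val)
                results.append(val)
            res.get()  # re-raise any error recorded by the pool
        return results


if __name__ == '__main__':
    A_Init().process()
```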

Failed testing of MCVE

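This is probably due to the different default start methods of multiprocessing (see the docs). On Linux (at least with the Python 3.5 used here) the default is fork, so the worker processes inherit the parent's module-level and class-level objects. Windows uses spawn, where each worker starts a fresh interpreter and re-imports the main module (hence the required `if __name__ == '__main__':` guard), so in B and C each worker ends up with its own, separate queue.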
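A sketch for checking which start method is in effect; the helper names `start_method_info` and `square` are hypothetical. `get_start_method()`, `get_all_start_methods()` and `get_context()` are part of the multiprocessing API, and creating a pool from an explicit `'spawn'` context is one way to reproduce the Windows start-up behaviour on Linux.

```python
import multiprocessing


def start_method_info():
    """Return the default start method and all methods available here."""
    return (multiprocessing.get_start_method(),
            multiprocessing.get_all_start_methods())


def square(x):
    return x * x


if __name__ == '__main__':
    default, available = start_method_info()
    print('default:', default)
    print('available:', available)
    # An explicit spawn context mimics Windows: workers re-import this
    # module instead of inheriting the parent's memory.
    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool(2) as pool:
        print(pool.map(square, range(5)))
```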