I want to encapsulate a multiprocessing task in a class. Both the control and the worker functions are members of the class. The workers are run with Pool.map_async(), so that the results can be processed while other workers are still running. The results to be processed are stored in a multiprocessing.Queue. When the Queue is an instance variable it doesn't work, whereas as a global variable or a class variable it does.
Example:
import multiprocessing

class A():
    # Queue as instance variable
    def __init__(self):
        self.qout = multiprocessing.Queue()

    def worker(self, x):
        self.qout.put(x * x)

    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker, values)
            while (not self.qout.empty() or
                   not res.ready()):
                val = self.qout.get()
                print(val)

qoutB = multiprocessing.Queue()

class B():
    # Queue as global variable
    def __init__(self):
        pass

    def worker(self, x):
        qoutB.put(x * x)

    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker, values)
            while (not qoutB.empty() or
                   not res.ready()):
                val = qoutB.get()
                print(val)

class C():
    # Queue as class variable
    qout = multiprocessing.Queue()

    def __init__(self):
        pass

    def worker(self, x):
        self.qout.put(x * x)

    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker, values)
            while (not self.qout.empty() or
                   not res.ready()):
                val = self.qout.get()
                print(val)
Now, when you call the classes as follows (put this below the class definitions):
a=A()
a.process()
does not work (it probably blocks waiting on self.qout.get()), but
a=B()
a.process()
and
a=C()
a.process()
both work (the results are printed). Why?
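One clue I found while experimenting (my own diagnostic, not something from the documentation): a bare multiprocessing.Queue refuses to be pickled at all, and pickling is presumably involved when self.worker (and with it self, including the instance-variable queue) is sent to the pool:

```python
import multiprocessing
import pickle

q = multiprocessing.Queue()
try:
    # This is roughly what happens implicitly when a bound method whose
    # instance holds the queue is submitted to pool.map_async().
    pickle.dumps(q)
except RuntimeError as e:
    # Pickling the queue outside of process spawning raises RuntimeError.
    print("pickling failed:", e)
```

In variants B and C the queue lives on the module or the class, not in the instance's __dict__, so it is apparently never pickled along with self.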
I haven't found any relevant info in the Python documentation. I haven't tried passing the queue as an argument, because the queue should be an implementation detail hidden from the user.
Option B is out of the question, and C is not ideal either, as the queue would be shared between all instances of the class.
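For completeness, the only per-instance workaround I can think of is a sketch using a multiprocessing.Manager queue as the instance variable: the manager's queue proxy is picklable, so it should survive being sent to the workers together with self. Note that this sketch waits for all workers and drains the queue afterwards, so it gives up the "process results while workers are still running" behaviour of the original loop:

```python
import multiprocessing

class D():
    # Queue as instance variable, but backed by a Manager process;
    # the returned proxy object can be pickled together with `self`.
    def __init__(self):
        self.qout = multiprocessing.Manager().Queue()

    def worker(self, x):
        self.qout.put(x * x)

    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker, values)
            res.wait()  # wait for all workers to finish
        # Drain the queue after the pool is done.
        results = []
        while not self.qout.empty():
            results.append(self.qout.get())
        return results

if __name__ == '__main__':
    print(sorted(D().process()))
```

A manager queue goes through an extra broker process, so it is slower than a plain multiprocessing.Queue, but it keeps the queue per-instance and hidden from the user.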
Note: this was tested on Linux (Debian, Python 3.5 from the repository).