You did not post anything close to a minimal, reproducible example, since you were missing important declarations such as `the_real_source`. So I can't fix and test your code, but I can give you slightly altered code that you can adapt. Here are several areas where you went wrong:
- You are importing the name `Worker` from `multiprocessing`. But `Worker` does not exist in that package; it is a class that you are supplying in your own code. You do, however, need to import the `Process` class from that package.
- The statement `for data in iter(self.queue.get, None):` says to iterate the input queue by calling `self.queue.get()` until the value `None` is returned. Therefore, `None` is the sentinel value used to signal that there is no more data left to process.
- Since you have 4 processes reading from the same input queue and you want all 4 processes to terminate when there is no more data, you need to place 4 instances of the sentinel value `None` on the queue.
- And since all 4 processes are reading from the same input queue and you have no control over how the processes are scheduled, you can't be sure which process will read which data from the queue. The only exception is that each process will read exactly one sentinel value, because as soon as a process reads a sentinel it breaks out of the loop where it is getting data from the queue and then terminates.
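The sentinel logic above can be checked without multiprocessing at all. Here is a minimal sketch that uses threads and `queue.Queue` purely so it runs in a single process (the `consumer` and `run_demo` names are hypothetical): each of the 4 consumers loops over `iter(q.get, None)`, and because exactly 4 `None` sentinels are queued after the data, every consumer stops and every `join()` returns. How the 20 items are split among the consumers varies from run to run.

```python
import threading
from queue import Queue

NUM_CONSUMERS = 4
SENTINEL = None


def consumer(q, counts, index):
    # iter(q.get, SENTINEL) keeps calling q.get() until it returns SENTINEL,
    # at which point the loop (and this thread) ends.
    for _ in iter(q.get, SENTINEL):
        counts[index] += 1  # each thread only touches its own slot


def run_demo(num_items=20):
    q = Queue()
    counts = [0] * NUM_CONSUMERS
    threads = [threading.Thread(target=consumer, args=(q, counts, i))
               for i in range(NUM_CONSUMERS)]
    for t in threads:
        t.start()
    for i in range(num_items):
        q.put({'type': i})
    # One sentinel per consumer: each consumer stops after reading exactly one.
    for _ in range(NUM_CONSUMERS):
        q.put(SENTINEL)
    for t in threads:
        t.join()  # would hang forever if there were fewer sentinels than consumers
    return counts


if __name__ == '__main__':
    counts = run_demo()
    print(counts, sum(counts))  # the split varies; the sum is always 20
```

If you queue only 3 sentinels instead, one consumer blocks forever in `q.get()` and the final `join()` never returns, which is exactly the hang the 4-sentinel rule avoids.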
In the following code, the processes return their final `task_type` attribute values to the main process through an output queue that is passed as an additional argument to the `Worker` initializer:
```python
from multiprocessing import Process, Queue


class Worker(Process):
    def __init__(self, request_queue, output_queue):
        super().__init__()
        self.request_queue = request_queue
        self.output_queue = output_queue
        self.task_type = ''

    def run(self):
        print(f'Worker {self.pid} started')
        # do some initialization here
        print('Computing things!')
        for data in iter(self.request_queue.get, None):
            self.task_type = data['type']
            # Print our ident:
            print('Worker process printing:', 'self id =', self.pid, 'self.task_type =', self.task_type)
            # Use data:
        # After the sentinel is read, send the final value back to the main process:
        self.output_queue.put((self.pid, self.task_type))


if __name__ == '__main__':
    request_queue = Queue()
    output_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue, output_queue))
    for worker in workers:
        worker.start()
    the_real_source = [{'type': i} for i in range(20)]
    for data in the_real_source:
        request_queue.put(data)
    for _ in range(4):
        # put sentinel for clean shutdown, which is None:
        request_queue.put(None)
    # Must read the output queue before joining the processes, since a process
    # that has put items on a queue will not terminate until they are consumed.
    # We are looking for 4 results:
    results = [output_queue.get() for _ in range(4)]
    for pid, task_type in results:
        print('Main process printing:', 'worker id =', pid, 'worker task_type =', task_type)
    # wait for the processes to complete:
    for worker in workers:
        worker.join()
```
Prints:

```
Worker 16672 started
Computing things!
Worker process printing: self id = 16672 self.task_type = 0
Worker process printing: self id = 16672 self.task_type = 1
Worker process printing: self id = 16672 self.task_type = 2
Worker process printing: self id = 16672 self.task_type = 3
Worker process printing: self id = 16672 self.task_type = 4
Worker process printing: self id = 16672 self.task_type = 5
Worker process printing: self id = 16672 self.task_type = 6
Worker process printing: self id = 16672 self.task_type = 7
Worker process printing: self id = 16672 self.task_type = 8
Worker process printing: self id = 16672 self.task_type = 9
Worker process printing: self id = 16672 self.task_type = 10
Worker process printing: self id = 16672 self.task_type = 11
Worker process printing: self id = 16672 self.task_type = 12
Worker 19620 started
Worker process printing: self id = 16672 self.task_type = 13
Computing things!
Worker process printing: self id = 16672 self.task_type = 14
Worker process printing: self id = 19620 self.task_type = 15
Worker 6728 started
Worker process printing: self id = 16672 self.task_type = 16
Worker process printing: self id = 19620 self.task_type = 17
Computing things!
Worker 17724 started
Worker process printing: self id = 16672 self.task_type = 18
Worker process printing: self id = 19620 self.task_type = 19
Computing things!
Main process printing: worker id = 6728 worker task_type =
Main process printing: worker id = 16672 worker task_type = 18
Main process printing: worker id = 19620 worker task_type = 19
Main process printing: worker id = 17724 worker task_type =
```
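In the above run, all of the input messages were grabbed by just 2 of the 4 processes (16672 and 19620). The first process to start drained most of the queue before the others got going, so processes 6728 and 17724 read only their sentinels and report an empty `task_type`.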
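If you need every `(pid, task_type)` pair rather than only each worker's final value, one option (a variation, not what the code above does) is to have each worker put a result tuple inside its loop and then put a single `None` on the output queue when it finishes. The main process then reads until it has seen one sentinel per worker, still before joining. Here is a minimal sketch of that reading side as a hypothetical helper `drain_results`, exercised with a `queue.Queue` standing in for a `multiprocessing.Queue`, which supports the same `put()`/`get()` calls used here.

```python
from queue import Queue


def drain_results(output_queue, num_workers, sentinel=None):
    """Collect results until one sentinel per worker has been read.

    Assumes (hypothetically) that each worker puts a (pid, task_type) tuple
    for every item it processes and then exactly one sentinel when it exits.
    None unpickles as the same singleton, so the `is` check also works
    with a multiprocessing.Queue.
    """
    results = []
    finished = 0
    while finished < num_workers:
        item = output_queue.get()
        if item is sentinel:
            finished += 1
        else:
            results.append(item)
    return results


if __name__ == '__main__':
    # Simulate what two workers (hypothetical pids 101 and 102) might put.
    q = Queue()
    for entry in [(101, 0), (102, 1), (101, 2), None, (102, 3), None]:
        q.put(entry)
    print(drain_results(q, 2))  # prints [(101, 0), (102, 1), (101, 2), (102, 3)]
```

With this scheme the main process no longer needs to know in advance how many results to expect, only how many workers there are.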
Multithreading Version
If you want the main thread to be able to read the current value of a `Worker` instance's `task_type` attribute while the worker is still running, you need to use multithreading, where the worker threads share the same address space as the main thread. Otherwise, if you are using multiprocessing, the simplest approach is to have each worker send its final value back to the main process through an output queue passed to it (@Aaron used the output queue to pass back the entire `Worker` instance, but, as the multiprocessing version above shows, this may be more information than you need). In the code below, the main thread only accesses the `task_type` attributes as the threads complete, but it could have done so at any time:
```python
from threading import Thread
from queue import Queue


class Worker(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        self.task_type = ''

    def run(self):
        print(f'Worker {self.ident} started')
        # do some initialization here
        print('Computing things!')
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            # Print our ident:
            print('Worker thread printing:', 'self id =', self.ident, 'self.task_type =', self.task_type)
            # Use data


if __name__ == '__main__':
    request_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue))
    for worker in workers:
        worker.start()
    the_real_source = [{'type': i} for i in range(20)]
    for data in the_real_source:
        request_queue.put(data)
    for _ in range(4):
        # put sentinel for clean shutdown, which is None:
        request_queue.put(None)
    # wait for the threads to complete:
    for worker in workers:
        worker.join()
        print('Main thread printing:', 'worker id =', worker.ident, 'worker task_type =', worker.task_type)
```
Prints:

```
Worker 3980 started
Computing things!
Worker 12084 started
Computing things!
Worker 3552 started
Computing things!
Worker 5296 started
Computing things!
Worker thread printing: self id = 3980 self.task_type = 0
Worker thread printing: self id = 12084 self.task_type = 2
Worker thread printing: self id = 3552 self.task_type = 1
Worker thread printing: self id = 3552 self.task_type = 6
Worker thread printing: self id = 3552 self.task_type = 7
Worker thread printing: self id = 3980 self.task_type = 4
Worker thread printing: self id = 12084 self.task_type = 5
Worker thread printing: self id = 3980 self.task_type = 8
Worker thread printing: self id = 5296 self.task_type = 3
Worker thread printing: self id = 3552 self.task_type = 9
Worker thread printing: self id = 12084 self.task_type = 10
Worker thread printing: self id = 3980 self.task_type = 11
Worker thread printing: self id = 5296 self.task_type = 12
Worker thread printing: self id = 3552 self.task_type = 13
Worker thread printing: self id = 12084 self.task_type = 14
Worker thread printing: self id = 3980 self.task_type = 15
Worker thread printing: self id = 5296 self.task_type = 16
Worker thread printing: self id = 3552 self.task_type = 17
Worker thread printing: self id = 12084 self.task_type = 18
Worker thread printing: self id = 3980 self.task_type = 19
Main thread printing: worker id = 3980 worker task_type = 19
Main thread printing: worker id = 12084 worker task_type = 18
Main thread printing: worker id = 3552 worker task_type = 17
Main thread printing: worker id = 5296 worker task_type = 16
```
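The threaded code only reads `task_type` after each `join()`. To illustrate reading it while a worker is still busy, here is a minimal sketch. The `PeekableWorker` class, its `picked_up` and `proceed` events, and the `peek_while_running` helper are hypothetical additions used only to make the timing deterministic; the blocking `proceed.wait()` stands in for a long-running computation on the data.

```python
import threading
from queue import Queue


class PeekableWorker(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        self.task_type = ''
        # Demo-only events so the timing is deterministic. They are never
        # cleared, so this sketch only pauses on the first item it processes.
        self.picked_up = threading.Event()  # set once task_type has been updated
        self.proceed = threading.Event()    # the "computation" blocks until this is set

    def run(self):
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            self.picked_up.set()
            # Stand-in for a long-running computation on data:
            self.proceed.wait()


def peek_while_running():
    q = Queue()
    worker = PeekableWorker(q)
    worker.start()
    q.put({'type': 'resize'})
    worker.picked_up.wait()
    # The worker is still blocked inside its "computation", yet the main thread
    # sees the current value because both threads share one address space.
    snapshot = (worker.is_alive(), worker.task_type)
    worker.proceed.set()
    q.put(None)  # sentinel for clean shutdown
    worker.join()
    return snapshot


if __name__ == '__main__':
    print(peek_while_running())  # prints (True, 'resize')
```

The same attribute read on a `multiprocessing.Process` subclass would only see the main process's copy of the object, which is why the multiprocessing version has to send values back through a queue.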