
What I have: a queue built following the top answer to *Python Pool with worker Processes*.

So it looks like this:

from multiprocessing import Worker, Queue


class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue
        self.task_type = ''
    
    def get_type(self):
        print(self.task_type)  # prints empty line
        return self.task_type

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            print(self.task_type)  # prints test
            # Use data

request_queue = Queue()
workers = []
for i in range(4):
    workers.append(Worker(request_queue))

for i in workers:
    i.start()

for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(some_stuff)

Then I'm doing some stuff to find the worker whose type I need and terminate it, like:

for i in workers:
    if i.get_type() == 'test':
        i.terminate()

But when I try to get the types from all the workers, they are all empty, even while a task inside is running.

And I don't know how to fix it.

Lionead

2 Answers


You did not post anything close to a minimal, reproducible example, since important declarations such as the_real_source are missing. So I can't fix and test your code, but I can give you slightly altered code that you can adapt. Here are several areas where you went wrong:

  1. You are importing from multiprocessing the name Worker. But Worker does not exist in that package; it is a class that you are supplying in your own code. What you do need to import from that package is the Process class.
  2. The statement for data in iter(self.queue.get, None): says to iterate the input queue by making calls to self.queue.get() until the value None is returned. Therefore, None is the sentinel value that is being used to signal that there is no more data left to be processed.
  3. Since you have 4 processes reading from the same input queue and you want all 4 processes to terminate when there is no more data, you need to place 4 instances of the sentinel value None on the queue.
  4. And since all 4 processes read from the same input queue and you have no control over process scheduling, you cannot be sure which process will read which data from the queue. The one thing you can be sure of is that each process will read exactly one sentinel value, because when a process reads a sentinel it immediately breaks out of its get loop and then terminates.
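Point 2 above is the two-argument form of the built-in `iter`; stripped of the multiprocessing machinery, the idiom looks like this:

```python
from queue import Queue

q = Queue()
for item in ('a', 'b', None):
    q.put(item)

# iter(q.get, None) calls q.get() repeatedly and stops as soon
# as a call returns the sentinel value None.
consumed = list(iter(q.get, None))
print(consumed)  # → ['a', 'b']
```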

In the following code the processes return their final task_type attribute value to the main process in an output queue that is passed as an additional argument to the Worker initializer:

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, request_queue, output_queue):
        super(Worker, self).__init__()
        self.request_queue = request_queue
        self.output_queue = output_queue
        self.task_type = ''

    def run(self):
        print(f'Worker {self.pid} started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.request_queue.get, None):
            self.task_type = data['type']
            # Print our ident:
            print('Worker process printing:', 'self id =', self.pid, 'self.task_type =', self.task_type)  # prints test
            # Use data:
        self.output_queue.put((self.pid, self.task_type))


if __name__ == '__main__':
    request_queue = Queue()
    output_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue, output_queue))

    for worker in workers:
        worker.start()

    the_real_source = [{'type': i} for i in range(20)]
    for data in the_real_source:
        request_queue.put(data)

    for _ in range(4):
        # put sentinel for clean shutdown, which is None:
        request_queue.put(None)

    # Drain the output queue before joining: a process may not exit
    # until everything it has put on a queue has been flushed.
    # We are looking for 4 results:
    results = [output_queue.get() for _ in range(4)]
    for pid, task_type in results:
        print('Main process printing:', 'worker id =', pid, 'worker task_type =', task_type)
    # wait for the tasks to complete:
    for worker in workers:
        worker.join()

Prints:

Worker 16672 started
Computing things!
Worker process printing: self id = 16672 self.task_type = 0
Worker process printing: self id = 16672 self.task_type = 1
Worker process printing: self id = 16672 self.task_type = 2
Worker process printing: self id = 16672 self.task_type = 3
Worker process printing: self id = 16672 self.task_type = 4
Worker process printing: self id = 16672 self.task_type = 5
Worker process printing: self id = 16672 self.task_type = 6
Worker process printing: self id = 16672 self.task_type = 7
Worker process printing: self id = 16672 self.task_type = 8
Worker process printing: self id = 16672 self.task_type = 9
Worker process printing: self id = 16672 self.task_type = 10
Worker process printing: self id = 16672 self.task_type = 11
Worker process printing: self id = 16672 self.task_type = 12
Worker 19620 started
Worker process printing: self id = 16672 self.task_type = 13
Computing things!
Worker process printing: self id = 16672 self.task_type = 14
Worker process printing: self id = 19620 self.task_type = 15
Worker 6728 started
Worker process printing: self id = 16672 self.task_type = 16
Worker process printing: self id = 19620 self.task_type = 17
Computing things!
Worker 17724 started
Worker process printing: self id = 16672 self.task_type = 18
Worker process printing: self id = 19620 self.task_type = 19
Computing things!
Main process printing: worker id = 6728 worker task_type =
Main process printing: worker id = 16672 worker task_type = 18
Main process printing: worker id = 19620 worker task_type = 19
Main process printing: worker id = 17724 worker task_type =

In the above run, all of the input messages were grabbed by 2 of the 4 processes.

Multithreading Version

You need to use multithreading, where your worker threads share the same address space as your main thread, if you want the main thread to be able to access the task_type attribute of a Worker instance while the worker thread is still running and get the current value. If you are using multiprocessing, the best you can do is have the final value returned to the main process in a passed output queue (@Aaron used the output queue to pass back the Worker instance in its entirety, but that may be more information than you need, as the multiprocessing version above shows). In the code below the main thread only accesses the task_type attributes as the threads complete, but it could have done so at any time:

from threading import Thread
from queue import Queue

class Worker(Thread):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue
        self.task_type = ''

    def run(self):
        print(f'Worker {self.ident} started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            # Print our ident:
            print('Worker thread printing:', 'self id =', self.ident, 'self.task_type =', self.task_type)  # prints test
            # Use data


if __name__ == '__main__':
    request_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue))

    for worker in workers:
        worker.start()

    the_real_source = [{'type': i} for i in range(20)]
    for data in the_real_source:
        request_queue.put(data)

    for _ in range(4):
        # put sentinel for clean shutdown, which is None:
        request_queue.put(None)

    # wait for the tasks to complete:
    for worker in workers:
        worker.join()
        print('Main thread printing:', 'worker id =', worker.ident, 'worker task_type =', worker.task_type)

Prints:

Worker 3980 started
Computing things!
Worker 12084 started
Computing things!
Worker 3552 started
Computing things!
Worker 5296 started
Computing things!
Worker thread printing: self id = 3980 self.task_type = 0
Worker thread printing: self id = 12084 self.task_type = 2
Worker thread printing: self id = 3552 self.task_type = 1
Worker thread printing: self id = 3552 self.task_type = 6
Worker thread printing: self id = 3552 self.task_type = 7
Worker thread printing: self id = 3980 self.task_type = 4
Worker thread printing: self id = 12084 self.task_type = 5
Worker thread printing: self id = 3980 self.task_type = 8
Worker thread printing: self id = 5296 self.task_type = 3
Worker thread printing: self id = 3552 self.task_type = 9
Worker thread printing: self id = 12084 self.task_type = 10
Worker thread printing: self id = 3980 self.task_type = 11
Worker thread printing: self id = 5296 self.task_type = 12
Worker thread printing: self id = 3552 self.task_type = 13
Worker thread printing: self id = 12084 self.task_type = 14
Worker thread printing: self id = 3980 self.task_type = 15
Worker thread printing: self id = 5296 self.task_type = 16
Worker thread printing: self id = 3552 self.task_type = 17
Worker thread printing: self id = 12084 self.task_type = 18
Worker thread printing: self id = 3980 self.task_type = 19
Main thread printing: worker id = 3980 worker task_type = 19
Main thread printing: worker id = 12084 worker task_type = 18
Main thread printing: worker id = 3552 worker task_type = 17
Main thread printing: worker id = 5296 worker task_type = 16
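Because the threads share the main thread's address space, the main thread could also have polled task_type before joining. A minimal sketch of that live access (the sleeps are illustrative stand-ins for real work):

```python
import threading
import time

class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        self.task_type = ''

    def run(self):
        self.task_type = 'test'  # visible to the main thread immediately
        time.sleep(0.2)          # stand-in for the task itself

w = Worker()
w.start()
time.sleep(0.05)                 # give the thread a moment to get scheduled
print(w.task_type)               # read while run() is still executing
w.join()
```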
Booboo
  • hi, thank you for improving the code, was writing it just to show what i have, but sadly didn’t test it. can you show the right code which will share self.task_type with main out of worker? or there’s solution in one of these codes – Lionead Oct 13 '21 at 19:53
  • I am not sure what you mean by "share self.task_type with main out of worker." What I wanted to demonstrate is that you could be theoretically starting 4 processes but one process could be grabbing *all* the messages from the input queue and successively assigning something to `self.task_type` overlaying the previous value and the remaining 3 processes never get to assign anything leaving `self.task_type` as ''. Note how the process with Process Id 17948 grabbed messages 0, 1, 2, and 3 (the first 4 messages). So it is not clear to me what your problem definition is. (more...) – Booboo Oct 13 '21 at 20:39
  • If you want your processes to compute some function of the input and return that computed value, then you pass as a second argument to each process a second output queue to which the result is written. Of course, because you have no control over the order in which the processes write their results, you need to figure out how to match up the result with the original input. Either the original input is returned with the result as a tuple or an index is written with the data when it is put on the input queue and that index is written with the result as a tuple and then all the tuples are sorted. – Booboo Oct 13 '21 at 20:44
  • You need to state a real problem and then there exists a real coded solution. But I can't code something guessing what it is you are trying to accomplish. – Booboo Oct 13 '21 at 20:47
  • ok, then I'll try explaining another way: while workers are running, I want to get their currently running task type, but the main script does not see anything (see Aaron's explanation), and I want a working example of the same thing made with threading (it is not possible with multiprocessing, I guess) – Lionead Oct 14 '21 at 04:43
  • I have updated the multiprocessing version to show how to return the `task_type` attribute from each process in a different way from @Aaron (you don't need to return the complete object) and I have added a multithreading version if you need the main thread to be able to access the `Worker.task_type` attribute at any time. – Booboo Oct 14 '21 at 11:05
  • thanks for this one, but will it work if i'm trying to get task type while some tasks may be running? without waiting for them to end – Lionead Oct 14 '21 at 11:33
  • Yes. You can access, for example, `workers[0].task_type` at any time after you have created the `Worker` instance and appended it to the `workers` list. – Booboo Oct 14 '21 at 11:52
  • But multithreading and multiprocessing are *not* interchangeable. Multithreading has performance limitations due to contention for the Global Interpreter Lock. You need to google this subject. Multithreading is suitable if your `run` method has very little CPU processing and is either mostly I/O or waiting on network requests or uses C-language library functions that release the Global Interpreter Lock (GIL). – Booboo Oct 14 '21 at 11:57
  • thanks, that is exactly what i wanted. will let you know if it works in my main project – Lionead Oct 14 '21 at 15:12
  • alright, it works for me, but i cannot stop the thread, unlike process (p.terminate()). is there a way to do it properly with current code? i have lots of different task types in project, i don't think i can add event of stopping – Lionead Oct 14 '21 at 18:57
  • k, i made task cancelling, but now asyncio.gather inside of it doesn’t work properly. instead of running 150 async tasks it runs only one task, others don’t get ran i guess. is it because of threading? – Lionead Oct 14 '21 at 19:20
  • First, terminating threads requires cooperation on the thread's part: it must periodically check a flag such as `stop` that, when set to `True`, tells the thread to gracefully end. Second, it is now time for you to post a whole new `asyncio` question with a [Minimal, Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) that duplicates the problem you are describing. You cannot keep tacking new questions onto an existing post when those questions pertain to a completely different scenario. When you have done so, you can reply to this comment with a link to the post. – Booboo Oct 14 '21 at 20:11
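The cooperative stop flag described in the last comment can be sketched with `threading.Event` (the names `stop` and `worker` are illustrative):

```python
import threading
import time

stop = threading.Event()

def worker():
    # Cooperative shutdown: the thread checks the flag on each
    # iteration instead of being terminated from outside.
    while not stop.is_set():
        time.sleep(0.01)  # stand-in for one unit of work

t = threading.Thread(target=worker)
t.start()
stop.set()         # request shutdown
t.join(timeout=1)  # the thread notices the flag and exits
print(t.is_alive())  # → False
```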

When you call Worker.start(), a copy of the object is made and sent to the child process for execution. At that point you have two separate objects, and updating self.task_type in the child process will not update self.task_type in the main process. One of the primary tenets of processes vs. threads is that processes don't share memory space, while threads do. Any communication between processes must go through OS file handles (which are at the heart of all the multiprocessing shared value types).

Quick and dirty example of getting the modified Worker instances back in the main process (untested: may have typos):

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, in_queue, out_queue):
        super(Worker, self).__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.task_type = ''
    
    def get_type(self):
        print(self.task_type)  # prints empty line
        return self.task_type

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.in_queue.get, None):
            self.task_type = data['type']
            print(self.task_type)  # prints test
            # Use data
        #after stop sentinel
        self.out_queue.put(self) #send modified Worker instance back to main process

if __name__ == "__main__": #you should always be using this with multiprocessing...
    request_queue = Queue()
    response_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue, response_queue))

    for i in workers:
        i.start()

    for data in the_real_source:
        request_queue.put(data)
    # Sentinel objects to allow clean shutdown: 1 per worker.
    for i in range(4):
        request_queue.put(None) #stop sentinel

    workers = [response_queue.get() for _ in range(4)]

    for i in workers:
        if i.get_type() == 'test':
            i.terminate()
Aaron
  • hello, that’s good explanation, but how can i make it into a code? you can use example in answer by booboo – Lionead Oct 13 '21 at 19:52
  • A simple, but potentially inefficient method would be to pass new copies of the modified `Worker` instances back via another queue to the main process. Those would then reflect any changes made in the child.. I'll write up a simple example – Aaron Oct 13 '21 at 19:58
  • that's a good example, but is there any way to get this task type while workers may be running, without stopping them? i have multiple workers in my bot, and i want to check if any of them are running without interrupting someone's task – Lionead Oct 14 '21 at 04:45
  • entirely possible, but it may be best to only send the relevant information in that case rather than the entire `worker` object (keep in mind sending it back will only snapshot the value at that time. It will need to be re-sent every time you want an update). I would still use a queue to send updates to the relevant values. Passing messages is often a better method than using shared values. – Aaron Oct 14 '21 at 13:02