
I have a class (MyClass) which contains a queue (self.msg_queue) of actions that need to be run and I have multiple sources of input that can add tasks to the queue.

Right now I have three functions that I want to run concurrently:

  • MyClass.get_input_from_user()
    • Creates a window in tkinter that has the user fill out information and when the user presses submit it pushes that message onto the queue.
  • MyClass.get_input_from_server()
    • Checks the server for a message, reads the message, and then puts it onto the queue. This method uses functions from MyClass's parent class.
  • MyClass.execute_next_item_on_the_queue()
    • Pops a message off of the queue and then acts upon it. It is dependent on what the message is, but each message corresponds to some method in MyClass or its parent which gets run according to a big decision tree.

Process description: After the class has joined the network, I have it spawn three threads (one for each of the above functions). Each threaded function adds items to the queue with the syntax "self.msg_queue.put(message)" and removes items from the queue with "self.msg_queue.get_nowait()".

Problem description: The issue I am having is that each thread appears to be modifying its own copy of the queue object, rather than sharing the msg_queue attribute of the class that all three functions belong to.

I am not familiar enough with multiprocessing to know which error messages are important; however, it is stating that it cannot pickle a weakref object (with no indication of which object is the weakref), and that within the queue.put() call the line "self._sem.acquire(block, timeout)" yields a "[WinError 5] Access is denied" error. Would it be safe to assume that this failure comes from the queue's reference not copying over properly?

[I am using Python 3.7.2 and the Multiprocessing package's Process and Queue]

[I have seen multiple Q/As about having threads shuttle information between classes--create a master harness that generates a queue and then pass that queue as an argument to each thread. If the functions didn't have to use other functions from MyClass I could see adapting this strategy by having those functions take in a queue and use a local variable rather than class variables.]
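Roughly what I understand that harness strategy to look like (a minimal sketch with hypothetical names, where the queue is created once and handed to each worker as an argument):

```python
import queue
import threading

def producer(msg_queue):
    # hypothetical worker: pushes one message onto the shared queue
    msg_queue.put("message from producer")

def consumer(msg_queue):
    # hypothetical worker: drains whatever is currently on the queue
    try:
        print(msg_queue.get_nowait())
    except queue.Empty:
        print("queue was empty")

shared_q = queue.Queue()  # the "master harness" creates the queue once
t1 = threading.Thread(target=producer, args=(shared_q,))
t1.start()
t1.join()
t2 = threading.Thread(target=consumer, args=(shared_q,))
t2.start()
t2.join()
```

Because threads share one address space, both workers really do see the same queue object here.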

[I am fairly confident that this error is not the result of passing my queue to the tkinter object as my unit tests on how my GUI modifies its caller's queue work fine]

Below is a minimal reproducible example for the queue's error:

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time

class MyTest:
    def __init__(self):
        self.my_q = Queue()
        self.counter = 0

    def input_function_A(self):
        while True:
            self.my_q.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    def input_function_B(self):
        while True:
            self.counter = 0
            self.my_q.put(self.counter)
            time.sleep(1)

    def output_function(self):
        while True:
            try:
                var = self.my_q.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)

    def run(self):
        process_A = Process(target=self.input_function_A)
        process_B = Process(target=self.input_function_B)
        process_C = Process(target=self.output_function)

        process_A.start()
        process_B.start()
        process_C.start()

        # without this it generates the WinError: 
        # with this it still behaves as if the two input functions do not modify the queue
        process_C.join() 

if __name__ == '__main__':
    test = MyTest()
    test.run()
  • Related: [How to make built-in containers (sets, dicts, lists) thread safe?](https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe). – martineau Apr 22 '20 at 17:42
  • Read [multiprocessing-vs-threading-python](https://stackoverflow.com/questions/3044580) – stovfl Apr 22 '20 at 18:18
  • @stovfl, so from this I am gathering that Process's nature in creating a new process with a new memory space supersedes a class's ability to group class variables into a common space. So to solve my issue I would need to give my functions arguments and pass a shallow clone of my class to the new Process so that it modifies the correct things? Or is the fact that Processes are trying to modify memory outside their stack what is causing the WinError 5 Access is Denied error? – rcanty Apr 22 '20 at 18:46
  • Without a [mcve], it's all wild guessing. In general, if you use a `multiprocessing.queue`, there should be no ***Access is denied*** at all, because a `Queue` uses locking to handle concurrent access. – stovfl Apr 22 '20 at 19:12
  • @stovfl, I have added a minimal reproducible example for the error I am getting from my queue. – rcanty Apr 22 '20 at 19:31
  • ***# without this it generates the WinError***: You need `.join()` or you have to use `.daemon = True` [What exactly is Python multiprocessing Module's `.join()` Method Doing?](https://stackoverflow.com/a/25391156/7414759) – stovfl Apr 22 '20 at 20:40
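A minimal sketch of the join-versus-daemon point from the last comment (hypothetical worker name):

```python
import time
from multiprocessing import Process

def short_worker():
    # hypothetical stand-in for a real worker
    time.sleep(0.1)

if __name__ == '__main__':
    p = Process(target=short_worker)
    p.start()
    # without join() (or daemon=True plus some other way of keeping the
    # parent alive), the parent may exit while the child is still starting
    # up, which on Windows can surface as errors like WinError 5
    p.join()
    print(p.exitcode)  # 0 once the child has finished cleanly
```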

1 Answer


Indeed - these are not "threads" - these are "processes". If you were using multithreading rather than multiprocessing, the self.my_q instance would be the same object, living at the same place in memory. Multiprocessing instead starts a new operating-system process (spawned on Windows, forked on Unix), and any data from the original process (the one executing the "run" call) is duplicated into it - so each subprocess ends up with its own "Queue" instance, unrelated to the others.
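A minimal sketch of that duplication (hypothetical names) - a mutation made in the child process never reaches the parent's copy:

```python
from multiprocessing import Process

class Holder:
    def __init__(self):
        self.items = []

    def child(self):
        # runs in the child process, mutating the child's *copy* of self
        self.items.append("from child")

if __name__ == '__main__':
    h = Holder()
    p = Process(target=h.child)
    p.start()
    p.join()
    print(h.items)  # []: the child's append happened in a duplicated object
```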

The correct way to have various processes share a multiprocessing.Queue object is to pass it as a parameter to the target methods. The simplest way to reorganize your code so that it works is thus:

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time

class MyTest:
    def __init__(self):
        self.my_q = Queue()
        self.counter = 0

    def input_function_A(self, msg_queue):
        while True:
            msg_queue.put(self.counter)
            # note: self.counter is per-process state - each worker has its own copy
            self.counter = self.counter + 1
            time.sleep(0.2)

    def input_function_B(self, msg_queue):
        while True:
            self.counter = 0
            msg_queue.put(self.counter)
            time.sleep(1)

    def output_function(self, msg_queue):
        while True:
            try:
                var = msg_queue.get_nowait()
            except queue.Empty:
                var = -1
            except Exception:
                # stop this worker on any unexpected queue error
                break
            print(var)
            time.sleep(1)

    def run(self):
        # the same Queue object is passed to every worker, so they all
        # put to and get from one shared channel
        process_A = Process(target=self.input_function_A, args=(self.my_q,))
        process_B = Process(target=self.input_function_B, args=(self.my_q,))
        process_C = Process(target=self.output_function, args=(self.my_q,))

        process_A.start()
        process_B.start()
        process_C.start()

        # without the join, the parent exits immediately and the children
        # are torn down, which surfaces as the WinError
        process_C.join()

if __name__ == '__main__':
    test = MyTest()
    test.run()

As you can see, since your class is not actually sharing any data through the instance's attributes, this "class" design does not make much sense for your application, beyond grouping the different workers in the same code block.

It would be possible to write a magic multiprocess class with an internal method that starts the worker methods and shares the Queue instance - so if you have a lot of those in a project, there would be much less boilerplate.

Something along these lines:

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time


class MPWorkerBase:
    def __init__(self, *args, **kw):
        self.queue = None
        self.is_parent_process = False
        self.is_child_process = False
        self.processes = []
        # ensures this can be used as a collaborative mixin
        super().__init__(*args, **kw)

    def run(self):
        if self.is_parent_process or self.is_child_process:
            # workers already initialized
            return

        self.queue = Queue()
        processes = []

        cls = self.__class__
        for name in dir(cls):
            method = getattr(cls, name)
            if callable(method) and getattr(method, "_MP_worker", False):
                process = Process(target=self._start_worker, args=(self.queue, name))
                processes.append(process)
                process.start()
        # these attributes are only set after all workers are spawned, so
        # the child processes still see the initial values (and never get
        # pickled copies of their sibling Process objects)
        self.is_parent_process = True
        self.processes = processes

    def _start_worker(self, queue, method_name):

        # this method is called in a new spawned process - attribute
        # changes here no longer reflect attributes on the
        # object in the initial process

        # overwrite queue in this process with the queue object sent over the wire:
        self.queue = queue
        self.is_child_process = True
        # call the worker method
        getattr(self, method_name)()

    def __del__(self):
        for process in self.processes:
            process.join()


def worker(func):
    """decorator to mark a method as a worker that should
    run in its own subprocess
    """

    func._MP_worker = True
    return func


class MyTest(MPWorkerBase):
    def __init__(self):
        super().__init__()
        self.counter = 0

    @worker
    def input_function_A(self):
        while True:
            self.queue.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    @worker
    def input_function_B(self):
        while True:
            self.counter = 0
            self.queue.put(self.counter)
            time.sleep(1)

    @worker
    def output_function(self):
        while True:
            try:
                var = self.queue.get_nowait()
            except queue.Empty:
                var = -1
            except Exception:
                # stop this worker on any unexpected queue error
                break
            print(var)
            time.sleep(1)


if __name__ == '__main__':
    test = MyTest()
    test.run()
  • Thank you for this feedback. I had used this architecture because I have many computers that would each be running a copy of this program and communicating (the server part). I put this threaded I/O function into my Client class so that its children would only need to define some new attributes and their specific decision tree. I had wanted the two I/O functions to all be able to call upon a common set of functions which modify the attributes of their parent class (self.queue, self.passwords, self.read_message(), etc.). From what you said, I want to be multi-threading not multiprocessing. – rcanty Apr 22 '20 at 20:06
  • 1
    I added a mixin class that would remove the boilerplate for distributing the "self.queue" - as for multithreading, you mentiioned you using a tkinter interface - you have to take some care there: everythng tkinter-related have to run on the main-thread, and you should use the `tkinter.after` method to keep calling your methods to check interface values, never a `while True: ...; time.sleep()` pattern . (https://effbot.org/tkinterbook/widget.htm) – jsbueno Apr 22 '20 at 20:22
  • Thank you, this is making things seem a bit clearer. I definitely have more research to do in this area (on both the concurrent I/O and the tkinter front) so I do appreciate your help. Switching to a thread (over a process) architecture has already helped tremendously. – rcanty Apr 22 '20 at 21:27
  • 1
    In a thread, the instance attributes are in fact shared - (so, all workers will see the same `self.counter` )in multiprocessing, each worker has its own counter – jsbueno Apr 22 '20 at 21:40
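A minimal sketch of that contrast on the threading side (hypothetical names) - every thread mutates one and the same instance:

```python
import threading

class Shared:
    def __init__(self):
        self.counter = 0
        self.lock = threading.Lock()

    def bump(self, n):
        # each thread increments the *same* self.counter, guarded by a lock
        for _ in range(n):
            with self.lock:
                self.counter += 1

s = Shared()
threads = [threading.Thread(target=s.bump, args=(1000,)) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(s.counter)  # 3000: all three threads saw one shared instance
```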