2

What I would like to do is share a dictionary between subclasses of Process, so that when one process updates the dictionary, the other is notified to use it. This is illustrated in the code below, where MyProducer starts filling the dictionary and, in every iteration, triggers an event to notify MyConsumer to process the dictionary. Everything works except that the dictionary in MyConsumer is empty...

from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    """Producer process that adds entries to the dictionary it is given.

    After every insertion it sets ``event`` and then waits for the event
    to be cleared again before producing the next entry.
    """

    # Class-level starting value; ``run`` rebinds it on the instance as it
    # counts up, so the class attribute itself stays 0.
    increment = 0

    def __init__(self, dictionary, event):
        """Store the dictionary to fill and the event used for signalling.

        Args:
            dictionary: Mapping that receives ``n -> n + 10`` entries.
            event: Event-like object providing ``set()`` and ``is_set()``.
        """
        super().__init__()
        self.dictionary = dictionary
        self.event = event

    def run(self):
        """Insert keys 0..19 (value = key + 10), signalling after each one."""
        while self.increment < 20:
            self.dictionary[self.increment] = self.increment + 10
            self.increment += 1
            print("From producer: ", self.dictionary)
            self.event.set()
            # Spin until the event is cleared again.  The loop body used to
            # compute a throw-away local on every pass; it had no effect, so
            # the wait is now an explicit no-op.
            while self.event.is_set():
                pass
        
class MyConsumer(Process):
    """Consumer process that prints the dictionary each time it is signalled."""

    def __init__(self, dictionary, event):
        """Store the dictionary to print and the event to wait on."""
        super().__init__()
        self.event = event
        self.dictionary = dictionary

    def run(self):
        """Loop forever: wait for the event, print the dictionary, clear the event.

        There is no exit condition, so this method never returns on its own.
        """
        signal, shared = self.event, self.dictionary
        while True:
            signal.wait()
            print("From consumer: ", shared)
            signal.clear()
            

            
if __name__ == "__main__":
    # Script entry point: create a manager-backed dict and an Event, hand
    # both to one producer and one consumer, and start them.
    with Manager() as manager:
        shared_state = manager.dict()
        ready = Event()
        workers = (
            MyProducer(shared_state, ready),
            MyConsumer(shared_state, ready),
        )
        # Launch the producer first, then the consumer.
        # Neither process is joined, so the `with` block is left as soon as
        # both have been started.
        for worker in workers:
            worker.start()

The output is

Process MyProducer-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "main.py", line 13, in run
    self.dictionary[self.increment]=self.increment+10
  File "<string>", line 2, in __setitem__
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
    self._connect()
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 630, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

UPDATE

My intention is to understand why the dictionary does not work with the Process subclasses. I know all the working cases that you can find on the internet. In fact, I have a solution that works fine: just replace the dict with a queue. I want to understand why the dict does not work.

from multiprocessing import Process, Queue, Event

class MyProducer(Process):
    """Producer process that puts ``[n, n + 10]`` items on the queue it is given.

    After every ``put`` it sets ``event`` and then waits for the event to
    be cleared again before producing the next item.
    """

    # Class-level starting value; ``run`` rebinds it on the instance as it
    # counts up, so the class attribute itself stays 0.
    increment = 0

    def __init__(self, queue, event):
        """Store the queue to fill and the event used for signalling.

        Args:
            queue: Queue-like object providing ``put()`` and ``qsize()``.
            event: Event-like object providing ``set()`` and ``is_set()``.
        """
        super().__init__()
        self.queue = queue
        self.event = event

    def run(self):
        """Enqueue 20 items (n = 0..19), signalling after each one."""
        while self.increment < 20:
            self.queue.put([self.increment, self.increment + 10])
            self.increment += 1
            print("From producer: ", self.queue.qsize())
            self.event.set()
            # Spin until the event is cleared again.  The loop body used to
            # compute a throw-away local on every pass; it had no effect, so
            # the wait is now an explicit no-op.
            while self.event.is_set():
                pass
        
class MyConsumer(Process):
    """Consumer process that prints the queue size each time it is signalled."""

    def __init__(self, queue, event):
        """Store the queue to inspect and the event to wait on."""
        super().__init__()
        self.event = event
        self.queue = queue

    def run(self):
        """Loop forever: wait for the event, print ``qsize()``, clear the event.

        Items are never taken off the queue here; only its size is reported.
        """
        signal, pending = self.event, self.queue
        while True:
            signal.wait()
            print("From consumer: ", pending.qsize())
            signal.clear()
            

if __name__ == "__main__":
    # Script entry point: one Queue and one Event are shared by a producer
    # and a consumer process.
    work_queue = Queue()
    ready = Event()
    workers = (
        MyProducer(work_queue, ready),
        MyConsumer(work_queue, ready),
    )
    # Launch the producer first, then the consumer.
    for worker in workers:
        worker.start()

dpservis
  • 31
  • 3

1 Answer

2

FYI, I see much the same kind of death with this simpler program:

from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    """Producer process that overwrites a shared value and then signals."""

    def __init__(self, value, event):
        """Keep the value holder (as ``self.val``) and the event to set."""
        super().__init__()
        self.event = event
        self.val = value

    def run(self):
        """Print the current value, store 42 in it, then set the event."""
        holder = self.val
        print("at producer start", holder.value)
        holder.value = 42
        self.event.set()

class MyConsumer(Process):
    """Consumer process that waits for the event and prints the shared value."""

    def __init__(self, value, event):
        """Keep the value holder (as ``self.val``) and the event to wait on."""
        super().__init__()
        self.event = event
        self.val = value

    def run(self):
        """Block until the event is set, then print the value once."""
        self.event.wait()
        current = self.val.value
        print("From consumer: ", current)
                        
if __name__ == "__main__":
    # Script entry point: a manager-backed integer Value (initially 666) and
    # an Event are shared by one producer and one consumer.
    with Manager() as manager:
        shared_value = manager.Value('i', 666)
        ready = Event()
        workers = (
            MyProducer(shared_value, ready),
            MyConsumer(shared_value, ready),
        )
        # Launch the producer first, then the consumer.
        # Neither process is joined, so the `with` block is left as soon as
        # both have been started.
        for worker in workers:
            worker.start()

The implication is that no kind of object obtained from a Manager is usefully reconstructed when it's attached as an attribute to an object mp has to construct "by magic" in a worker process. The information needed to connect to the Manager server process appears to get lost (whether a socket on Linux-y systems or a named pipe on Windows).

You could file a bug report, but there's nothing to be done about it before then except rewrite the code not to use a Manager or to pass Manager objects explicitly to functions.

A bug report could have two kinds of resolutions: (1) make it "work"; or, (2) the code is changed to raise an exception when the attempt to create such an object is made.

Another possibility (untried): if you're only running on Linux, you could skip the __name__ == "__main__" test and hope the Manager connection info survives fork().

EDIT

I opened an issue on the Python project's tracker, here:

https://bugs.python.org/issue41660

WORKAROUND

Playing around with stuff on the Python issue report, "the problem" here doesn't appear to be an issue in how things are getting set up, but in your code ignoring the need to shut down workers cleanly. Just adding this line at the end of your code (the dict version - the one you care about):

    producerprocess.join()

is enough so that, on my box right now (Win 10 Python 3.8.5), it produces the output you expect. However, it hangs forever then, because your consumer .wait()s forever for an Event that's never set again.

My guess (which I'm 80% sure is correct): without the .join(), the main process goes on to start running interpreter shutdown code (there's nothing left for it to do!), and that starts forcibly destroying stuff the multiprocessing implementation still needs to function correctly.

With the .join(), the main process blocks until the producer is finished - no shutdown code is started for the duration, and .join() explicitly instructs the producer process to shut down its part of the (elaborate!) multiprocessing dance cleanly.

It may yet leave the consumer process in a damaged state, but, if so, we'll never see evidence of it because the consumer is blocked forever on its self.event.wait().

In a real program, you should do whatever it takes to shut down the consumer process cleanly too.

FULL CODE

Here's a complete program, showing idiomatic Python and best practices for parallel programming: everything shuts down cleanly, no "busy loops", no races, no deadlocks. The implementation of State is more elaborate than this specific problem requires, but illustrates a powerful approach well worth learning.

import multiprocessing as mp

# Bit flags for the state values.
P = 1
C = 2
F = 4

# The synchronization wanted here is unusual: once the producer has made
# a mutation, it must not make another until the consumer has acted on
# it.  The protocol therefore uses three states:
#   P - the producer is allowed to mutate;
#   C - there is a mutation waiting for the consumer to process;
#   F - "finished": the consumer should quit.
# The producer needs no stop signal - it ends on its own when it gets
# tired of mutating ;-)
class State:
    """Shared state flag plus a condition variable to wait for changes."""

    def __init__(self):
        # The initial state is empty (0), so every waiter blocks.
        # All access to the shared memory happens while holding the
        # condition variable's mutex, so giving the Value a lock of its
        # own would be pure waste - hence lock=False.
        self.state = mp.Value('B', 0, lock=False)
        self.changed = mp.Condition()

    def waitfor(self, what):
        """Block until the state matches the flag mask `what`.

        Returns the state value that satisfied the wait.
        """
        with self.changed:
            while True:
                current = self.state.value
                if current & what:
                    return current
                self.changed.wait()

    def setwhat(self, what):
        """Force the state to (bit flag) `what` and wake every waiter.

        Each woken waiter re-checks whether the new state is one it is
        waiting for.
        """
        with self.changed:
            self.state.value = what
            self.changed.notify_all()

class Base(mp.Process):
    """Common base for the worker processes: holds the dictionary and the state."""

    def __init__(self, dictionary, state):
        """Initialise the process and remember both shared objects."""
        mp.Process.__init__(self)
        self.state = state
        self.dictionary = dictionary

class MyProducer(Base):
    """Worker that adds 20 entries to the dictionary, one per P -> C hand-off."""

    def __init__(self, *args):
        """Forward all arguments to Base and start the counter at 0."""
        super().__init__(*args)
        self.increment = 0

    def run(self):
        """Wait for state P, add one entry, switch to state C; repeat 20 times."""
        while self.increment < 20:
            slot = self.increment
            self.state.waitfor(P)
            self.dictionary[slot] = slot + 10
            self.state.setwhat(C)
            # Nothing forces whether the producer or the consumer prints
            # the dict first - they can even print at the same time and
            # garble the output.  To make the producer print first, move
            # this print() above the setwhat(C) call.
            print("From producer: ", self.dictionary)
            self.increment = slot + 1

class MyConsumer(Base):
    """Worker that prints the dictionary after each mutation until told to stop."""

    def run(self):
        """Handle state C by printing and switching back to P; stop on state F."""
        while True:
            if self.state.waitfor(C | F) == F:
                break
            print("From consumer: ", self.dictionary)
            self.state.setwhat(P)

def main():
    """Run one producer and one consumer to completion, shutting both down cleanly."""
    import time

    with mp.Manager() as manager:
        shared_dict = manager.dict()
        shared_state = State()
        producer = MyProducer(shared_dict, shared_state)
        consumer = MyConsumer(shared_dict, shared_state)
        producer.start()
        consumer.start()

        # At this point the producer is blocked waiting for state P and
        # the consumer is blocked waiting for state C (or F).  Count down
        # 5 seconds so you can check by eye that the waits are not "busy"
        # (they use essentially no CPU cycles).
        for remaining in range(4, -1, -1):
            time.sleep(1)
            print(remaining)

        # Tell the producer to start, then wait for it to finish.
        shared_state.setwhat(P)
        producer.join()
        # Wait for the consumer to finish eating the last mutation.
        shared_state.waitfor(P)
        # Tell the consumer we're all done.
        shared_state.setwhat(F)
        consumer.join()


if __name__ == "__main__":
    main()
Tim Peters
  • 67,464
  • 13
  • 126
  • 132