4

I've been looking at the following questions for the past hour without any luck:

Python sharing a dictionary between parallel processes

multiprocessing: sharing a large read-only object between processes?

multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes

I've written a very basic test file to illustrate what I'm trying to do:

from collections import deque
from multiprocessing import Process
import numpy as np


class TestClass:
    def __init__(self):
        self.mem = deque(maxlen=4)
        self.process = Process(target=self.run)

    def run(self):
        while True:
            self.mem.append(np.array([0, 1, 2, 3, 4]))


def print_values(x):
    while True:
        print(x)


test = TestClass()
process = Process(target=print_values(test.mem))

test.process.start()
process.start()

Currently this outputs the following:

deque([], maxlen=4)

How can I access the values in mem from the main code or from the process that runs "print_values"?

Kenneth Breugelmans
  • You need to read up on [*exchanging objects*](https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes) or [*sharing state between processes*](https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes). Your child processes each get a forked copy of the deque; there is no further connection between them. You probably want a [managed `Queue`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue). – Martijn Pieters Jan 25 '18 at 20:48

2 Answers

8

Unfortunately, multiprocessing.Manager() doesn't support deque, but it does work with list, dict, Queue, Value and Array. A list is fairly close, so I've used it in the example below.

from multiprocessing import Process, Manager
import numpy as np

class TestClass:
    def __init__(self):
        self.maxlen = 4
        self.manager = Manager()
        self.mem = self.manager.list()
        self.lock = self.manager.Lock()
        self.process = Process(target=self.run, args=(self.mem, self.lock))

    def run(self, mem, lock):
        while True:
            array = np.random.randint(0, high=10, size=5)
            with lock:
                if len(mem) >= self.maxlen:
                    mem.pop(0)
                mem.append(array)

def print_values(mem, lock):
    while True:
        with lock:
            print(mem)

test = TestClass()
print_process = Process(target=print_values, args=(test.mem, test.lock))
test.process.start()
print_process.start()

test.process.join()
print_process.join()

You have to be a little careful with manager objects. You can use them much like the objects they reference, but you can't truncate the values with something like mem = mem[-4:], because that rebinds the name mem to a new, ordinary list instead of changing the shared one.
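To make the truncation point concrete, here is a minimal sketch of an in-place alternative. The helper names `keep_last` and `rebind_last` are hypothetical (they are not part of the code above), and the sketch assumes that slice deletion is forwarded by the list proxy, which should hold on Python 3:

```python
from multiprocessing import Manager


def keep_last(mem, maxlen):
    """Drop all but the newest `maxlen` items, modifying `mem` in place.

    Assumes maxlen >= 1.
    """
    del mem[:-maxlen]


def rebind_last(mem, maxlen):
    """Anti-example: only the local name changes, the caller's list does not."""
    mem = mem[-maxlen:]  # slicing yields a new, ordinary list
    return mem


if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.list(range(10))
        rebind_last(shared, 4)
        print(len(shared))  # still 10, the shared list was not touched
        keep_last(shared, 4)
        print(list(shared))  # only the last four items remain
```

The same two helpers behave identically on a plain list, which is a quick way to check the difference without starting a manager.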

As for coding style, I might move the Manager objects outside the class or move the print_values function inside it, but for an example, this works. If you move things around, just note that you can't use self.mem directly in the run method. You need to pass it in when you start the process, or the fork that Python does in the background will create a new instance and it won't be shared.
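A sketch of the first restructuring mentioned above, with the Manager created outside any class and the shared list and lock passed explicitly as arguments. The function names `fill` and `snapshot`, the fixed iteration count, and the use of plain integers instead of NumPy arrays are assumptions made to keep the sketch short and terminating:

```python
from multiprocessing import Manager, Process


def fill(mem, lock, maxlen, count):
    """Append `count` integers to `mem`, keeping at most `maxlen` items."""
    for i in range(count):
        with lock:
            if len(mem) >= maxlen:
                mem.pop(0)  # discard the oldest item, like deque(maxlen=...)
            mem.append(i)


def snapshot(mem, lock):
    """Return an ordinary list copy of the shared contents."""
    with lock:
        return list(mem)


if __name__ == "__main__":
    with Manager() as manager:
        mem = manager.list()
        lock = manager.Lock()
        writer = Process(target=fill, args=(mem, lock, 4, 10))
        writer.start()
        writer.join()
        print(snapshot(mem, lock))  # [6, 7, 8, 9]
```

Module-level functions also keep the process target from dragging a whole instance, including the Manager itself, through pickle on platforms that spawn instead of fork.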

Hopefully this works for your situation; if not, we can try to adapt it a bit.

bivouac0
  • Thanks for taking the time to answer this question. I've implemented your code but I'm getting the following error: _pickle.PicklingError: Can't pickle : attribute lookup weakref on builtins failed I've tried moving the process outside of the class by using the following code, but without any luck: test.process = Process(target=test.run, args=(test.mem, test.lock)) – Kenneth Breugelmans Jan 27 '18 at 09:55
  • I've solved it using part of the code you provided and the hint @Martijn Pieters gave me. I'll post my solution below :) – Kenneth Breugelmans Jan 27 '18 at 10:25
4

By combining the code provided by @bivouac0 with the comment @Martijn Pieters posted, I came up with the following solution:

from multiprocessing import Process, Queue


class testClass:
    def __init__(self, maxlen=4):
        self.mem = Queue(maxsize=maxlen)
        self.process = Process(target=self.run)

    def run(self):
        i = 0

        while True:
            # put() blocks while the queue is full, so no polling is needed.
            # (Queue.empty() only reports state; it does not clear the queue.)
            self.mem.put(i)
            i += 1


def print_values(queue):
    while True:
        values = queue.get()
        print(values)


if __name__ == "__main__":
    test = testClass()
    print_process = Process(target=print_values, args=(test.mem,))

    test.process.start()
    print_process.start()

    test.process.join()
    print_process.join()
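
Two caveats about the code above: a full Queue blocks the producer instead of discarding the oldest item as deque(maxlen=4) would, and since both loops run forever, the join() calls never return. A minimal sketch of a variant that stops cleanly, assuming a hypothetical `STOP` sentinel, a fixed item count, and the helper names `produce` and `drain` (none of which appear in the code above):

```python
from multiprocessing import Process, Queue

STOP = None  # hypothetical sentinel marking the end of the stream


def produce(queue, count):
    """Put `count` integers on the queue, followed by the sentinel."""
    for i in range(count):
        queue.put(i)  # blocks while a bounded queue is full
    queue.put(STOP)


def drain(queue):
    """Collect items from the queue until the sentinel arrives."""
    return list(iter(queue.get, STOP))


if __name__ == "__main__":
    mem = Queue(maxsize=4)
    producer = Process(target=produce, args=(mem, 10))
    producer.start()
    print(drain(mem))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    producer.join()
```

Here the consumer runs in the main process, which is one way to get the values back into the main code.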
Kenneth Breugelmans