
I expected that if I called apply_async from an instance method and got its result, any changes made to the instance would persist in the forked worker processes. However, it seems that every new call to apply_async creates a new copy of said instance.

Take the following code:

from multiprocessing.pool import Pool


class Multitest:
    def __init__(self):
        self.i = 0

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(10):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        self.i += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()

Sample output:

i 0
i 0
i 0
i 0
i 0
input 0
i 0
i 0
i 0
i 0
i 0
input 1
input 2
input 3
input 4
input 5
input 6
input 7
input 8
input 9

But since we have two worker processes over which the 10 inputs are spread, I had expected each process's i property to be incremented.

I had expected the following flow:

  • main thread creates instance and calls run()
  • main thread distributes work of apply_async over pool by initializing two new processes and a copy of the original Multitest instance (where i = 0)
  • process() is called on the new processes a number of times (until range() is exhausted). On each call to process, self.i for that process is incremented

Note: I am not asking about shared state between the two processes. Instead I am asking why the class instance within a single process is not mutated (why isn't each individual process's self.i incremented?).

However, I do not see this behaviour. Instead, the printed output is only zeroes, indicating that my expectations were wrong: the state (property i) is not maintained, but a new instance (or at least a new copy) is created on every call to apply_async. What am I missing here, and how can I make this work as expected? (Preferably with apply_async, though not required. The order of the results should be maintained, though.)

As far as I can tell this behaviour is not specific to apply_async but applies to the other pool methods as well. I am interested in learning why this happens and how the behaviour can be changed to the behaviour I want to achieve. The bounty goes to the answer that can address both queries.

Bram Vanroy
  • in general, are you aware of how multiprocessing works in Python? more importantly, how each new process is a fork of the parent process with its own _copy_ of the state, not shared state – gold_cy May 15 '20 at 13:06
  • @gold_cy I am, but this is different from that. I am not asking about shared state between the processes, but about why a single class instance within one process does not stay the same (i.e. keep its modified attribute). – Bram Vanroy May 15 '20 at 13:08
  • In multiprocessing, arguments are pickled, transferred to another process, and unpickled. What a function receives is a copy of the arguments as they were when `apply_async` was called. To sync state between processes, try managers like `multiprocessing.SyncManager` or create your own manager, or create proxy objects with `multiprocessing.managers.BaseProxy`. After all, you may prefer to update the instance according to the result. :) – Aaron May 21 '20 at 21:55
  • @Aaron Please re-read the post and especially the note. This question is not about sharing between processes. – Bram Vanroy May 22 '20 at 07:28
  • 2
    @BramVanroy The first two sentences have answered the why. To achieve the behavior you mention, consider creating a process-local storage in analogy to `threading.local`. Use a module to store the process state because a module is indeed process-local. The parameter `initializer` of `multiprocess.Pool` may be helpful. – Aaron May 22 '20 at 10:28
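
A minimal sketch of the process-local approach described in the last comment (the init_worker and process_in_worker helpers are my own names, not from this thread): each worker process gets its own Multitest via the pool's initializer, so self.i persists across the tasks that land on that worker.

from multiprocessing.pool import Pool

class Multitest:
    def __init__(self):
        self.i = 0

    def process(self, inp):
        print("i", self.i)
        self.i += 1
        return inp

_worker_instance = None  # one Multitest per worker process, set by the initializer

def init_worker():
    global _worker_instance
    _worker_instance = Multitest()

def process_in_worker(inp):
    # delegate to the worker-local instance, so self.i survives between tasks
    return _worker_instance.process(inp)

if __name__ == '__main__':
    with Pool(2, initializer=init_worker) as pool:
        worker_jobs = [pool.apply_async(process_in_worker, (j,)) for j in range(10)]
        for job in worker_jobs:
            print("input", job.get())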

3 Answers


I'd like to point you to references, but I don't have any yet, so I'll share my thoughts based on empirical evidence:

Each call to apply_async prepares a fresh copy of the namespace. You can see this by adding a call to print(self) inside of process. So this part is not true:

main thread distributes work ... by initializing two new processes and a copy of the original Multitest instance

Rather, there are two new processes and ten copies of the original Multitest instance. All those copies are made from the main process, which hasn't had its copy of i incremented. To demonstrate that, add time.sleep(1); self.i += 1 before the call to apply_async, and notice that a) the value of i in the main thread is incremented, and b) by delaying the for loop, the original Multitest instance has changed by the time the next call to apply_async triggers a new copy.

Code:

from multiprocessing.pool import Pool
import time

class Multitest:
    def __init__(self):
        print("Creating new Multitest instance: {}".format(self))
        self.i = 0

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(4):
                time.sleep(1); self.i += 1
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        print("Copied instance: {}".format(self))
        self.i += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()

Result:

Creating new Multitest instance: <__main__.Multitest object at 0x1056fc8b0>
i 1
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
i 2
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
i 3
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
input 0
input 1
input 2
i 4
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
input 3

As to your second query, I think if you want state to be maintained within a process, you probably need to only submit one job. Instead of Pool(2) handling 10 independent jobs, you'd have Pool(2) handling 2 independent jobs, each of which consists of 5 interdependent sub-jobs. Alternatively, if you really want 10 jobs, you could use a shared data structure indexed by pid, such that all the jobs operating (in sequence) within a single process can manipulate a single copy of i.
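
As a rough sketch of the first option (the process_chunk method and the hard-coded chunks below are my own assumptions, not part of the original answer), each job works through a whole chunk, so one copy of the instance survives for all five sub-jobs:

from multiprocessing.pool import Pool

class Multitest:
    def __init__(self):
        self.i = 0

    def process_chunk(self, chunk):
        # this copy of the instance lives for the whole job,
        # so self.i is incremented across the entire chunk
        results = []
        for inp in chunk:
            print("i", self.i)
            self.i += 1
            results.append(inp)
        return results

    def run(self):
        with Pool(2) as pool:
            chunks = [list(range(0, 5)), list(range(5, 10))]
            jobs = [pool.apply_async(self.process_chunk, (c,)) for c in chunks]
            for job in jobs:
                print("input", job.get())

if __name__ == '__main__':
    Multitest().run()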

Here's an example with a shared data structure, in the form of a global in a module:

from multiprocessing.pool import Pool
from collections import defaultdict
import os
import myglobals # (empty .py file)

myglobals.i = defaultdict(lambda:0)

class Multitest:
    def __init__(self):
        pid = os.getpid()
        print("Creating new Multitest instance: {}".format(self))
        print("i {} (pid: {})".format(myglobals.i[pid], pid))

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(4):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        pid = os.getpid()
        print("Copied instance: {}".format(self))
        print("i {} (pid: {})".format(myglobals.i[pid], pid))
        myglobals.i[pid] += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()

Result:

Creating new Multitest instance: <__main__.Multitest object at 0x1083f3880>
i 0 (pid: 3460)
Copied instance: <__mp_main__.Multitest object at 0x10d89cdf0>
i 0 (pid: 3463)
Copied instance: <__mp_main__.Multitest object at 0x10d89ce50>
Copied instance: <__mp_main__.Multitest object at 0x10550adf0>
i 0 (pid: 3462)
Copied instance: <__mp_main__.Multitest object at 0x10550ae50>
i 1 (pid: 3462)
i 1 (pid: 3463)
input 0
input 1
input 2
input 3

This technique is from https://stackoverflow.com/a/1676328/361691

Nick Russo

I believe the following is happening:

  1. Each time pool.apply_async(self.process, ...) is called, the bound method (together with the instance it is bound to) is serialized (pickled) and sent to a child process. A new copy of the instance is created each time.
  2. The method runs in the child process, but since it is part of a separate copy, different from the original in the parent process, its changed state does not and cannot affect the parent process. The only information that is passed back is the return value (also pickled).

Note that the child processes do not have their own instance of Multitest, because one is only created when __name__ == '__main__', which does not apply to the worker processes created by the pool.

If you want to maintain state in the child process, you can do it with global variables. You can pass an initializer argument when you create a pool to initialize such variables.

The following shows a working version of what you intended (but without OOP, which doesn't work well with multiprocessing):

from multiprocessing.pool import Pool


def initialize():
    global I
    I = 0


def process(inp):
    global I
    print("I", I)
    I += 1
    return inp


if __name__ == '__main__':
    with Pool(2, initializer=initialize) as pool:
        worker_jobs = []
        for j in range(10):
            job = pool.apply_async(process, (j,))
            worker_jobs.append(job)

        for job in worker_jobs:
            res = job.get()
            print("input", res)
Andreas

One difference between multiprocessing and threading is that after a process is created, the memory it uses is virtually cloned from its parent process, so there is no shared memory between processes.

Here is an example:

import os
import time
from threading import Thread

global_counter = 0

def my_thread():
    global global_counter
    print("in thread, global_counter is %r, add one." % global_counter)
    global_counter += 1

def test_thread():
    global global_counter
    th = Thread(target=my_thread)
    th.start()
    th.join()
    print("in parent, child thread joined, global_counter is %r now." % global_counter)

def test_fork():
    global global_counter
    pid = os.fork()  # note: os.fork() is only available on POSIX systems
    if pid == 0:
        print("in child process, global_counter is %r, add one." % global_counter)
        global_counter += 1
        exit()
    time.sleep(1)
    print("in parent, child process died, global_counter is still %r." % global_counter)

def main():
    test_thread()
    test_fork()

if __name__ == "__main__":
    main()

Output:

in thread, global_counter is 0, add one.
in parent, child thread joined, global_counter is 1 now.
in child process, global_counter is 1, add one.
in parent, child process died, global_counter is still 1.

In your case:

for j in range(10):
    # Before the fork, self.i is 0; fork() duplicates memory, so the variable is not shared with the child.
    job = pool.apply_async(self.process, (j,))
    # After the job finishes, the child's self.i is 1 (not the parent's); that copy is discarded once the child is done with it.
    worker_jobs.append(job)

Edit:

In Python 3, pickling a bound method includes the object it is bound to as well, essentially duplicating it. Therefore, every time apply_async is called, the object self gets pickled as well.

import os
from multiprocessing.pool import Pool
import pickle

class Multitest:
    def __init__(self):
        self.i = "myattr"

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(10):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        self.i += "|append"

        return inp

def test_pickle():
    m = Multitest()
    print("original instance is %r" % m)

    pickled_method = pickle.dumps(m.process)
    assert b"myattr" in pickled_method

    unpickled_method = pickle.loads(pickled_method)
    # get the instance back from its bound method (Python 3)
    print("pickle duplicates the instance, new instance is %r" % unpickled_method.__self__)

if __name__ == '__main__':
    test_pickle()

Output:

original instance is <__main__.Multitest object at 0x1072828d0>
pickle duplicates the instance, new instance is <__main__.Multitest object at 0x107283110>
Kamoo
  • Sorry, this does not answer the question. The question is not about a shared state between processes, but about the supposedly same instance in the same process whose properties are not updated. See the comments under the OP and the other answer. – Bram Vanroy May 21 '20 at 09:39
  • It should be incremented by one for each process (0 -> 1). – Kamoo May 21 '20 at 09:50
  • Please re-read my OP and try the code snippet and you'll see what I mean. – Bram Vanroy May 21 '20 at 09:51
  • If you mean why the output `i` values are `0`s instead of `1`s, the print statement comes before the assignment, just to make that clear. What I understand is that you were expecting the output `i`s to range from 0 to 9, is that correct? – Kamoo May 21 '20 at 09:58
  • Correct. Or, well, at least there should be continuous incrementing of the variable (it might be that one process is a bit faster than the other so it is not guaranteed that both range 0-9). – Bram Vanroy May 21 '20 at 11:29
  • You were expecting what would manifest in threading, but in multiprocessing, from the moment a process is created (forked), it's more like running a completely new program (with its memory and execution state copied from its parent) rather than two threads working on the same program. Memory changes that happen in the child process won't be reflected in the parent. – Kamoo May 21 '20 at 12:04
  • If you want to share variables or complex objects among multiple processes, you could use `multiprocessing.Value` (shared memory) or create a `multiprocessing.Manager` (RPC server). – Kamoo May 21 '20 at 12:10
  • _Again_, this is not about sharing state between processes... I expected each process to increment its own variable, so for range(10) one process would go up to 5 and the other too (depending on each process's speed). However, instead you see that it does not increment; it always stays the same for both processes, independently of each other. – Bram Vanroy May 21 '20 at 13:25