I'm interested in using multiprocessing to overwrite a class member value, and to do that repeatedly. In order to then be able to write this to a file, I'm using queues as suggested in another SO answer. As a minimal reproducible example, consider the following:

import multiprocessing

N_CORES = 4

class FooPool(object):
    def __init__(self):
        self.l = 10 * [0]

    def fn_populate(self):
        self.l = [i for i in range(len(self.l))]

    def fn_listen(self, q): 
        while True:
            s = q.get()
            if s[0] == 'kll': break
            i, x = s
            self.l[i] = x
            with open('bar.txt', 'w') as fl: fl.write(f'{self.l}\n')

    def fn_exec(self, i, q): 
        x = i ** 2
        q.put([i, x])

    def mp_exec(self, n_cores):

        mp_manager = multiprocessing.Manager()
        q = mp_manager.Queue()

        with multiprocessing.Pool(processes=n_cores) as pool:
            _ = pool.apply_async(self.fn_listen, (q,))

            jobs = []
            for i in range(10):
                job = pool.apply_async(self.fn_exec, (i, q))
                jobs.append(job)

            for job in jobs: 
                job.get()

            q.put(['kll', -1])
            pool.close()
            pool.join()

if __name__ == "__main__":
    p = FooPool()
    for _ in range(2): p.mp_exec(N_CORES)

Each index position of list self.l is squared, copied to that same position in the list and then self.l is written to a file.

Now, let's say I want to do this more than once; so in this example, what I would hope to see in my text-file is

[0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]

Instead, it contains

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Further, p.l remains a list of zeroes. Normally this list attribute is mutable (if that is even the right way of thinking about this; please correct me if I misunderstand). For example, calling p.fn_populate() will "overwrite" the list, but it looks like every call to p.mp_exec creates a new instance. This is confirmed in this other thread, but no solution is suggested.

How can I overcome this behaviour? For my specific case, I tried creating an object that will store all data that must persist, but this creates a huge overhead, and as a result the multiprocessing implementation seems to run slower than sequential execution.

1 Answer

Based on your comments and the fact that you call p.mp_exec() twice, it seems that you expect the elements in self.l to be squared twice. But your code in fn_exec() doesn't do anything that would cause double-squaring to occur: it just pushes a fresh [i, x] into the queue, where x depends only on the index i. Double-squaring would require that you first read an already-squared element from self.l.
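
As a small single-process illustration of that point, compare writing i ** 2 with squaring whatever is already stored. This is only a sketch: the helper names square_of_index and square_stored_value are made up here, and the list is assumed to start out in the state fn_populate() would leave it in.

```python
def square_of_index(values):
    """Mimics fn_exec(): the result depends only on the index i."""
    return [i ** 2 for i in range(len(values))]


def square_stored_value(values):
    """Reads the element already stored at each position and squares it."""
    return [v ** 2 for v in values]


start = list(range(10))  # assumption: the state fn_populate() would produce

# Two rounds that ignore the stored values give the same list each time.
by_index = square_of_index(square_of_index(start))

# Two rounds that read the stored values compound: i -> i**2 -> i**4.
by_value = square_stored_value(square_stored_value(start))

print(by_index)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(by_value)  # [0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]
```

Only the second variant produces the list you were hoping to see in the file.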

But even that wouldn't be enough to get what you seem to want. When you use multiprocessing, each process gets its own independent memory. You can read a lot more about what happens when multiple processes are spawned, but in very general terms when the pool is created each process starts from a cloned copy of the parent process. In your program, every process is starting out with all values in self.l at zero. Once the individual processes start doing stuff, the changes they make to their own memory space (eg, changes to self.l) will not be reflected in the other processes. Think about how chaotic the world would get without such independence.
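
A stripped-down demonstration of that independence, as a sketch rather than a fix. The Holder class and the function names are hypothetical and not part of your program; the child changes its own copy of the list and has to send it back through a queue for the parent to see it at all.

```python
import multiprocessing


class Holder:
    """Hypothetical stand-in for FooPool: it just owns a list."""

    def __init__(self):
        self.l = [0, 0, 0]

    def mutate(self, q):
        self.l[0] = 99   # changes the copy owned by whichever process runs this
        q.put(self.l)    # the only way the parent learns about the change


def child_and_parent_views():
    h = Holder()
    q = multiprocessing.Queue()
    proc = multiprocessing.Process(target=h.mutate, args=(q,))
    proc.start()
    child_list = q.get()   # read before join() so the child can flush its queue
    proc.join()
    return child_list, h.l


if __name__ == "__main__":
    child_list, parent_list = child_and_parent_views()
    print(child_list)   # [99, 0, 0]
    print(parent_list)  # [0, 0, 0]
```

The parent's h.l is untouched even though the child ran a method on "the same" object, which is the same reason p.l stays a list of zeroes in your program.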

More generally, the usage of multiprocessing in your program is non-standard in the sense that none of the program's work would benefit from distributing the work across multiple processes. Filling the queue with work items does not need to be distributed among various processes: that task is nearly instantaneous. The only task in the entire program that takes any time at all is the listener doing some file-IO. But you only have one of those (for good reason).

Your current usage is also non-standard in spinning up a multiprocessing.Pool twice. I'm not certain I understand the intent behind this or when it would ever be necessary.

Here's a more typical model for a multiprocessing program:

  • Fill up a Queue with work items. In most such programs I have written, filling the queue is very fast and can be done at the outset, directly by the parent process. At the end of the work queue, add enough poison-pills so that every spawned process will be killed at the end.

  • If needed, create a second Queue where workers will put their finished work.

  • Create a Pool of workers such that they will have access to the needed queue(s).

  • Start everything up and wait for the tasks to complete. Whenever possible, just let the pool figure out when all work is done (ie, use one of the flavors of map) rather than worrying about it yourself (ie, use an apply flavor only if you must).

  • The worker code itself will consist of a while True loop: get item from work queue; if poison-pill, break; otherwise, do an item of work and push a result to the output queue.

  • Process the stuff in the results queue. This is typically done in the parent process after the child processes have completed.
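
The steps above can be sketched roughly as follows. Treat it as one possible arrangement under stated assumptions, not a definitive implementation: POISON_PILL, worker and square_all are names invented for this example, None is assumed never to be a real work item, and Manager queues are used so they can be passed to pool workers as arguments.

```python
import multiprocessing

POISON_PILL = None  # hypothetical sentinel; assumes None is never a real work item


def worker(queues):
    """Loop: get an item, stop on the poison pill, otherwise push a result."""
    work_q, result_q = queues
    while True:
        item = work_q.get()
        if item is POISON_PILL:
            break
        i, value = item
        result_q.put((i, value ** 2))


def square_all(values, n_workers=4):
    """Return a new list with every element squared by the worker pool."""
    with multiprocessing.Manager() as manager:
        work_q = manager.Queue()
        result_q = manager.Queue()

        # The parent fills the work queue up front, then adds one pill per worker call.
        for item in enumerate(values):
            work_q.put(item)
        for _ in range(n_workers):
            work_q.put(POISON_PILL)

        # map() returns only after every worker call has consumed a pill.
        with multiprocessing.Pool(processes=n_workers) as pool:
            pool.map(worker, [(work_q, result_q)] * n_workers)

        # The parent collects the results; it alone owns the persistent state.
        out = list(values)
        for _ in range(len(values)):
            i, squared = result_q.get()
            out[i] = squared
    return out


if __name__ == "__main__":
    values = list(range(10))
    for _ in range(2):
        values = square_all(values)
    print(values)  # [0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]
```

Because the parent holds the list between rounds, the second round reads already-squared values, and writing the list to a file becomes an ordinary single-process step after each round.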

Occasionally there is a need for more sophistication in how the work items are generated, in whether you need different types of workers, or in how results are collected, but those are needed less frequently. My advice would be for you to make sure you are completely fluent in using and understanding the basic model for a multiprocessing program before experimenting with more complex scenarios.

FMc