
Context:

  • A Python application server that uses a concurrent.futures.ProcessPoolExecutor to execute code
  • We sometimes want to hot reload imported code without restarting the entire server process

(yes I know importlib.reload has caveats)

To get this to work, I imagine I would have to execute importlib.reload in every worker process managed by the process pool.

Is there a way to submit something to all processes in a process pool?


1 Answer


I don't know how this will play out with the hot-reloading attempt you mentioned, but the general question you asked is answerable.

Is there a way to submit something to all processes in a process pool?

The challenge here lies in ensuring that every process receives this something exactly once, and that no further execution takes place until every process has received it.

You can get the necessary synchronization with a multiprocessing.Barrier(parties[, action[, timeout]]). The barrier holds back parties calling barrier.wait() until every party has done so, and then releases them all at once.

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    # Simulate some CPU-bound work.
    for _ in range(int(42e4)):
        pass
    return x


def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()  # block until all MAX_WORKERS workers have arrived here
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    # Runs once in each worker at pool start-up; makes the barrier global there.
    globals()['barrier'] = barrier


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
            MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))
        # then something for all processes
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))

Example Output:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0

If you are okay with keeping barrier a global and the start method (multiprocessing.get_start_method()) is "fork", you don't need the initializer, because globals defined before the pool starts are inherited by the forked workers and accessible there.

  • In your example you have already waited for all outstanding tasks (submitted to `foo`) to complete, which you should do before submitting to `reload`, and now you are waiting for all tasks submitted to `reload` to complete, which you should do before submitting further tasks (to `foo`). So it's not clear to me what purpose the barrier serves here. – Booboo Oct 25 '20 at 13:32
  • @Booboo The barrier ensures that really all processes get the `reload` by preventing a process from getting the same task more than once. Without the barrier you don't know how many processes actually end up executing the task (it depends on OS scheduling and how long the task takes). – Darkonaut Oct 25 '20 at 13:42
  • So I think the point I was missing is that you are saying that `Barrier(MAX_WORKERS)` requires that MAX_WORKERS *separate* processes must make a call to `wait` before `wait` returns thus guaranteeing each process does the reload. I thought `wait` returned immediately with a count. Pretty silly. – Booboo Oct 25 '20 at 13:51
  • 1
    @Booboo Well yeah, a process/worker is always separate. OP is essentially asking how to run an `initializer` multiple times after the processes have already started. Comment the `barrier.wait()` out and run it a couple of times. You'll eventually get to see an output where a worker doesn't reload because another did the task twice. – Darkonaut Oct 25 '20 at 13:56