(Python 3.4, Linux).

I have a main process 'P', which forks 8 child processes ('C1' through 'C8'). I want to create a multiprocessing.Barrier that ensures all 8 child processes are in sync at a certain point.

Everything works fine if I define the synchronization primitive in the parent process, so that when I fork the child processes it is properly inherited:

import multiprocessing as mp
barrier = mp.Barrier(8)

def f():
  # do something
  barrier.wait()
  # do more stuff

def main():
  for i in range(8):
    p = mp.Process(target = f)
    p.start()

if __name__ == '__main__':
  main()

But in my case, I do not know the details required to create the Barrier object until after the child processes start (I don't know the argument I want to pass as its action parameter). Therefore, I want to create the Barrier in one of the child processes, but I don't know how to make it available to the other child processes. The following won't work, of course, because the 8 Barrier objects created in the child processes are completely independent of each other:

import multiprocessing as mp

def f():
  global barrier
  # do something
  barrier = mp.Barrier(8)
  barrier.wait()
  # do more stuff

def main():
  for i in range(8):
    p = mp.Process(target = f)
    p.start()

if __name__ == '__main__':
  main()

I was thinking of creating the barrier in one of the child processes and passing it to the others using multiprocessing.Queue (or, if Queue doesn't accept Barrier objects, using multiprocessing.Manager().Barrier). However, even if this works, I don't know how to ensure that only one process actually puts the (7 copies of the) synchronization primitive onto the queue, while the others only get them. (Of course, I could create yet another synchronization primitive in the parent process just for that, but then I might as well refactor my code to create the original Barrier in the parent process after all.)
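For concreteness, here is the kind of handoff I mean (an untested sketch; I cheat by handing each child an index so that exactly one of them does the puts, and deciding which child creates the barrier is exactly the part I'm unsure about; the names `worker`, `barrier_q`, `done_q` and the final sleep are all made up for illustration):

```python
import multiprocessing as mp
import time

N = 4  # 8 in my real code; smaller here

def worker(idx, barrier_q, done_q):
    # Child 0 is "elected" creator purely by the index the parent handed it.
    if idx == 0:
        mgr = mp.Manager()            # child 0 owns the manager process
        barrier = mgr.Barrier(N)
        for _ in range(N - 1):        # one proxy for each sibling
            barrier_q.put(barrier)
    else:
        barrier = barrier_q.get()     # blocks until child 0 has put it
    barrier.wait()
    done_q.put(idx)
    if idx == 0:
        time.sleep(0.5)  # crude: keep the manager alive while siblings finish

def main():
    barrier_q, done_q = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(i, barrier_q, done_q))
             for i in range(N)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return sorted(done_q.get() for _ in range(N))

if __name__ == '__main__':
    print(main())
```

This relies on manager proxies being picklable, so they can travel through a regular mp.Queue to the siblings.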

max
  • *"but then I might as well refactor my code to create the original Barrier in the parent process after all."* This is probably the best choice here. Trying to pass a `Manager` from the parent into the child processes won't work properly (you'll get weird exceptions when you try to use its `Proxy` objects). I think your only option would be to have a child tell the parent what `action` to use, then have the parent create the `Barrier`, and then pass the `Manager().Barrier` from the parent to the children. But that would require passing both a `mp.Queue` and an `mp.Lock` to all the children... – dano Apr 02 '15 at 22:36
  • ...which is pretty complicated. It's probably better to just refactor the code so that you can create the barrier prior to creating the children. – dano Apr 02 '15 at 22:37
  • Actually...there might be a way to do this by connecting the children to a remote `Manager`. It still might end up being more complicated than it's worth, though. – dano Apr 02 '15 at 22:42

3 Answers


Here's an example of how you could do this by creating a multiprocessing.managers.BaseManager in one child, and then connecting to that manager from all the other children. Note that it requires passing a multiprocessing.Lock from the parent to all the children for synchronization purposes, which you mentioned you'd prefer to avoid. I'm not sure there's any other option, though.

import multiprocessing as mp
from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
    pass

def f(lock):
    # do something
    with lock:
        try:
            MyManager.register('get_barrier')
            m = MyManager(address=('localhost', 5555), authkey=b'akey')
            m.connect()
            b = m.get_barrier()
            print("Got the barrier from the manager")
        except OSError:
            # We are the first. Create the manager, register
            # an mp.Barrier instance with it, and start it up.
            print("Creating the manager...")
            b = mp.Barrier(8)
            MyManager.register('get_barrier', callable=lambda: b)
            m = MyManager(address=('localhost', 5555), authkey=b'akey')
            m.start()
    b.wait()
    print("Done!")
    # do more stuff

def main():
    lock = mp.Lock()
    for i in range(8):
        p = mp.Process(target=f, args=(lock,))
        p.start()

if __name__ == '__main__':
    main()

Output:

Creating the manager...
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Done!
Done!
Done!
Done!
Done!
Done!
Done!
Done!
dano
  • Thx! I guess @BiRico solution is simpler for my specific situation. Your answer, OTOH, provides a general solution of using `Manager` to send synchronization objects from one child process to another. I suppose it's even possible to get rid of `lock`. For example, I can pass each newly created process an additional identification argument ('0', ..., '7'). The process that received '0' can do the creation, and the other ones can loop through try/except trying to connect to the `Manager`. – max May 07 '17 at 01:29
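Following up on that comment, a lock-free variant of the answer above might look roughly like this (an untested sketch: the port number, the retry loop, and the final sleep that keeps the first child's manager alive for its siblings are all ad-hoc choices):

```python
import multiprocessing as mp
import time
from multiprocessing.managers import BaseManager

N = 4  # 8 in the question; smaller here

class MyManager(BaseManager):
    pass

def f(idx, done_q):
    if idx == 0:
        # Elected creator by index: serve a barrier to the siblings.
        b = mp.Barrier(N)
        MyManager.register('get_barrier', callable=lambda: b)
        m = MyManager(address=('localhost', 5557), authkey=b'akey')
        m.start()
    else:
        MyManager.register('get_barrier')
        m = MyManager(address=('localhost', 5557), authkey=b'akey')
        while True:                 # retry until child 0's manager is up
            try:
                m.connect()
                break
            except OSError:
                time.sleep(0.05)
        b = m.get_barrier()
    b.wait()
    done_q.put(idx)
    if idx == 0:
        time.sleep(0.5)  # crude: keep the manager alive for the siblings

def main():
    done_q = mp.Queue()
    procs = [mp.Process(target=f, args=(i, done_q)) for i in range(N)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return sorted(done_q.get() for _ in range(N))

if __name__ == '__main__':
    print(main())
```

As in the answer, child 0 can use its `mp.Barrier` directly (its shared semaphores survive the fork into the manager's server process), while the other children go through proxies.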

Would it be possible to simply capture the IDs returned by `barrier.wait()` and manually call your action in only one of the processes? Something like this?

import multiprocessing as mp
barrier = mp.Barrier(8)

def f():
  # create action
  def action():
      print("action was run on process {}.".format(proc_id))

  # do something
  proc_id = barrier.wait()  # wait() returns a unique index in range(8)
  print("Hello from process {}.".format(proc_id))
  if proc_id == 0:
      action()              # exactly one process runs the action
  barrier.wait()            # everyone else waits until it has finished

  # Do more stuff

def main():
  for i in range(8):
    p = mp.Process(target = f)
    p.start()

if __name__ == '__main__':
  main()
Bi Rico
  • Sorry for the delay! The `action` I want to perform only becomes known during the execution of one of the child processes. Your approach replaces the problem of passing the `Barrier` instance from one child to all the others with the (easier) problem of passing around the `action` function. For simple functions, it will work with no effort. For more complex cases, I can try to refactor appropriately: https://stevenengelhardt.com/2013/01/16/python-multiprocessing-module-and-closures/; http://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-pythons-multiprocessing-pool-ma – max May 07 '17 at 01:14
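To illustrate the simple-function case from that comment: a picklable, module-level action can be handed over a queue with essentially no effort, since mp.Queue pickles such functions by reference. A rough sketch (the names `discovered_action`, `action_q`, and `ran_q` are made up for illustration):

```python
import multiprocessing as mp

N = 4
barrier = mp.Barrier(N)
action_q = mp.Queue()
ran_q = mp.Queue()

def discovered_action():        # module-level, so it pickles by reference
    ran_q.put("action ran")

def f(idx):
    if idx == 0:
        # Pretend this child just "discovered" which action to run.
        action_q.put(discovered_action)
    i = barrier.wait()          # everyone is past the put() now
    if i == 0:
        action_q.get()()        # exactly one process runs the action
    barrier.wait()              # the rest wait until it has finished

def main():
    procs = [mp.Process(target=f, args=(i,)) for i in range(N)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    ran_q.get(timeout=1)        # the action ran...
    return ran_q.empty()        # ...and only once

if __name__ == '__main__':
    print(main())
```

Note that the child that puts the action (index 0 from the parent) and the child that runs it (index 0 from `barrier.wait()`) need not be the same process; the queue decouples them.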

Here's a version that would also work on Windows (where the lack of fork causes additional trouble):

import multiprocessing as mp

def procs(uid_barrier):
    uid, barrier = uid_barrier
    print(uid, 'waiting')
    barrier.wait()
    print(uid, 'past barrier')    

def main():
    N_PROCS = 10
    with mp.Manager() as man:
        barrier = man.Barrier(N_PROCS)
        with mp.Pool(N_PROCS) as p:
            p.map(procs, ((uid, barrier) for uid in range(N_PROCS)))

if __name__ == '__main__':
    mp.freeze_support()
    main()
Tom Pohl