The end goal is to execute a method in the background, but not in parallel: when multiple objects call this method, each should wait its turn. To run in the background, I have to run the method in a subprocess (not a thread), and I need to start it using spawn (not fork). To prevent parallel executions, the obvious solution is a global lock shared between processes.
When processes are forked, which is the default on Unix, this is easy to achieve, as both of the following snippets show.
We can share the lock as a class variable:

import multiprocessing as mp
from time import sleep

class OneAtATime:

    l = mp.Lock()

    def f(self):
        with self.l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    a = OneAtATime()
    b = OneAtATime()
    p1 = mp.Process(target = a.f)
    p2 = mp.Process(target = b.f)
    p1.start()
    p2.start()

Or we can pass it to the method:

import multiprocessing as mp
from time import sleep

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = mp.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()

Both snippets behave as intended, printing "Hello" twice, one second apart. However, after changing the start method to 'spawn', both break.
The first one (1) prints both "Hello"s at the same time. This is because class attributes are not pickled along with the instance, so the two processes do not share the same lock.
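Point (1) can be illustrated without starting any process. The following is a minimal sketch, not what multiprocessing does internally: `Holder`, `shared`, `own` and `roundtrip_after_reimport` are hypothetical names, and reassigning the class attribute only simulates a spawned child re-importing the module and building its own copy.

```python
import pickle


class Holder:
    # Class attribute: stored on the class object, not in the instance.
    shared = "built in the parent"

    def __init__(self):
        # Instance attribute: this is what pickle actually serializes.
        self.own = "instance state"


def roundtrip_after_reimport():
    # Pickle an instance; only its __dict__ and a reference to the class travel.
    payload = pickle.dumps(Holder())
    # Simulated re-import (assumption): the "child" ends up with a fresh class attribute.
    Holder.shared = "rebuilt in the child"
    restored = pickle.loads(payload)
    return restored.own, restored.shared


if __name__ == "__main__":
    print(roundtrip_after_reimport())
    # -> ('instance state', 'rebuilt in the child')
```

The restored instance picks up whatever the class holds in the process that unpickles it, which is consistent with each spawned child ending up with its own lock.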
The second one (2) fails with FileNotFoundError at runtime. I think it has to do with the fact that locks cannot be pickled: see Python sharing a lock between processes.
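As a side check on the pickling hypothesis, here is a minimal sketch showing what happens when an `mp.Lock` is pickled by hand, outside of a process start. The helper name `try_to_pickle_lock` is hypothetical. Note that this only shows that the lock refuses ad hoc pickling; it does not by itself explain the FileNotFoundError, since multiprocessing is allowed to serialize the lock while it is starting a child.

```python
import multiprocessing as mp
import pickle


def try_to_pickle_lock():
    """Return the exception raised when pickling a lock by hand, or None."""
    lock = mp.Lock()
    try:
        pickle.dumps(lock)
    except RuntimeError as exc:
        # multiprocessing only allows this while a child process is being started.
        return exc
    return None


if __name__ == "__main__":
    print(repr(try_to_pickle_lock()))
```

Passing the same lock through `args` of `mp.Process` goes through a different path, because the serialization then happens during the process start itself.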
The linked answer suggests two fixes (side note: I cannot use a pool because I want to randomly create an arbitrary number of processes).
I haven't found a way to adapt the second fix, but I tried to implement the first one:

import multiprocessing as mp
from time import sleep

if __name__ == "__main__":
    mp.set_start_method('spawn')

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = m.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()

This fails with AttributeError and FileNotFoundError (3). In fact, it also fails (with BrokenPipeError) when the fork method is used (4).
What is the proper way of sharing a lock between spawned processes?
A quick explanation of the four failures I numbered would be nice, too. I'm running Python 3.6 on Arch Linux.

Zil0
  • It sounds like you've read the documentation pretty carefully. Have you looked at the solutions in [17.2.1.5](https://docs.python.org/3.6/library/multiprocessing.html#sharing-state-between-processes)? You should be able to put the lock into shared memory or a manager, right? – Scott Mermelstein Jun 22 '17 at 14:34
  • I tried, it's in the third code snippet. It didn't work but there are probably a lot of ways to do this, including one that does the trick. – Zil0 Jun 22 '17 at 14:42
  • Sorry, I should have studied the code further before commenting. :-) – Scott Mermelstein Jun 22 '17 at 14:43

2 Answers

Congratulations, you got yourself 90% of the way there. The last step is actually not very hard to do.

Yes, your final code block fails with an AttributeError, but what specifically is the error? "Can't get attribute 'OneAtATime' on ". This is very similar to a problem you've already encountered - it's not pickling the class OneAtATime.

I made the following change and it worked as you'd like:

file ooat.py:

from time import sleep

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

interactive shell:

import multiprocessing as mp
from ooat import OneAtATime
if __name__ == "__main__":
    mp.set_start_method('spawn')
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = m.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()

As you may notice, I didn't really change anything; I just split your code into two separate files. Try it out and you'll see it works fine. (At least, it did for me, using Python 3.5 on Ubuntu.)

Scott Mermelstein
  • I temporarily deleted this answer when I saw that I forgot to test it with the spawn method. Then when I did test, it said "OneAtATime not defined", but only because I forgot the `from ooat import OneAtATime` line. So now I've apparently tested both fork and spawn, and it works fine. – Scott Mermelstein Jun 22 '17 at 15:31
  • The answer is in your post, but it is not what your post says: it is the "interactive shell" part. In fact, the error I was getting was: AttributeError: 'ForkAwareLocal' object has no attribute 'connection'. That is where I stopped trying and wrote the question, but I was one Google search away from the actual solution: https://stackoverflow.com/a/25456494/8194503. It works perfectly fine in the same file for me, so I'm not sure what your error was. I'll edit my question with the fix. I'm also not sure what the etiquette is on whether I should accept your answer; my guess is to accept it anyway. Thanks for your time! – Zil0 Jun 22 '17 at 15:58
  • If you've solved your problem and I didn't, you should actually post your answer, wait whatever period it tells you to, and accept your own answer. That's perfectly appropriate to do, and over time, you'll get upvotes on your answer. – Scott Mermelstein Jun 22 '17 at 16:01

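The last code snippet in the question works, provided the main script does not exit before the child processes are done with the lock; otherwise the manager shuts down along with the main process and the children can no longer reach it. Joining the processes is enough: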

import multiprocessing as mp
from time import sleep

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    mp.set_start_method('spawn')
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = m.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()
    # Wait for both children so the manager outlives every use of the lock.
    p1.join()
    p2.join()

More info on the error this was causing: https://stackoverflow.com/a/25456494/8194503.

Zil0