
Multiprocessing with locally defined functions?

I am porting over a library for a client who is very picky about external dependencies.

Most of the multiprocessing in this library is handled by the pathos ProcessPool module, mainly because it deals very easily with locally defined functions.

I'm trying to get some of this functionality back without forcing this dependency (or having to rewrite large chunks of the library). I understand that the following code works because the function is defined at the top level:

import multiprocessing as mp


def f(x):
    return x * x


def main():
    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))


if __name__ == "__main__":
    main()

The following code (which is what I need to get working) fails because the function is only defined in the local scope of main, so it cannot be pickled and sent to the worker processes:

import multiprocessing as mp


def main():
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))


if __name__ == "__main__":
    main()
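A minimal sketch of why the second version fails: Pool.map pickles the callable to send it to the workers, and pickle stores a function by its module and qualified name. A function nested in main gets a qualified name like main.<locals>.f, which cannot be looked up as a module attribute, so pickling fails before any worker runs. The helper names make_local and is_picklable are hypothetical, for illustration only.

```python
import pickle


def make_local():
    # Hypothetical helper: returns a function defined inside another function.
    def f(x):
        return x * x
    return f


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError):
        # Depending on the Python version, a local function fails with
        # AttributeError or PicklingError, so catch both.
        return False
    return True


local_f = make_local()
print(local_f.__qualname__)   # make_local.<locals>.f
print(is_picklable(local_f))  # False
```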

Does anyone know of a good workaround for this specific use case that doesn't require external dependencies? Thanks for reading.

Updates:

  • There is a workaround that uses fork, but fork is unavailable on Windows and unsafe on macOS (thanks @Monica and @user2357112).
  • @Blop provided an excellent suggestion that will work for many. In my case (not the toy example above), the objects in my generator are unmarshallable.
  • @amsh provided a workaround which seems to work for any function + generator. While it's a great option, the downside is that it requires the function to be defined at the global scope.
KCQs
  • This isn't a 3.8-specific problem. Non-`spawn` startmethods were unsafe on Mac before that. 3.8 is just the version that changed the default. – user2357112 Apr 11 '21 at 21:11
  • (Fork-without-exec isn't a great idea in general these days - the thread-safety issues that make `fork` an unsafe startmethod on Mac aren't Mac-specific. They're just more prevalent on Mac.) – user2357112 Apr 11 '21 at 21:12
  • I have added an answer assuming that keeping functions in local scope is for code management purposes only, and that it is acceptable for these locally defined functions to have global scope. You may confirm whether this assumption holds. Thanks – amsh Apr 13 '21 at 19:45

2 Answers


The main problem is the closure variables.

If you don't have any, it can be done like this:

import marshal
import multiprocessing
import types
from functools import partial


def main():
    def internal_func(c):
        return c*c

    with multiprocessing.Pool(5) as pool:
        print(internal_func_map(pool, internal_func, [i for i in range(10)]))


def internal_func_map(pool, f, gen):
    marshaled = marshal.dumps(f.__code__)
    return pool.map(partial(run_func, marshaled=marshaled), gen)


def run_func(*args, **kwargs):
    marshaled = kwargs.pop("marshaled")
    func = marshal.loads(marshaled)

    restored_f = types.FunctionType(func, globals())
    return restored_f(*args, **kwargs)


if __name__ == "__main__":
    main()

The idea is that the function's code object contains everything needed to run it in a new process; the worker rebuilds the function around its own module globals. Notice that no external dependencies are needed, just the Python standard library.
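To see what the helpers above rely on, here is a single-process sketch (no Pool) of the same round trip. marshal accepts a function's code object but rejects the function itself; the ValueError it raises for unsupported objects ("unmarshallable object") is the same error reported in the comments below. The names can_marshal and internal_func_demo are hypothetical.

```python
import marshal
import types


def can_marshal(obj):
    """Return True if marshal can serialize obj (it supports only core types)."""
    try:
        marshal.dumps(obj)
    except ValueError:  # raised as "unmarshallable object"
        return False
    return True


def internal_func_demo():
    def internal_func(c):
        return c * c
    return internal_func


func = internal_func_demo()
print(can_marshal(func))           # False: function objects are not supported
print(can_marshal(func.__code__))  # True: code objects are

payload = marshal.dumps(func.__code__)
# What a worker would do: rebuild a function from the code plus a globals dict.
restored = types.FunctionType(marshal.loads(payload), globals())
print([restored(i) for i in range(5)])  # [0, 1, 4, 9, 16]
```

Arbitrary instances such as object() fail can_marshal too, which is why values that are not core types have to go through pickle instead.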

If closures are indeed needed, the most difficult part of this solution is actually creating them. (A closure is made of "cell" objects, which are not very easy to create from code...)

Here is the somewhat elaborate working code:

import marshal
import multiprocessing
import pickle
import types
from functools import partial


class A:
    def __init__(self, a):
        self.a = a


def main():
    x = A(1)

    def internal_func(c):
        return x.a + c

    with multiprocessing.Pool(5) as pool:
        print(internal_func_map(pool, internal_func, [i for i in range(10)]))


def internal_func_map(pool, f, gen):
    closure = f.__closure__
    marshaled_func = marshal.dumps(f.__code__)
    pickled_closure = pickle.dumps(tuple(x.cell_contents for x in closure))
    return pool.map(partial(run_func, marshaled_func=marshaled_func, pickled_closure=pickled_closure), gen)


def run_func(*args, **kwargs):
    marshaled_func = kwargs.pop("marshaled_func")
    func = marshal.loads(marshaled_func)
    pickled_closure = kwargs.pop("pickled_closure")
    closure = pickle.loads(pickled_closure)

    restored_f = types.FunctionType(func, globals(), closure=create_closure(func, closure))
    return restored_f(*args, **kwargs)


def create_closure(func, original_closure):
    indent = " " * 4
    closure_vars_def = f"\n{indent}".join(f"{name}=None" for name in func.co_freevars)
    closure_vars_ref = ",".join(func.co_freevars)
    dynamic_closure = "create_dynamic_closure"
    s = (f"""
def {dynamic_closure}():
    {closure_vars_def}
    def internal():
        {closure_vars_ref}
    return internal.__closure__
""")
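    # Execute in an explicit namespace: reading exec's results back through
    # locals() no longer works on Python 3.13+ (PEP 667).
    namespace = {}
    exec(s, namespace)
    created_closure = namespace[dynamic_closure]()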
    for closure_var, value in zip(created_closure, original_closure):
        closure_var.cell_contents = value
    return created_closure


if __name__ == "__main__":
    main()
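On Python 3.8 and later, cell objects can be created directly with types.CellType, so the exec-based create_closure could probably be replaced by something much shorter. A sketch under that assumption; make_closure is a hypothetical name, and the values must be supplied in the order given by the code object's co_freevars:

```python
import types


def make_closure(values):
    """Wrap each value in a new cell (types.CellType needs Python 3.8+)."""
    return tuple(types.CellType(v) for v in values)


def outer():
    x = 10

    def inner(c):
        return x + c
    return inner


f = outer()
print(f.__code__.co_freevars)  # ('x',)

# Rebuild the function from its code object with a fresh closure where x = 32.
rebuilt = types.FunctionType(f.__code__, globals(), f.__name__, None, make_closure([32]))
print(f(0), rebuilt(0))  # 10 32
```

In run_func this would amount to passing closure=make_closure(closure) to types.FunctionType instead of calling create_closure.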

Hope that helps or at least gives you some ideas on how to tackle this problem!

Blop
  • Thank you for your answer, this is a solution I would never have come to on my own! After trying this out on my own problem I'm getting an error that the custom objects in our package are not compatible with marshal (the error I receive is "ValueError: unmarshallable object"). After some more digging it seems the only solution is the external package dill? Perhaps you know a way around this error. – KCQs Apr 14 '21 at 17:41
  • Hey @KCQs, I've edited the code to support a wider variety of objects... Can you check it now? – Blop Apr 18 '21 at 06:14
  • Thanks @Blop, this worked about half of the time, amazing! It looks like the cases that failed were due to the internal function referencing a top-level function (I would have thought top-level functions would be accessible to the marshaled function, but apparently not). It looks like I will need to create a closure for functions referenced in the internal function. – KCQs Apr 24 '21 at 17:53

Original Answer

Disclaimer: This answer applies if you want to define functions locally for better code management but are okay with them having global scope.

You can use the global keyword before defining the function. This solves the pickling issue (the function is now global, so pickle can find it by name) while still defining it in the local scope.

import multiprocessing as mp

def main():
    global f
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
    main()
    print(f(4))  # Inner function is available here as well.

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
16

Here is another example with multiple functions of the same name: each subsequent definition overrides the previous one.

import multiprocessing as mp

def main():
    global f
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

def main2():
    global f
    def f(x):
        return x * x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
    main()
    main2()
    print(f(4))

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
64

Updated Answer

Revoke the global status after map is called. Thanks to @KCQs for the hint in the comments.

To make sure the global function doesn't cause issues for the rest of the code, you can add a del statement after the map call to remove it from the global namespace.

import multiprocessing as mp

def main():
    global f
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))
    del f

if __name__ == "__main__":
    main()
    print(f(4))  # Inner function is no longer available.

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Traceback (most recent call last):
  File "<file>.py", line 25, in <module>
    print(f(4))
NameError: name 'f' is not defined

Although Python collects garbage automatically, you can also invoke the garbage collector manually with gc.collect().

amsh
  • Thanks for your answer amsh! I would like to avoid putting functions into the global scope, as this is a rather large package and I don't want to introduce code that could break things I'm not aware of. But perhaps I'm overly pessimistic about global functions. Is there a best practice for dealing with global functions? (i.e., should I make the function name a long string of random characters to avoid the chance of overlap? Can I free the memory after the map call? etc.) – KCQs Apr 14 '21 at 17:45
  • @KCQs, thanks for the pointer in your comment. In fact we can make sure for our global function to not stay global after map is called. I have updated my answer to add the relevant example. Thanks – amsh Apr 14 '21 at 18:19
  • When I try this it only works if there is already an `f` defined at global scope and it uses that definition, i.e. your exact code (plus the missing `import`) fails with **AttributeError: Can't get attribute 'f' on – Booboo Apr 15 '21 at 12:20
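A likely explanation for the failure in the last comment: with the spawn start method (the default on Windows and, since Python 3.8, on macOS), workers re-import the main module without running main(), so the global f never exists in them. A sketch that pins the start method to fork, assuming a POSIX system; keep in mind the earlier comments about fork being unsafe on macOS:

```python
import multiprocessing as mp


def main():
    global f  # make the def below bind a module-level name

    def f(x):
        return x * x

    # Assumption: a POSIX platform. The pool is forked *after* f is bound at
    # module level, so each worker inherits f and can resolve the pickled reference.
    with mp.get_context("fork").Pool(5) as p:
        result = p.map(f, range(10))

    del f  # remove the module-level name again, as in the updated answer
    return result


if __name__ == "__main__":
    print(main())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```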