
I want to submit a dynamically loaded function to the concurrent.futures.ProcessPoolExecutor. Here is a minimal example. There is module.py, which contains the function:

# Content of module.py

def func():
    return 1

And then there is the rest in file.py:

# Content of file.py

from concurrent.futures import ProcessPoolExecutor
import importlib.util
from pathlib import Path
import inspect


def load_function_from_module(path):
    # Load the module at path without registering it in sys.modules.
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)

    return mod


def func_top_level():
    return 2


if __name__ == '__main__':
    # Dynamically load function from other module.
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    with ProcessPoolExecutor(2) as executor:
        future = executor.submit(func)
        future_ = executor.submit(func_top_level)

    # Here comes the exception.
    print(future.result())

The traceback is:

Traceback (most recent call last):
_pickle.PicklingError: Can't pickle <function func at 0x7f5a548eb050>: it's not the same object as module.func
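
As far as I can tell, the reason is that pickle serializes functions by reference: it imports the module named in func.__module__ and checks that the attribute found there is the very same object. Because load_function_from_module never puts the loaded module into sys.modules, pickle's own import of module finds module.py on sys.path and creates a second module object whose func is a different object, hence the error. A minimal demonstration (assuming the two files above):

import pickle

# func is the dynamically loaded function from file.py above.
pickle.dumps(func)  # PicklingError: ... not the same object as module.func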

Attempt 1: Wrapping func in a top-level function

Define def myfunc(): return func() after the function is loaded and submit myfunc instead (see the sketch below).

This works for this example, but as soon as you move the whole if __name__ ... block into its own main() function, myfunc becomes a local function again and the hack stops working. Since the problem occurs deep down in my application, this is not an option.
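
For reference, here is a sketch of the workaround (it relies on the worker processes inheriting myfunc, e.g. via the fork start method on Linux):

if __name__ == '__main__':
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    def myfunc():
        # Defined at module scope, so pickle serializes it by reference.
        return func()

    with ProcessPoolExecutor(2) as executor:
        future = executor.submit(myfunc)

    print(future.result())  # 1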

Attempt 2: Replacing pickle with cloudpickle

My personal favourite for a solution would be to change the way ProcessPoolExecutor serializes objects. For example, cloudpickle can serialize func.

Although this answer suggests that it is possible to register a custom reducer, the following PRs and issues suggest that the feature does not work, or that I am simply unable to replace pickle with cloudpickle.

Thank you very much for your help.

tobiasraabe

2 Answers


I use a simple wrapper class to patch cloudpickle into ProcessPoolExecutor, like so:

from concurrent.futures import ProcessPoolExecutor
import cloudpickle

def apply_cloudpickle(fn, /, *args, **kwargs):
    # Runs in the worker process: rebuild the function from its
    # cloudpickle byte string, then call it.
    fn = cloudpickle.loads(fn)
    return fn(*args, **kwargs)

class CloudpickleProcessPoolExecutor(ProcessPoolExecutor):
    def submit(self, fn, /, *args, **kwargs):
        # Serialize fn with cloudpickle here; the resulting bytes pass
        # through the stdlib pickle machinery untouched.
        return super().submit(apply_cloudpickle, cloudpickle.dumps(fn), *args, **kwargs)

Then when you call .submit, .map, and the other executor methods, cloudpickle will be pickling the function object under the hood, since they all funnel through submit. (Note that arguments will still run through the ordinary pickle pipeline.)

(The same strategy works with dill etc.)
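
For example, reusing load_function_from_module and module.py from the question, usage looks like this (a sketch):

from pathlib import Path
import inspect

if __name__ == "__main__":
    # load_function_from_module is the helper from the question.
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    with CloudpickleProcessPoolExecutor(2) as executor:
        future = executor.submit(func)

    print(future.result())  # 1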

mostsquares
    Thanks for your answer! I can only add that positional-only arguments (before `/` in the signature) are a Python 3.8+ feature. For Python 3.7, remove the forward slash. If you are using type checking on a newer version, add `type: ignore[override]` to ignore the mismatch of the signatures. – tobiasraabe Apr 22 '23 at 00:39

I found a solution with cloudpickle. Start with the contents of the two files from the question, then move the body of the if __name__ ... block into a new function main(). This makes the problem harder, because Attempt 1 no longer works.

Since pickle cannot serialize the dynamically imported function func() from module.py, we serialize the function to bytes with cloudpickle in the main process. We then submit a helper function that accepts the bytes, deserializes the function, and executes it. These two functions are changed or added:

import cloudpickle


def main():
    # Dynamically load function from other module.
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    with ProcessPoolExecutor(2) as executor:
        # Serialize the dynamically loaded function by value.
        bytes_ = cloudpickle.dumps(func)
        future = executor.submit(deserialize_and_execute, bytes_)
        future_ = executor.submit(func_top_level)

    # No exception anymore.
    print(future.result())


def deserialize_and_execute(bytes_):
    func = cloudpickle.loads(bytes_)
    # Return the result so that future.result() yields it.
    return func()
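
Finally, keep the usual guard at the bottom of file.py so that main() runs only when the file is executed directly:

if __name__ == "__main__":
    main()
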
tobiasraabe