
Task:

  • Execute a function concurrently.
  • The function takes multiple arguments.
  • Only one of the arguments changes; the remaining arguments are constant.
  • The function should be executed in parallel on the varying parameter.

I have read and attempted several related SO questions, but none have resolved my problem.

Attempts:

from datetime import datetime

import numpy as np
import pandas as pd
from functools import partial
from concurrent.futures import ProcessPoolExecutor, as_completed


# Example data
data = {'name': ['Alice', 'Bob', 'Charlie'],
        'age': [25, 30, 35],
        'height': [5.6, 6.1, 5.8],
        'weight': [120, 150, 180]}

df = pd.DataFrame(data)


# Example function
def example_fun(param1, param2="arg2a", param3="feet", param4="arg4a"):
    start = datetime.now()
    print(f"Start: {start}")
    arr = np.array(param1.iloc[:, 2])
    print(param2)

    if param3 == "metres":
        arr = arr * 0.3048
        param1["height"] = arr
        print(param4)
    elif param3 == "feet":
        print(f"Already in {param3}")

    end = datetime.now()
    print(f"Duration: {end - start}")

    return param1


# Multiprocessing - attempt 1
with ProcessPoolExecutor() as executor:
    results = executor.map(
        partial(
            example_fun,
            param1=df,
            param2="arg2"
        ),
        ["metres", "feet"]
    )
    res1, res2 = list(results)
    

# Multiprocessing - attempt 2
with ProcessPoolExecutor() as executor:
    results = [
        executor.submit(partial(example_fun, param1=df, param2="arg2"), param3=arg)
        for arg in ["metres", "feet"]
    ]
    for res in as_completed(results):
        print(res.result())

Notes: The dummy example function doesn't fully reproduce the error I get with the actual function. While the example_fun() attempts above result in a BrokenProcessPool error, the actual function produces a TypeError because the multiple values that should vary are all passed to the first parameter.


1 Answer

That combination is impossible in Python syntax.

Let's look at the call where "metres" is given as param3. partial supplies param1 and param2 as keyword arguments, and map supplies param3 as a positional argument. Such a call could only be written in one of the following two ways:

# First option:
example_fun("metres", param1=df, param2="arg2")

# Second option (not valid Python):
example_fun(param1=df, param2="arg2", "metres")

In the first, "metres" is bound to the first positional parameter, i.e., param1, so param1 receives multiple values and Python raises a TypeError. In the second, "metres" would be the third argument, but Python syntax does not allow positional arguments after keyword arguments, so this is a SyntaxError before the call even runs.
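The first form also explains the TypeError mentioned in the question's notes. A minimal sketch with a stand-in function (plain strings instead of the real DataFrame):

```python
def example_fun(param1, param2="arg2a", param3="feet", param4="arg4a"):
    return param1

# "metres" binds to param1 positionally, but param1 is also passed by keyword,
# so Python rejects the call before the function body ever runs.
try:
    example_fun("metres", param1="df", param2="arg2")
except TypeError as e:
    print(e)  # got multiple values for argument 'param1'
```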

However, there are two obvious alternatives. The first is to make all arguments positional.

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = executor.map(partial(example_fun, df, "arg2"), ["metres", "feet"])
        res1, res2 = list(results)
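Since executor.map accepts multiple iterables (one per positional parameter), itertools.repeat can also hold the fixed arguments in place of partial. A sketch using a simplified stand-in for example_fun with plain string values:

```python
from itertools import repeat
from concurrent.futures import ProcessPoolExecutor


def example_fun(param1, param2, param3):
    # Stand-in: just return the arguments to show how they were paired up.
    return (param1, param2, param3)


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # repeat() supplies the constant arguments; the list supplies the
        # varying one. map stops at the shortest iterable, so the infinite
        # repeats are safe.
        results = executor.map(example_fun, repeat("df"), repeat("arg2"),
                               ["metres", "feet"])
        res1, res2 = list(results)
```

Each worker call then receives the arguments zipped together positionally: ("df", "arg2", "metres") and ("df", "arg2", "feet").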

In addition, multiprocessing.Pool.starmap can be used instead of partial:

import multiprocessing

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        results = pool.starmap(example_fun, [
            (df, "arg2", param3) for param3 in ["metres", "feet"]
        ])
        res1, res2 = list(results)

The second alternative is to make all arguments keyword arguments. This can be achieved with this wrapper:

class dict_to_kwargs:
    def __init__(self, func):
        self.func = func

    def __call__(self, arg, **kwargs):
        return self.func(**kwargs, **arg)


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = executor.map(dict_to_kwargs(partial(example_fun, param1=df, param2="arg2")), [
            {"param3": param3} for param3 in ["metres", "feet"]
        ])
        res1, res2 = list(results)

You can remove partial if you wish.

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = executor.map(dict_to_kwargs(example_fun), [
            {"param1": df, "param2": "arg2", "param3": param3} for param3 in ["metres", "feet"]
        ])
        res1, res2 = list(results)

Edit: About BrokenProcessPool

BrokenProcessPool is a completely different issue. Both multiprocessing.pool.Pool and concurrent.futures.ProcessPoolExecutor are documented as not supported in the interactive interpreter (note that a Jupyter notebook is also an interactive interpreter):

From the multiprocessing docs: "Note: Functionality within this package requires that the __main__ module be importable by the children. This is covered in Programming guidelines however it is worth pointing out here. This means that some examples, such as the multiprocessing.pool.Pool examples will not work in the interactive interpreter."

From the concurrent.futures docs: "The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter."

So this is also impossible. However, the restriction does not apply to code that is loaded as a module: if you put the above code into a .py file and import it from the notebook/terminal, it should work fine.

Don't forget to wrap the pool-execution part in a function so that it can be called from the notebook/terminal.

def run_example_fun_in_pool():
    with ProcessPoolExecutor() as executor:
        results = executor.map(partial(example_fun, df, "arg2"), ["metres", "feet"])
        return list(results)

Then, import it from the notebook/terminal and run it:

import above_codes
above_codes.run_example_fun_in_pool()

Here is the complete code:

from datetime import datetime

import numpy as np
import pandas as pd
from functools import partial
from concurrent.futures import ProcessPoolExecutor


# Example data
data = {'name': ['Alice', 'Bob', 'Charlie'],
        'age': [25, 30, 35],
        'height': [5.6, 6.1, 5.8],
        'weight': [120, 150, 180]}

df = pd.DataFrame(data)


# Example function
def example_fun(param1, param2="arg2a", param3="feet", param4="arg4a"):
    start = datetime.now()
    print(f"Start: {start}")
    arr = np.array(param1.iloc[:, 2])
    print(param2)

    if param3 == "metres":
        arr = arr * 0.3048
        param1["height"] = arr
        print(param4)
    elif param3 == "feet":
        print(f"Already in {param3}")

    end = datetime.now()
    print(f"Duration: {end - start}")

    return param1


class dict_to_kwargs:  # Optional: only needed for the keyword-argument approach.
    def __init__(self, func):
        self.func = func

    def __call__(self, arg, **kwargs):
        return self.func(**kwargs, **arg)


def run_example_fun_in_pool():
    with ProcessPoolExecutor() as executor:
        results = executor.map(partial(example_fun, df, "arg2"), ["metres", "feet"])
        return list(results)

  • Thank you for your comprehensive and well-explained answer; however, none of the examples work (this is with the dummy example data and function). I receive the error: `BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.` I am executing this in an `.ipynb`, and so am running the cells without `if __name__ == "__main__":`, if that might give a clue to the problem. – Buzz B Jul 19 '23 at 13:52
  • @BuzzB I have added that to my answer as well. – ken Jul 19 '23 at 15:16
  • Thanks once again for your help, clear explanation and examples. This works brilliantly. I was unsure whether I needed to place `def run_example_fun_in_pool():` beneath `if __name__ == "__main__":` in the `.py` module, but as your updated code did not, I also didn't, and this worked. However, I did notice that when I execute the imported function, the print statements in the function do not appear; are all of the previous code meant to exist as inner functions within `run_example_fun_in_pool()`? I have them all defined separately in the module. I need the prints for monitoring. – Buzz B Jul 20 '23 at 04:17
  • Just replace `if __name__ == "__main__":` with `def run_example_fun_in_pool():`. I have also added the complete code to my answer for reference. – ken Jul 20 '23 at 13:08
  • Thank you for your help. Is it possible to view the print statements of imported functions when using `ProcessPoolExecutor`? I don't see any with `executor.map`, nor when using `executor.submit` with `res.result`. – Buzz B Jul 21 '23 at 04:18
  • Hmm. It should be visible. Unless you did something wrong, that could be another problem. Unfortunately, I don't know much about Jupyter, so I don't think I can help. You should post a new question to get help from people who do. – ken Jul 21 '23 at 05:05