That is impossible in Python syntax.
Let's take a look at a call where "metres" is given as param3. partial gives param1 and param2 as keyword arguments, and map gives param3 as a positional argument. This can only be written in one of the following two ways:
# First option:
example_fun("metres", param1=df, param2="arg2")
# Second option:
example_fun(param1=df, param2="arg2", "metres")
In the first, "metres" is given as the first positional argument, i.e., param1. This means that param1 is given twice, which raises a TypeError. In the second, "metres" is now the third argument, but Python syntax does not allow positional arguments to be given after keyword arguments.
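Both failures can be reproduced without a process pool. The following is a minimal sketch, assuming a simplified stand-in for example_fun that only echoes its arguments and the placeholder string "df" in place of the real DataFrame:

```python
from functools import partial


# Hypothetical stand-in for example_fun; it just echoes its arguments.
def example_fun(param1, param2="arg2a", param3="feet"):
    return param1, param2, param3


# First option: partial supplies param1 and param2 by keyword, and the
# mapped value then arrives positionally, so it also lands on param1.
bound = partial(example_fun, param1="df", param2="arg2")
try:
    bound("metres")
    first_error = None
except TypeError as exc:
    first_error = str(exc)
print(first_error)

# Second option: a positional argument after keyword arguments is rejected
# when the code is compiled, so the call is compiled from a string here
# in order to catch the error instead of breaking the whole script.
source = 'example_fun(param1="df", param2="arg2", "metres")'
try:
    compile(source, "<example>", "eval")
    second_error = None
except SyntaxError as exc:
    second_error = exc.msg
print(second_error)
```

The first error only appears when the call is executed, while the second is raised before any code runs.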
However, there are two obvious alternatives. The first is to make all arguments positional.
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = executor.map(partial(example_fun, df, "arg2"), ["metres", "feet"])
        res1, res2 = list(results)
In addition, starmap can be used instead of partial.
if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        results = pool.starmap(example_fun, [
            (df, "arg2", param3) for param3 in ["metres", "feet"]
        ])
        res1, res2 = list(results)
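The two all-positional variants make the same calls. As a rough illustration, the sketch below runs them serially, using the built-in map and itertools.starmap as stand-ins for the pool methods, with a simplified hypothetical example_fun and the placeholder string "df" instead of the DataFrame:

```python
from functools import partial
from itertools import starmap


# Hypothetical stand-in for example_fun; it records what it received.
def example_fun(param1, param2="arg2a", param3="feet"):
    return f"{param1}/{param2}/{param3}"


units = ["metres", "feet"]

# partial fixes the first two positional arguments; map supplies the third.
via_partial = list(map(partial(example_fun, "df", "arg2"), units))

# starmap unpacks each tuple into a full positional argument list.
via_starmap = list(starmap(example_fun, [("df", "arg2", unit) for unit in units]))

print(via_partial)
print(via_starmap)
```

With starmap, every call carries its complete argument tuple, which is convenient when more than one argument varies between calls.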
The second alternative is to make all arguments keyword arguments. This can be achieved with this wrapper:
class dict_to_kwargs:
    def __init__(self, func):
        self.func = func

    def __call__(self, arg, **kwargs):
        return self.func(**kwargs, **arg)

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = executor.map(dict_to_kwargs(partial(example_fun, param1=df, param2="arg2")), [
            {"param3": param3} for param3 in ["metres", "feet"]
        ])
        res1, res2 = list(results)
You can remove partial if you wish.
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = executor.map(dict_to_kwargs(example_fun), [
            {"param1": df, "param2": "arg2", "param3": param3} for param3 in ["metres", "feet"]
        ])
        res1, res2 = list(results)
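To see what the wrapper does with each mapped item, here is a small serial sketch. It uses the built-in map rather than a pool, and assumes a simplified hypothetical example_fun that returns its arguments and the placeholder string "df" instead of the DataFrame:

```python
from functools import partial


class dict_to_kwargs:
    """Call func with the mapped dict unpacked as keyword arguments."""

    def __init__(self, func):
        self.func = func

    def __call__(self, arg, **kwargs):
        return self.func(**kwargs, **arg)


# Hypothetical stand-in for example_fun; it returns what it received.
def example_fun(param1, param2="arg2a", param3="feet"):
    return {"param1": param1, "param2": param2, "param3": param3}


# partial supplies param1 and param2 by keyword; each mapped dict adds param3.
wrapped = dict_to_kwargs(partial(example_fun, param1="df", param2="arg2"))
calls = list(map(wrapped, [{"param3": unit} for unit in ["metres", "feet"]]))
print(calls)
```

A module-level class is presumably used here instead of a lambda because a process pool has to pickle the callable it sends to the workers, and the standard pickle module cannot serialize lambdas.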
Edit: About BrokenProcessPool
BrokenProcessPool is a completely different issue. Both multiprocessing.pool.Pool and concurrent.futures.ProcessPoolExecutor are documented as not supported in the interactive interpreter (note that Jupyter Notebook is also an interactive interpreter).
From the multiprocessing documentation:
Note: Functionality within this package requires that the __main__ module be importable by the children. This is covered in Programming guidelines however it is worth pointing out here. This means that some examples, such as the multiprocessing.pool.Pool examples will not work in the interactive interpreter.
From the concurrent.futures documentation:
The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.
So this is also not possible in a notebook. However, this does not apply to code that is loaded as a module. If you put the above code into a .py file and import it from the notebook/terminal, it should work fine.
Don't forget to wrap the pool execution in a function so that it can be called from the notebook/terminal.
def run_example_fun_in_pool():
    with ProcessPoolExecutor() as executor:
        results = executor.map(partial(example_fun, df, "arg2"), ["metres", "feet"])
        return list(results)
Then, import it from the notebook/terminal and run it:
import above_codes
above_codes.run_example_fun_in_pool()
Here is the complete code:
from datetime import datetime
import numpy as np
import pandas as pd
from functools import partial
from concurrent.futures import ProcessPoolExecutor
# Example data
data = {'name': ['Alice', 'Bob', 'Charlie'],
        'age': [25, 30, 35],
        'height': [5.6, 6.1, 5.8],
        'weight': [120, 150, 180]}
df = pd.DataFrame(data)
# Example function
def example_fun(param1, param2="arg2a", param3="feet", param4="arg4a"):
    start = datetime.now()
    print(f"Start: {start}")
    arr = np.array(param1.iloc[:, 2])
    print(param2)
    if param3 == "metres":
        arr = arr * 0.3048
        param1["height"] = arr
        print(param4)
    elif param3 == "feet":
        print(f"Already in {param3}")
    end = datetime.now()
    print(f"Duration: {end - start}")
    return param1

class dict_to_kwargs:  # This is not necessary if you don't use it.
    def __init__(self, func):
        self.func = func

    def __call__(self, arg, **kwargs):
        return self.func(**kwargs, **arg)

def run_example_fun_in_pool():
    with ProcessPoolExecutor() as executor:
        results = executor.map(partial(example_fun, df, "arg2"), ["metres", "feet"])
        return list(results)