
I would like to parallelise a calculation using multiprocessing.Pool. The problem is that the function I want to use in the calculation takes two positional arguments plus optional kwargs: the first argument is a dataframe, the second a str, and the kwargs a dictionary.

Both the dataframe and the dictionary are the same for all the calculations I am trying to carry out; only the second argument keeps changing. I was therefore hoping to pack the function with the df and dict, and then pass the changing argument as a list of different strings via the map method.

from utils import *
import multiprocessing
import operator
from functools import partial

import pandas as pd



def sumifs(df, result_col, **kwargs):
    # Each kwarg maps a column name to either an iterable of values
    # (compared with ==) or a (comparison operator, iterable) tuple
    compare_cols = list(kwargs.keys())
    operators = {}
    for col in compare_cols:
        if isinstance(kwargs[col], tuple):
            operators[col] = kwargs[col][0]
            kwargs[col] = list(kwargs[col][1])
        else:
            operators[col] = operator.eq
            kwargs[col] = list(kwargs[col])
    result = []
    cache = {}
    # Go through each value
    for i in range(len(kwargs[compare_cols[0]])):
        compare_values = [kwargs[col][i] for col in compare_cols]
        cache_key = ','.join([str(s) for s in compare_values])
        if cache_key in cache:
            entry = cache[cache_key]
        else:
            df_copy = df.copy()
            for compare_col, compare_value in zip(compare_cols, compare_values):
                df_copy = df_copy.loc[operators[compare_col](df_copy[compare_col], compare_value)]
            entry = df_copy[result_col].sum()
            cache[cache_key] = entry
        result.append(entry)
    return pd.Series(result)

if __name__ == '__main__':

    ca = read_in_table('Tab1')
    total_consumer_ids = len(ca)

    base = pd.DataFrame()
    base['ID'] = range(1, total_consumer_ids + 1)


    result_col = ['A', 'B', 'C']
    keywords = {'Z': base['Consumer archetype ID']}

    max_number_processes = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=max_number_processes) as pool:
        results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
    print(results)

However, when I run the code above I get the following error: TypeError: sumifs() missing 1 required positional argument: 'result_col'. How can I provide the function with the first argument and the kwargs, while supplying the second argument as a list of str so I can parallelise the calculation? I have read several similar questions on the forum but none of the solutions seem to work for this case...

Thank you, and apologies if something is not clear; I only learnt about the multiprocessing package today!

Adam B.
  • Try sending the `keywords` argument with ** as a prefix, without the `kwargs` key. In addition, have a look at the following link for more information about calling the `Pool.map` function. https://stackoverflow.com/questions/59611745/python-multi-processing-on-for-loop/59623556#59623556 – Amiram Jan 15 '20 at 17:40
  • @Amiram I already tried that, but it yields the same error: ` sumifs() missing 1 required positional argument: 'result_col' ` – Adam B. Jan 16 '20 at 09:28

2 Answers


Let's have a look at two parts of your code.

First, the sumifs function declaration:

def sumifs(df, result_col, **kwargs):

Second, the call to this function with the relevant parameters:

# Those are the params
ca = read_in_table('Tab1')
keywords = {'Z': base['Consumer archetype ID']}

# This is the function call
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), tasks)

Update 1:

After the original code was edited, it looks like the problem is the positional argument assignment; try discarding it.

Replace the line:

results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)

with:

results = pool.map(partial(sumifs, ca, **keywords), result_col)

An example:

import multiprocessing
from functools import partial

def test_func(arg1, arg2, **kwargs):
    print(arg1)
    print(arg2)
    print(kwargs)
    return arg2

if __name__ == '__main__':
    list_of_args2 = [1, 2, 3]
    just_a_dict = {'key1': 'Some value'}
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(partial(test_func, 'This is arg1', **just_a_dict), list_of_args2)
    print(results)

This will output (the print order may vary between runs):

This is arg1
1
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
This is arg1
3
{'key1': 'Some value'}
[1, 2, 3]

More examples of how to use multiprocessing.Pool with a function that has multiple args and kwargs


Update 2:

Extended example (due to comments):

I wonder however, in the same fashion, if my function had three args and kwargs, and I wanted to keep arg1, arg3 and kwargs constant, how could I pass arg2 as a list for multiprocessing? In essence, how will I indicate to multiprocessing that in map(partial(test_func, 'This is arg1', 'This would be arg3', **just_a_dict), arg2) the second value in partial corresponds to arg3 and not arg2?

The Update 1 code would change as follows:

# The function signature
def test_func(arg1, arg2, arg3, **kwargs):

# The map call
pool.map(partial(test_func, 'This is arg1', arg3='This is arg3', **just_a_dict), list_of_args2)

This can be done using Python's positional and keyword argument assignment. Note that the kwargs are expanded with ** rather than being assigned to a keyword, even though they appear after a keyword-assigned value.
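Put together, a minimal runnable version of this variant (reusing the toy test_func and sample values from Update 1) looks like this:

import multiprocessing
from functools import partial

def test_func(arg1, arg2, arg3, **kwargs):
    # arg2 is the only varying argument; arg1, arg3 and kwargs stay fixed
    return (arg1, arg2, arg3, kwargs)

if __name__ == '__main__':
    list_of_args2 = [1, 2, 3]
    just_a_dict = {'key1': 'Some value'}
    with multiprocessing.Pool(processes=3) as pool:
        # arg1 fills the first positional slot and arg3 is bound by keyword,
        # so each element of list_of_args2 lands in the remaining slot: arg2
        results = pool.map(partial(test_func, 'This is arg1', arg3='This is arg3', **just_a_dict),
                           list_of_args2)
    print(results)

Each result is the tuple ('This is arg1', n, 'This is arg3', {'key1': 'Some value'}) for n in 1, 2, 3.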

More information about argument assignment differences can be found here.

Amiram
  • Hi @Amiram, I've just realised that I forgot to mention that `tasks` is my `result_col` parameter! Hence the trouble... Many thanks though! p.s.: I just edited the question so that's clearer! – Adam B. Jan 20 '20 at 09:44
  • Are you still having a problem? – Amiram Jan 20 '20 at 09:58
  • Yes, still haven't figured out how to pass the first argument and kwargs (which are constant) to the function and the second argument as the iterable for multiprocessing. – Adam B. Jan 20 '20 at 10:20
  • I wonder however, in the same fashion, if my function had three args and kwargs, and I wanted to keep arg1, arg3 and kwargs constant, how could I pass arg2 as a list for multiprocessing? In essence, how will I indicate to multiprocessing that in `map(partial(test_func, 'This is arg1', 'This would be arg3', **just_a_dict), arg2)` the second value in partial corresponds to arg3 and not arg2? – Adam B. Jan 21 '20 at 09:46
  • I have added another update to the original answer. – Amiram Jan 21 '20 at 10:54

If there is a piece of data that is constant across all jobs, it is better to "initialize" the processes in the pool with this fixed data when the pool is created, and map over only the varying data. This avoids re-sending the fixed data with every job request. In your case, I'd do something like the following:

# Globals populated once per worker process by the initializer
df = None
kw = {}

def initialize(df_in, kw_in):
    # Runs in each worker process when it starts; stores the fixed data
    global df, kw
    df, kw = df_in, kw_in

def worker(data):
    # computation involving df, kw, and data
    ...

...
    with multiprocessing.Pool(max_number_processes, initialize, (ca, keywords)) as pool:
        pool.map(worker, varying_data)

This gist contains a full-blown example of using the initializer. This blog post explains the performance gains from using an initializer.
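For completeness, here is a sketch of how this pattern could look applied to the question's sumifs (assuming sumifs and read_in_table are defined or imported as in the question; the keywords dict is left as a placeholder):

import multiprocessing

df = None
kw = {}

def initialize(df_in, kw_in):
    # Runs once in each worker process; stores the fixed dataframe and kwargs
    global df, kw
    df, kw = df_in, kw_in

def worker(result_col):
    # Only the varying column name is sent with each task
    return sumifs(df, result_col, **kw)

if __name__ == '__main__':
    ca = read_in_table('Tab1')
    keywords = {}  # the fixed kwargs from the question would go here
    with multiprocessing.Pool(multiprocessing.cpu_count(), initialize, (ca, keywords)) as pool:
        results = pool.map(worker, ['A', 'B', 'C'])
    print(results)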