Following the second answer on this post, I tried the following code:

from multiprocessing import Pool
import numpy as np
from itertools import repeat
import pandas as pd

def doubler(number, r):
    result = number * 2 + r
    return result

def f1():
    return np.random.randint(20)

if __name__ == '__main__':
    df = pd.DataFrame({"A": [10,20,30,40,50,60], "B": [-1,-2,-3,-4,-5,-6]})
    num_chunks = 3
    # break df into 3 chunks
    chunks_dict = {i:np.array_split(df, num_chunks)[i] for i in range(num_chunks)}

    arg1 = f1()

    with Pool() as pool:
        results = pool.starmap(doubler, [zip(chunks_dict[i]['B'], repeat(arg1)) for i in range(num_chunks)])

    print(results)

>>> [(-1, 20, -1, 20, -2, 20), (-3, 20, -3, 20, -4, 20), (-5, 20, -5, 20, -6, 20)]

These are not the results I want. I want to pass each element of column B of df to the doubler function, together with the output of f1 (this is why I am using starmap and repeat), and get back a list in which each input is doubled and the random integer is added to it.

For example, if the output of f1 was 2, then I want to return

>>> [0,-2,-4,-6,-8,-10] # [2*(-1) + 2, 2*(-2) + 2, ... ]

Can anyone advise how I would achieve this desired result? Thanks

EDIT: Inserting the whole data frame does not work either:

with Pool() as pool:
    results = pool.starmap(doubler, [zip(df['B'], repeat(arg1))])

>>> TypeError: doubler() takes 2 positional arguments but 6 were given

Essentially, I just want to break my dataframe into chunks and pass these chunks, along with other variables (arg1), to a function that accepts more than one argument.

PyRsquared
  • I think your `f1()` should be returning that random number, right? I don't think that solves your problem but that looks odd. – Paul Nov 10 '17 at 17:51

1 Answer

Your arguments don't look right. For instance, after adding a print of the arguments in doubler I see the following (assume f1() returns 2):

doubler number (-3, 2) r (-4, 2)
doubler number (-1, 2) r (-2, 2)
doubler number (-5, 2) r (-6, 2)

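This is because each element of the list passed to starmap is a zip object rather than a single (number, r) tuple. starmap unpacks each element into the function's arguments, so the two tuples produced by each zip end up as number and r.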
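To make the unpacking visible without starting any worker processes, here is a minimal sketch that uses itertools.starmap, which unpacks each element into arguments the same way Pool.starmap does. The names column_b and r_value are made up for illustration, and r_value = 2 stands in for the output of f1().

```python
from itertools import repeat, starmap


def doubler(number, r):
    return number * 2 + r


column_b = [-1, -2]  # hypothetical stand-in for one 2-row chunk of column B
r_value = 2          # assumed output of f1()

# A list whose single element is a zip object: starmap unpacks the zip,
# so its two (b, r) tuples become the arguments `number` and `r`.
wrong_args = [zip(column_b, repeat(r_value))]
wrong = list(starmap(doubler, wrong_args))

# A flat list of (b, r) tuples: each tuple is unpacked into one call.
right_args = list(zip(column_b, repeat(r_value)))
right = list(starmap(doubler, right_args))

print(wrong)  # [(-1, 2, -1, 2, -2, 2)]
print(right)  # [0, -2]
```

In the first case doubler multiplies and adds tuples, which repeats and concatenates them instead of doing arithmetic.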

I think it is much easier to rewrite your chunking procedure and argument generation. Assuming I understand this correctly, you want to end up with the following list of argument tuples (assume f1() returns 2):

[(-1, 2), (-2, 2), (-3, 2), (-4, 2), (-5, 2), (-6, 2)]

starmap then applies doubler to each tuple and returns [doubler(-1, 2), doubler(-2, 2), ..., doubler(-6, 2)], which is [0, -2, -4, -6, -8, -10]. Try this:

from multiprocessing import Pool
import numpy as np
from itertools import repeat
import pandas as pd


def doubler(number, r):
    result = number * 2 + r
    return result


def f1():
    return np.random.randint(20)


if __name__ == '__main__':
    df = pd.DataFrame({"A": [10, 20, 30, 40, 50, 60], "B": [-1, -2, -3, -4, -5, -6]})
    num_processes = 3

    # the "r" value to use with every "B" value
    random_r = f1()

    # build a list of (b, r) tuples: zip stops at the end of column B,
    # so repeat() does not need an explicit length
    tuples = list(zip(df.B.values, repeat(random_r)))
    print(tuples)

    with Pool(num_processes) as pool:
        results = pool.starmap(doubler, tuples)

    print(results)
Paul
  • Thanks. That seems to work for this simple case. Is there any way to check whether each processing core is working on only a single chunk of the data? What I mean is: if I run some code on 300k rows of a dataframe in serial, it takes 30 minutes. If I split the data into 4 chunks and have 1 core work on 1 chunk (75k rows), I would expect the time to be quartered, i.e. to about 7-8 minutes. However, this isn't happening; it takes the same amount of time as the serial code. – PyRsquared Nov 14 '17 at 13:34
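One way to look into the question in this comment is to hand each worker a whole chunk plus the extra argument, and have the worker return its process ID alongside the result. This is only a sketch under stated assumptions: process_chunk and split_into_chunks are hypothetical helpers, a plain list stands in for column B to keep the example dependency-free, and r_value = 2 stands in for the output of f1(). Passing chunksize=1 asks the pool to hand out one chunk per task, but it does not guarantee that every chunk lands on a different worker.

```python
import os
from multiprocessing import Pool


def process_chunk(chunk, r):
    """Double every value in one chunk, add r, and report the worker's PID."""
    return os.getpid(), [value * 2 + r for value in chunk]


def split_into_chunks(values, num_chunks):
    """Split a list into num_chunks contiguous pieces of near-equal size."""
    size, extra = divmod(len(values), num_chunks)
    chunks, start = [], 0
    for i in range(num_chunks):
        stop = start + size + (1 if i < extra else 0)
        chunks.append(values[start:stop])
        start = stop
    return chunks


if __name__ == "__main__":
    column_b = [-1, -2, -3, -4, -5, -6]  # stand-in for df["B"]
    r_value = 2                          # assumed output of f1()
    num_chunks = 3

    # One (chunk, r) tuple per task, so each call receives a whole chunk.
    tasks = [(chunk, r_value) for chunk in split_into_chunks(column_b, num_chunks)]

    with Pool(num_chunks) as pool:
        outputs = pool.starmap(process_chunk, tasks, chunksize=1)

    # Show which process handled which chunk, then flatten the results.
    for pid, doubled in outputs:
        print(pid, doubled)
    results = [value for _, doubled in outputs for value in doubled]
    print(results)
```

If the printed PIDs differ, the chunks were handled by separate processes. When a parallel version is no faster than the serial one, a possible cause is that the tasks are too small (for example one task per row), so that the cost of sending arguments and results between processes outweighs the work itself; passing larger chunks per task, as here, is one thing to try.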