-1

As the title says, I have a big data frame (df) that needs to be processed row-wise. Because df is large (6 GB), I want to use Python's multiprocessing package to speed things up. Below is a toy example; I'll describe briefly what I want to achieve and leave the details to the code.

The original data is df, on which I want to perform a row-wise analysis (order does not matter) that requires not just the focal row itself but also other rows that satisfy certain conditions. Below are the toy data and my code:

import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time
import math

# a test example
start_time = time.time()
df = pd.DataFrame({'value': np.random.randint(0, 10, size=30),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3,
                   'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})

df['row_id'] = df.index

print(df)

    value district region  row_id
0       8    upper      A       0
1       4    upper      A       1
2       0    upper      A       2
3       3    upper      A       3
4       0    upper      A       4
5       0     down      A       5
6       3     down      A       6
7       7     down      A       7
8       1     down      A       8
9       7     down      A       9
10      7    upper      B      10
11      3    upper      B      11
12      9    upper      B      12
13      8    upper      B      13
14      2    upper      B      14
15      4     down      B      15
16      5     down      B      16
17      3     down      B      17
18      5     down      B      18
19      3     down      B      19
20      3    upper      C      20
21      1    upper      C      21
22      3    upper      C      22
23      0    upper      C      23
24      3    upper      C      24
25      2     down      C      25
26      0     down      C      26
27      1     down      C      27
28      1     down      C      28
29      0     down      C      29

What I want to do is add two columns, count_b and count_a, which count the number of rows whose value falls in the open ranges (value - 2, value) and (value, value + 2), respectively, within the same region and district subset. For instance, count_b for the row with row_id == 0 should be 0, since no row with region == 'A' and district == 'upper' has value 7, the only integer in (8 - 2, 8). So the desired output should be:

   count_a count_b region row_id
0        0       0      A      0
1        0       1      A      1
2        0       0      A      2
3        1       0      A      3
4        0       0      A      4
5        1       0      A      5
6        0       0      A      6
7        0       0      A      7
8        0       1      A      8
9        0       0      A      9
10       1       0      B     10
11       0       1      B     11
12       0       1      B     12
13       1       1      B     13
14       1       0      B     14
15       2       2      B     15
16       0       1      B     16
17       1       0      B     17
18       0       1      B     18
19       1       0      B     19
20       0       0      C     20
21       0       1      C     21
22       0       0      C     22
23       1       0      C     23
24       0       0      C     24
25       0       2      C     25
26       2       0      C     26
27       1       2      C     27
28       1       2      C     28
29       2       0      C     29

Question 1: can such a task be vectorized?

Question 2: how can we use multiprocessing to speed it up (solved)?

I decided to go with multiprocessing for the reason that I'm not sure how to accomplish this through vectorization. The solution is (based on the answer provided by )

multiprocessing

def b_a(input_df,r_d):
    print('length of input dataframe: ' + str(len(input_df)))
    # print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
    sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]

    print('length of sliced dataframe: ' + str(len(sub_df)))

    print(r_d[0],r_d[1])


    b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])

    for id in sub_df['row_id']:
        print('processing row: ' + str(id))
        focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
        temp_b = sub_df.loc[
            (sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
        temp_a = sub_df.loc[
            (sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]

        if len(temp_a):
            temp_a['count_a'] = temp_a['row_id'].count()
        else:
            temp_a = temp_a.append(pd.Series(), ignore_index=True)
            temp_a = temp_a.reindex(
                columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
            print(temp_a)

        if len(temp_b):
            temp_b['count_b'] = temp_b['row_id'].count()
        else:
            temp_b = temp_b.append(pd.Series(), ignore_index=True)
            temp_b = temp_b.reindex(
                columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
        print(len(temp_a),len(temp_b))

        temp_b.drop_duplicates('count_b', inplace=True)
        temp_a.drop_duplicates('count_a', inplace=True)
        temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
                          temp_a[['count_a']].reset_index(drop=True)], axis=1)

        temp['row_id'] = id
        temp['region'] = str(r_d[0])

        b_a = pd.concat([b_a, temp])

    return b_a

r_d_list = list(itertools.product(df['region'].unique(),df['district'].unique()))


if __name__ == '__main__':
    P = Pool(3)
    out = P.starmap(b_a, zip([chunks[r_d_list.index(j)] for j in r_d_list for i in range(len(j))],
                             list(itertools.chain.from_iterable(r_d_list)))) # S3

    # out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list)) # S2
    # out = P.starmap(b_a,zip(df,r_d_list)) # S1

    # print(out)
    P.close()
    P.join()
    final = pd.concat(out, ignore_index=True)
    print(final)

    final.to_csv('final.csv',index=False)
print("--- %s seconds ---" % (time.time() - start_time))

P.starmap (like P.map) needs one tuple of arguments per call to b_a. Solution S1 fails because zip(df, r_d_list) iterates over the column names of df rather than over the data frame itself, so each call receives a string as input_df and raises AttributeError: 'str' object has no attribute 'loc'. You can verify this from the output of print('length of input dataframe: ' + str(len(input_df))), which prints the length of a column name instead of the number of rows in df.

The accepted answer corrects this by creating a reference array (S2) (I'm not sure exactly what that is) which has the same length as the parameter list (r_d_list). This solution works great but may be slow when df is large since, as far as I understand, it requires a search through the entire data frame for each pair of parameters (region and district). So I came up with a modified version (S3), which splits the data into chunks based on region and district and then searches within each chunk instead of the entire data frame. For me, this cut the running time by about 20 percent; see below for the code:

region = df['region'].unique()

chunk_numbers = 3

chunk_region = math.ceil(len(region) / chunk_numbers)

chunks = list()

r_d_list = list()

row_count = 0

for i in range(chunk_numbers):

    print(i)

    if i < chunk_numbers-1:
        regions = region[(i*chunk_region):((i+1)*chunk_region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions,temp['district'].unique())))

        del temp

    else:
        regions = region[(i * chunk_region):len(region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions,temp['district'].unique())))

        del temp

    row_count = row_count + len(chunks[i])
    print(row_count)

Add this between print(df) and def b_a(), and remember to comment out the r_d_list = ... line before if __name__ == '__main__'.
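For reference, the per-group split could also be sketched with groupby instead of a manual chunk loop. This is only a sketch under assumptions: split_by_group is a hypothetical helper name that does not appear in the code above, and the frame is assumed to have the region and district columns of the toy example (the values here are placeholder data).

```python
import numpy as np
import pandas as pd


def split_by_group(frame, keys=("region", "district")):
    """Return a list of (sub_frame, key) pairs, one per group.

    Hypothetical helper for illustration: each pair could be passed to
    Pool.starmap so that a worker only receives the rows of its own group.
    """
    return [(sub, key) for key, sub in frame.groupby(list(keys))]


# Placeholder data with the same layout as the toy example
toy = pd.DataFrame({'value': np.arange(30) % 10,
                    'district': (['upper'] * 5 + ['down'] * 5) * 3,
                    'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})

pairs = split_by_group(toy)
print(len(pairs))    # one entry per (region, district) pair
print(pairs[0][1])   # key of the first group, a (region, district) tuple
```

Since b_a filters its input by the (region, district) pair anyway, pairs of this shape should be usable as P.starmap(b_a, pairs), provided the frame also carries the row_id column that b_a expects.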

Thanks to this wonderful community, I now have a workable solution. I updated my question to provide some material for those who may run into the same problem in the future, and to better formulate the question in the hope of getting even better solutions.

Jia Gao
  • Where did you define `r` in `temp['region'] = r`, referring to `# solution 2: multi processing`? – caot Oct 14 '19 at 00:28
  • my mistake, updated. – Jia Gao Oct 14 '19 at 00:34
  • http://idownvotedbecau.se/noexceptiondetails/, http://idownvotedbecau.se/nodebugging. Examine the state of the program at the place where the exception occurs. Judging by the message, a variable has a different type from what you are expecting. You are probably somehow botching the parameters passed to `b_a()` or its logic has a flaw. – ivan_pozdeev Oct 14 '19 at 01:56
  • Thanks to all of you for good advice, I know this question is not formulated in a great way, please leave a comment about where should I improve rather than just a downvote. – Jia Gao Oct 15 '19 at 01:22
  • @JasonGoal it would be great if you could provide a [mcve](/help/mcve), i.e. 5 rows of the input df(s) and the desired output. That way we can help you better. A 6GB csv is not that big, and it may well be possible to find a solution with vectorization. `dask` or `vaex` could help too. – rpanai Oct 15 '19 at 01:58
  • @rpanai, I edited my question, the toy data has the exact structure as the data I'm working with, and much clear in the sense that I know what the correct output should be. It's not that I do not want to upload the real data, it's just time consuming to get a good subsample for testing purpose. Anyway, could you help to look at it and see if we can improve the performance (make it faster) by using either `multiprocessing` or `vectorization`? – Jia Gao Oct 15 '19 at 08:06
  • unless you fix a seed your example is not reproducible. – rpanai Oct 15 '19 at 11:11

3 Answers

0

Change

out = P.starmap(b_a,zip(df,r_d_list))

into

out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list))

The output looks as follows:

length of input dataframe: 300
region: B district: down
length of input dataframe: 300
region: C district: upper
length of sliced dataframe: 50
length of input dataframe: 300
region: C district: down
length of sliced dataframe: 50
length of sliced dataframe: 50
6
[  count_a count_b region row_id
0       6       7      A      0,   count_a count_b region row_id
0       2       4      A     50,   count_a count_b region row_id
0       1       4      B    100,   count_a count_b region row_id
0       7       4      B    150,   count_a count_b region row_id
0       4       9      C    200,   count_a count_b region row_id
0       4       4      C    250]

The list holds references to the same df object rather than copies of it:

dfa = [df for i in range(len(r_d_list))]

for i in dfa:
    print(['id(i): ', id(i)])

The output of the above looks as follows:

['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]

Difference between zip(df, r_d_list) and zip(dfa, r_d_list):

Review the example on zip at https://docs.python.org/3.3/library/functions.html#zip to understand what zip does and how it constructs the result.

list(zip(df, r_d_list)) returns the following (one tuple per column name of df):

[
('value', ('A', 'upper')),
('district', ('A', 'down')),
('region', ('B', 'upper')),
('row_id', ('B', 'down'))
]

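list(zip(dfa, r_d_list)) returns the following (the first element of every tuple is the same df object):

[
(df, ('A', 'upper')),
(df, ('A', 'down')),
(df, ('B', 'upper')),
(df, ('B', 'down')),
(df, ('C', 'upper')),
(df, ('C', 'down'))
]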
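The difference can also be checked on a tiny frame. The sketch below is illustrative only: the two-row df and the params list are made-up stand-ins, and itertools.repeat is shown as one possible alternative to building the list of references by hand.

```python
import itertools

import pandas as pd

# Made-up stand-ins for df and r_d_list
df = pd.DataFrame({"value": [1, 2], "region": ["A", "B"]})
params = [("A", "upper"), ("A", "down"), ("B", "upper")]

# Iterating over a DataFrame yields its column labels, not the frame itself,
# and zip stops at the shorter input (here: the two column labels)
pairs_wrong = list(zip(df, params))

# itertools.repeat hands back the same object on every iteration,
# so every tuple carries a reference to df
pairs_right = list(zip(itertools.repeat(df), params))

print([first for first, _ in pairs_wrong])
print(all(first is df for first, _ in pairs_right))
```

Either form only repeats references inside the parent process. The arguments are still pickled when they are sent to the worker processes, so the size of what each task receives is worth keeping in mind for a large df.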

You can find some examples of pool.starmap in "Python multiprocessing pool.map for multiple arguments".

Updated the working code:

import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool

df = pd.DataFrame({'value': np.random.randint(0, 10, size=300),
                   'district': (['upper'] * 50 + ['down'] * 50) * 3,
                   'region': ['A'] * 100 + ['B'] * 100 + ['C'] * 100})

df['row_id'] = df.index

# b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])


# solution 2: multi processing
def b_a(input_df, r_d):
#    print('length of input dataframe: ' + str(len(input_df)))
#    print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))

    sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]  # subset data that in certain region and district

#    print('length of sliced dataframe: ' + str(len(sub_df)))

    b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])  # an empty data frame to store result

    for id in sub_df['row_id']:
        focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
        temp_b = sub_df.loc[
            (sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
        temp_a = sub_df.loc[
            (sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]

        if len(temp_a):
            temp_a['count_a'] = temp_a['row_id'].count()
        else:
            temp_a = temp_a.reindex(
                columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)

        if len(temp_b):
            temp_b['count_b'] = temp_b['row_id'].count()
        else:
            temp_b = temp_b.reindex(
                columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)

        temp_b.drop_duplicates('count_b', inplace=True)
        temp_a.drop_duplicates('count_a', inplace=True)
        temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
                          temp_a[['count_a']].reset_index(drop=True)], axis=1)

        temp['row_id'] = id
        temp['region'] = str(r_d[0])

        b_a = pd.concat([b_a, temp])

    return b_a


r_d_list = list(itertools.product(df['region'].unique(), df['district'].unique()))

# dfa = [df for i in range(len(r_d_list))]

#for i in dfa:
#    print(['id(i): ', id(i)])

if __name__ == '__main__':
    P = Pool(3)
    out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list))
    # print(len(out))
    P.close()
    P.join()

    final = pd.concat(out, ignore_index=True)

    print(final)

Output for final:

    count_a count_b region row_id
0         4       6      A      0
1         5       4      A      1
2       NaN       5      A      2
3         5       8      A      3
4         5     NaN      A      4
..      ...     ...    ...    ...
295       2       7      C    295
296       6     NaN      C    296
297       6       6      C    297
298       5       5      C    298
299       6       6      C    299

[300 rows x 4 columns]
caot
  • thanks for your response, two questions regarding it: 1, the length of the final output should equal to the original `df` since I want to perform a row-wise operation on each row, but your answer produces a result with only 6 rows (should be 300), is this a problem due to the way you feed the `starmap` or due to the way I wrote the `b_a` function? – Jia Gao Oct 15 '19 at 01:30
  • 2, by using `[df for i in range(len(r_d_list))]`, are we simply duplicate the `df` for each set of parameters? If so, this may not be feasible in my situation since the real data is 6GB and has 500*150 sets of parameters(the length of r_d_list will be 500*150). – Jia Gao Oct 15 '19 at 01:30
  • The array maintains references to `df`. It doesn't duplicate the `df`. – caot Oct 15 '19 at 01:54
  • That's good news, any suggestion why the output is only 6 rows rather than 300 (length of `df`)? – Jia Gao Oct 15 '19 at 02:06
  • Thanks, this works great, would you mind adding some explanations about why `zip([df for i in range(len(r_d_list))], r_d_list)` works while `zip(df, r_d_list)` doesn't? I'll delete my naive ones from my question if so. – Jia Gao Oct 15 '19 at 08:01
0

I think there is room for improvement here. What I suggest is defining a function and applying it within groupby.

import os
import pandas as pd
import numpy as np
import dask.dataframe as dd
N = 30_000
# Now the example is reproducible
np.random.seed(0)
df = pd.DataFrame({'value': np.random.randint(0, 10, size=N),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3000,
                   'region': ['A'] * 10_000 + ['B'] * 10_000 + ['C'] * 10_000,
                   'row_id': np.arange(N)})

The following function returns count_a and count_b for every row within the given group:

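def fun(vec):
    # Work on a NumPy array so that both a list and a Series are accepted
    vec = np.asarray(vec)
    out = []
    for v in vec:
        # Strict inequalities: the focal row itself never falls in either range
        count_a = ((vec > v) & (vec < v + 2)).sum()  # rows in (v, v + 2)
        count_b = ((vec > v - 2) & (vec < v)).sum()  # rows in (v - 2, v)
        out.append([count_a, count_b])
    return out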

Pandas

%%time
df[["count_a", "count_b"]] = df.groupby(["district", "region"])["value"]\
                               .apply(lambda x: fun(x))\
                               .explode().apply(pd.Series)\
                               .reset_index(drop=True)
CPU times: user 22.6 s, sys: 174 ms, total: 22.8 s
Wall time: 22.8 s

Dask

Now you need to create df again, and then you can use dask. This is the first thing that came to my mind; there is surely a better/faster way.

ddf = dd.from_pandas(df, npartitions=os.cpu_count())

df[["count_a", "count_b"]] = ddf.groupby(["district", "region"])["value"]\
                                .apply(lambda x: fun(x.tolist()),
                                       meta=('x', 'f8'))\
                                .compute(scheduler='processes')\
                                .explode().apply(pd.Series)\
                                .reset_index(drop=True)
CPU times: user 6.92 s, sys: 114 ms, total: 7.04 s
Wall time: 13.4 s

Multiprocessing

In this case you need to create df again. The trick here is to split df into a list lst of data frames, one per group.

import multiprocessing as mp
def parallelize(fun, vec, cores):
    with mp.Pool(cores) as p:
        res = p.map(fun, vec)
    return res

def par_fun(d):
    d = d.reset_index(drop=True)
    o = pd.DataFrame(fun(d["value"].tolist()),
                     columns=["count_a", "count_b"])
    return pd.concat([d,o], axis=1)
%%time
lst = [l[1] for l in list(df.groupby(["district", "region"]))]

out = parallelize(par_fun, lst, os.cpu_count())
out = pd.concat(out, ignore_index=True)
CPU times: user 152 ms, sys: 49.7 ms, total: 202 ms
Wall time: 5 s

You could eventually speed up the function fun further using numba.
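On the vectorization question, one possible direction is to sort each group once and use binary search instead of comparing every row with every other row. The sketch below is not the method used in the timings above; window_counts and add_counts are hypothetical names, and the toy values are copied from the question so the result can be compared with the desired output there.

```python
import numpy as np
import pandas as pd


def window_counts(values):
    """For each v, count elements in (v, v + 2) and in (v - 2, v)."""
    v = np.asarray(values)
    s = np.sort(v)
    # Elements strictly greater than v and strictly less than v + 2
    count_a = np.searchsorted(s, v + 2, side="left") - np.searchsorted(s, v, side="right")
    # Elements strictly greater than v - 2 and strictly less than v
    count_b = np.searchsorted(s, v, side="left") - np.searchsorted(s, v - 2, side="right")
    return count_a, count_b


def add_counts(frame, keys=("region", "district")):
    """Return a copy of frame with count_a and count_b in the original row order."""
    values = frame["value"].to_numpy()
    count_a = np.zeros(len(frame), dtype=np.int64)
    count_b = np.zeros(len(frame), dtype=np.int64)
    # .indices maps each group key to the integer positions of its rows
    for positions in frame.groupby(list(keys)).indices.values():
        a, b = window_counts(values[positions])
        count_a[positions] = a
        count_b[positions] = b
    return frame.assign(count_a=count_a, count_b=count_b)


# Toy data copied from the question
toy = pd.DataFrame({
    "value": [8, 4, 0, 3, 0, 0, 3, 7, 1, 7,
              7, 3, 9, 8, 2, 4, 5, 3, 5, 3,
              3, 1, 3, 0, 3, 2, 0, 1, 1, 0],
    "district": (["upper"] * 5 + ["down"] * 5) * 3,
    "region": ["A"] * 10 + ["B"] * 10 + ["C"] * 10,
})
result = add_counts(toy)
print(result[["count_a", "count_b", "region"]])
```

Because the counts are written back by row position, the new columns stay aligned with the original rows even though groupby visits the groups in sorted key order. The remaining Python loop runs once per group rather than once per row, which is where most of the saving would be expected to come from.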

rpanai
-2

Because of the GIL multiprocessing doesn't actually use two different threads. In a CPU bound process, using multiprocessing won't give you that much, if any, extra performance.

There is a library called dask which has an api designed to look just like pandas but under the hood it does a lot of asynchronous and chunking and what not to make handling bid dataframes faster.

FailureGod
  • Are you saying that even I have a Linux machine with 48 cores, it's almost useless to parallel the job? If so, what's the meaning of `multiprocessing` in Python. Further, ` what not to make handling bid dataframes faster` does this mean that using `dask` can improve the performance or not? – Jia Gao Oct 14 '19 at 00:48
  • Lmao yeah I was rather miffed when I realized this too. There is currently no way to use more than one core in python without dropping down to the C layer or using subprocessing. – FailureGod Oct 14 '19 at 01:37
  • Didn't mean to send. But yeah. Dask uses C under the hood and can thus fully use your whole CPU. Dask can also be used with clusters and use multiple nodes. There's a bit of extra syntax to using it but that will be way easier than the alternatives. – FailureGod Oct 14 '19 at 01:39
  • @FailureGod aren't you confusing `multiprocessing` with `threading`? Different processes are, by definition, independent in which cores they run on, that's decided by the OS. – ivan_pozdeev Oct 14 '19 at 01:53
  • Also, how is this relevant? The OP's issue is that the multiprocessing-based solution gives them a strange exception. – ivan_pozdeev Oct 14 '19 at 01:54
  • The multiprocessing library does not execute code on different cores (because of the GIL). You are correct that my answer does not answer what OP asked. However, I think it will solve the core issue that they are having in the best way (for some definition of best). – FailureGod Oct 14 '19 at 02:09
  • @FailureGod, `multiprocessing` uses multiple python *processes*, the whole point is to allow you to leverage multiple cores and bypass the GIL – juanpa.arrivillaga Oct 14 '19 at 02:33