I am trying to do parallel processing in Python. I have a huge dataframe with more than 4M rows. As in the sample given below, I would like to divide the dataframe (df will be divided into df1, df2) and apply the same set of transpose operations to each of the resulting dataframes. Thanks to Jezrael for helping me get this far. Please find my input dataframe below.

import pandas as pd

df = pd.DataFrame({
'subject_id':[1,1,1,1,2,2,2,2,3,3,4,4,4,4,4],
'readings' : ['READ_1','READ_2','READ_1','READ_3','READ_1','READ_5','READ_6','READ_8','READ_10','READ_12','READ_11','READ_14','READ_09','READ_08','READ_07'],
'val' :[5,6,7,11,5,7,16,12,13,56,32,13,45,43,46],
})

Code to divide the dataframe:

N = 2  # subjects per chunk; with the 4 subjects above this gives two dataframes
dfs = [x for _, x in df.groupby(pd.factorize(df['subject_id'])[0] // N)]  # dfs is a list of dataframes, one per chunk
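To make the splitting step easier to follow, here is a minimal sketch of what that line does. The helper name `split_by_subject` and the small sample frame are made up for illustration; only `pd.factorize` and `groupby` come from the code above. Note that `N` controls how many subjects go into each chunk, not how many chunks you get, so the number of resulting dataframes depends on how many distinct subjects there are.

```python
import pandas as pd

# Hypothetical sample with 5 subjects (not the dataframe from the question).
sample = pd.DataFrame({
    'subject_id': [1, 1, 2, 3, 3, 4, 5],
    'val':        [5, 6, 7, 11, 5, 7, 16],
})


def split_by_subject(df, n):
    # factorize numbers the subjects 0, 1, 2, ... in order of first appearance;
    # integer division by n then puts n consecutive subjects in the same group.
    codes = pd.factorize(df['subject_id'])[0]
    return [part for _, part in df.groupby(codes // n)]


chunks = split_by_subject(sample, 2)
for part in chunks:
    print(part['subject_id'].unique())
```

Because all rows of a subject share the same code, a subject is never split across two chunks, which is what the per-subject groupby in the next step relies on.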

Parallel processing code:

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
results = []

def transpose_ope(df):                      #this function does the transformation like I want
    df_op = (df.groupby(['subject_id','readings'])['val']
            .describe()
            .unstack()
            .swaplevel(0,1,axis=1)
            .reindex(df['readings'].unique(), axis=1, level=0))
    df_op.columns = df_op.columns.map('_'.join)
    df_op = df_op.reset_index()

results.append(pool.map(transpose_ope, [df for df in dfs])) # am I storing the output correctly here?

Actually, I would like to append the output from each stage to a main dataframe.

Can you help me do this? My code keeps running without finishing, even for just 10-15 records.

The Great
  • Not a direct answer to your question, but have you tried using https://github.com/nalepae/pandarallel ? – Quant Christo Oct 20 '19 at 10:35
  • @QuantChristo - Unfortunately, I am on Windows right now. It looks like it works only on Linux and macOS – The Great Oct 20 '19 at 10:49
  • @QuantChristo - Hi, Have you tried using this? When I tried one of the examples given in their docs, it still keeps running for `30000000 rows × 2 columns`. May I know for a dataframe of this size, how long should this take using `pandarallel`? – The Great Oct 21 '19 at 06:18
  • I used it about 2 months ago, a simple `applyparallel`, and it worked fine, but it also had to run on a Windows machine, so I had to stop using it. There is also https://github.com/modin-project/modin ; to try it you only need `import modin.pandas as pd` – Quant Christo Oct 21 '19 at 06:34
  • Did you try their example and was it fast? like within 2-3 mins? – The Great Oct 21 '19 at 06:40
  • I didn't check their examples. You can try running it for 3k rows, 30k rows, and so on to see how it behaves. It could be a bug: https://github.com/nalepae/pandarallel/issues/39 – Quant Christo Oct 21 '19 at 06:48

1 Answer

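The function you pass to `pool.map` needs to return the object you want. Your `transpose_ope` builds `df_op` but never returns it, so `map` collects `None` for every dataframe.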

I would also use the pool as a context manager (`with mp.Pool(...) as pool:`), which is more idiomatic and shuts the pool down when the block exits.

EDIT: Fixed import

import multiprocessing as mp

def transpose_ope(df):                      #this function does the transformation like I want
    df_op = (df.groupby(['subject_id','readings'])['val']
            .describe()
            .unstack()
            .swaplevel(0,1,axis=1)
            .reindex(df['readings'].unique(), axis=1, level=0))
    df_op.columns = df_op.columns.map('_'.join)
    df_op = df_op.reset_index()
    return df_op


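def main():
    # dfs is the list of dataframes built in the question
    with mp.Pool(mp.cpu_count()) as pool:
        res = pool.map(transpose_ope, dfs)  # one transformed dataframe per entry in dfs, in the same order
    return res

if __name__ == '__main__':
    main()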

I'm not sure why you're appending one list to another, but if you just want a final list of [transformed(df) for df in dfs], `pool.map` already returns exactly that.
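Since the question asks for the per-chunk outputs to end up in one main dataframe, here is a minimal sketch of one way to do that, assuming `pd.concat` is an acceptable way to combine them. The helper names `split_by_subject`, `combine`, and `run_parallel` are made up for illustration; `transpose_ope` and the sample data are taken from the question. Different chunks contain different readings, so the combined frame has NaN wherever a chunk had no column for a given reading.

```python
import multiprocessing as mp

import pandas as pd


def transpose_ope(chunk):
    # Same transformation as above: one row per subject,
    # one column per (reading, statistic) pair.
    out = (chunk.groupby(['subject_id', 'readings'])['val']
                .describe()
                .unstack()
                .swaplevel(0, 1, axis=1)
                .reindex(chunk['readings'].unique(), axis=1, level=0))
    out.columns = out.columns.map('_'.join)
    return out.reset_index()


def split_by_subject(df, n):
    # Chunks of at most n subjects each (the splitting line from the question).
    return [part for _, part in df.groupby(pd.factorize(df['subject_id'])[0] // n)]


def combine(results):
    # Stack the per-chunk outputs; columns missing from a chunk are filled with NaN.
    return pd.concat(results, ignore_index=True, sort=False)


def run_parallel(df, n=2):
    chunks = split_by_subject(df, n)
    # No point in starting more workers than there are chunks.
    with mp.Pool(min(mp.cpu_count(), len(chunks))) as pool:
        return combine(pool.map(transpose_ope, chunks))


df = pd.DataFrame({
    'subject_id': [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 4, 4, 4, 4, 4],
    'readings': ['READ_1', 'READ_2', 'READ_1', 'READ_3', 'READ_1', 'READ_5',
                 'READ_6', 'READ_8', 'READ_10', 'READ_12', 'READ_11',
                 'READ_14', 'READ_09', 'READ_08', 'READ_07'],
    'val': [5, 6, 7, 11, 5, 7, 16, 12, 13, 56, 32, 13, 45, 43, 46],
})

if __name__ == '__main__':
    final = run_parallel(df)
    print(final.shape)
```

The worker function is defined at module level and the pool is only started under the `__main__` guard, so the script can be re-imported safely by worker processes on platforms that spawn rather than fork.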

ec2604
  • Okay, I guess it's `mp.Pool`. – The Great Oct 20 '19 at 10:51
  • Actually did you try running this at your end? It is still running for the sample dataframe. – The Great Oct 20 '19 at 10:53
  • It might be a `__main__` issue; that depends a bit on your OS. I'll edit the code to use a main() function under an `if __name__ == '__main__'` guard, see if it works. – ec2604 Oct 20 '19 at 10:59
  • Is there any issue with code, I mean the `transpose_ope` function? – The Great Oct 20 '19 at 11:00
  • Works fine on my end. – ec2604 Oct 20 '19 at 11:01
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/201157/discussion-between-ec2604-and-ssmk). – ec2604 Oct 20 '19 at 11:02
  • OP was using Jupyter Notebook + Windows. Recommended reading: https://stackoverflow.com/questions/20222534/python-multiprocessing-on-windows-if-name-main https://stackoverflow.com/questions/23641475/multiprocessing-working-in-python-but-not-in-ipython/23641560#23641560 – ec2604 Oct 20 '19 at 11:28