
My dataframe has millions of rows. I have a function in Python 3 / pandas that processes a dataframe:

def OpportunityDetector(df_Opportunity):
    df_Opportunity['BillingCity_Det'] = df_Opportunity['BillingCity'].apply(ISColFilled)
    df_Opportunity['BillingPostalCode_Det'] = df_Opportunity['BillingPostalCode'].apply(ISColFilled)
    df_Opportunity['BillingStreet_Det'] = df_Opportunity['BillingStreet'].apply(ISColFilled)

    #1
    df_Opportunity['BillingCity_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingCity_Det), axis=1)
    #2
    df_Opportunity['BillingPostalCode_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingPostalCode_Det), axis=1)
    #3
    df_Opportunity['BillingStreet_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingStreet_Det), axis=1)

    return df_Opportunity
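The helpers ISColFilled and IsBillingValid are not shown here. Purely to make the snippet self-contained and testable, hypothetical stand-ins might look like this (the real logic may differ):

```python
import pandas as pd

def ISColFilled(value):
    # Hypothetical stand-in: a cell counts as "filled" when it is
    # neither NaN/None nor an empty (whitespace-only) string.
    return pd.notna(value) and str(value).strip() != ""

def IsBillingValid(stage_name, billing_det):
    # Hypothetical stand-in: a billing field is "valid" once the field
    # is filled and the opportunity has reached a given stage.
    return bool(billing_det) and stage_name == "Closed Won"
```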

How can I use multiprocessing to parallelise the computation and get the result faster?

# something like
df_new = Parallelise_Function(My_Pandas_dataframe, OpportunityDetector)

I have tried this, but it is not working:

from multiprocessing import Pool

import numpy as np
import pandas as pd

def parallelize_dataframe(df, func, n_cores=7):
    df_split = np.array_split(df, n_cores)  # split the data into n_cores chunks
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split), ignore_index=True)
    pool.close()
    pool.join()
    return df


df_new = parallelize_dataframe(df_OpportunityA, OpportunityDetector)

I get this error:

PicklingError: Can't pickle <function OpportunityDetector at 0x7f7e51db1bf8>: attribute lookup OpportunityDetector on __main__ failed
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<command-2041881674588859> in <module>
----> 1 trainpportunityA = parallelize_dataframe(df_OpportunityA, OpportunityDetector)

<command-2041881674588850> in parallelize_dataframe(df, func, n_cores)
      2     df_split = np.array_split(df, n_cores) # on divise la donnée en n_cores
      3     pool = Pool(n_cores) # lancement du traintement
----> 4     df = pd.concat(pool.map(func, df_split),ignore_index=True) #on rassemble toutes les données
      5     pool.close() #fin du traitement
      6     pool.join() # on join les données

/usr/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

/usr/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    655             return self._value
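In case it helps: this PicklingError typically appears when the function passed to `pool.map` was defined in an interactive session (a notebook cell, as the `<command-...>` frames above suggest), so the worker processes cannot look it up under `__main__`. Defining the function at the top level of a regular `.py` module (and importing it) usually avoids it. A minimal self-contained sketch of the same split/map/concat pattern, with a hypothetical top-level function `double_amount` standing in for OpportunityDetector:

```python
import multiprocessing as mp

import numpy as np
import pandas as pd

def double_amount(chunk):
    # Module-level function: picklable, unlike one defined in a notebook cell.
    chunk = chunk.copy()
    chunk["Amount"] = chunk["Amount"] * 2
    return chunk

def parallelize_dataframe(df, func, n_cores=4):
    # Split the dataframe into n_cores chunks, process each chunk in its
    # own worker process, then stitch the results back together.
    chunks = np.array_split(df, n_cores)
    with mp.Pool(n_cores) as pool:
        result = pd.concat(pool.map(func, chunks), ignore_index=True)
    return result

if __name__ == "__main__":
    df = pd.DataFrame({"Amount": range(8)})
    out = parallelize_dataframe(df, double_amount)
    print(out["Amount"].tolist())  # [0, 2, 4, 6, 8, 10, 12, 14]
```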

Thanks in advance

hugo