My dataframe has millions of rows. I have a Python 3 pandas function that processes a dataframe:
def OpportunityDetector(df_Opportunity):
    # Flag whether each billing column is filled in
    df_Opportunity['BillingCity_Det'] = df_Opportunity['BillingCity'].apply(ISColFilled)
    df_Opportunity['BillingPostalCode_Det'] = df_Opportunity['BillingPostalCode'].apply(ISColFilled)
    df_Opportunity['BillingStreet_Det'] = df_Opportunity['BillingStreet'].apply(ISColFilled)
    # 1: cross the city flag with the stage name
    df_Opportunity['BillingCity_Det_StageName_Det'] = df_Opportunity.apply(
        lambda x: IsBillingValid(x.StageName, x.BillingCity_Det), axis=1)
    # 2: cross the postal-code flag with the stage name
    df_Opportunity['BillingPostalCode_Det_StageName_Det'] = df_Opportunity.apply(
        lambda x: IsBillingValid(x.StageName, x.BillingPostalCode_Det), axis=1)
    # 3: cross the street flag with the stage name
    df_Opportunity['BillingStreet_StageName_Det'] = df_Opportunity.apply(
        lambda x: IsBillingValid(x.StageName, x.BillingStreet_Det), axis=1)
    return df_Opportunity
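ISColFilled and IsBillingValid are my own small helpers. Stubs along these lines (simplified for this question, the exact logic should not matter here) are enough to run the function:

import pandas as pd

def ISColFilled(value):
    # Stub: True if the cell is filled in (my real check is more involved)
    return pd.notna(value) and str(value).strip() != ''

def IsBillingValid(stage_name, col_det):
    # Stub: cross the stage name with the filled-in flag (simplified)
    return bool(col_det) and stage_name is not None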
How can I use multiprocessing to parallelize this computation and get the result faster?
# something like:
df_new = Parallelise_Function(My_Pandas_dataframe, OpportunityDetector)
I have tried this, but it is not working:
import numpy as np
import pandas as pd
from multiprocessing import Pool

def parallelize_dataframe(df, func, n_cores=7):
    df_split = np.array_split(df, n_cores)  # split the data into n_cores chunks
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split), ignore_index=True)  # process the chunks in parallel and reassemble them
    pool.close()
    pool.join()
    return df
df_new = parallelize_dataframe(df_OpportunityA, OpportunityDetector)
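To make this easier to reproduce, a tiny stand-in dataframe with the same columns can replace df_OpportunityA (my real one has millions of rows), for example:

import pandas as pd

# Stand-in data with the same columns as the real dataframe
df_OpportunityA = pd.DataFrame({
    'BillingCity':       ['Paris', None, 'Lyon'],
    'BillingPostalCode': ['75001', '69001', None],
    'BillingStreet':     [None, '1 rue A', '2 rue B'],
    'StageName':         ['Won', 'Lost', 'Won'],
})

if __name__ == '__main__':  # guard needed if this is run as a plain script
    # Fewer cores than rows, so no chunk comes back empty on the toy data
    df_new = parallelize_dataframe(df_OpportunityA, OpportunityDetector, n_cores=2)
    print(df_new)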
I get this error:
PicklingError: Can't pickle <function OpportunityDetector at 0x7f7e51db1bf8>: attribute lookup OpportunityDetector on __main__ failed
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<command-2041881674588859> in <module>
----> 1 trainpportunityA = parallelize_dataframe(df_OpportunityA, OpportunityDetector)

<command-2041881674588850> in parallelize_dataframe(df, func, n_cores)
      2     df_split = np.array_split(df, n_cores)  # split the data into n_cores chunks
      3     pool = Pool(n_cores)  # start the processing
----> 4     df = pd.concat(pool.map(func, df_split), ignore_index=True)  # reassemble all the results
      5     pool.close()  # end of processing
      6     pool.join()  # join the data

/usr/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269
    270     def starmap(self, func, iterable, chunksize=None):

/usr/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    655             return self._value
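If it helps narrow things down, my understanding is that the Pool fails while pickling the function itself; this direct check without any Pool (my assumption of what is going on) shows the same failure in my notebook:

import pickle

# In the same notebook this raises the identical PicklingError, so the
# problem looks like pickling the function by name from __main__,
# not the Pool machinery itself.
pickle.dumps(OpportunityDetector)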
Thanks in advance