I am dealing with large data tables (100M+ rows). On a number of columns I need to perform a regex replacement for a set of terms. I pre-compile the terms and store them all in a dictionary for use. The user selects the columns to be scrubbed, and after the replacements the data is saved to a different CSV file.
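For concreteness, the scrub dictionary is built roughly like this (the terms and replacement here are illustrative, not my real ones):

import re

# illustrative only; the real terms are loaded from a file
banned_terms = ["first term", "second.term"]
# map each escaped pattern to its replacement, for use with
# Series.replace(scrub_comp, regex=True)
scrub_comp = {re.escape(term): "XXXX" for term in banned_terms}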
I have a working solution for tables that fit in memory, but it is not multiprocessing-enabled, so it only uses one core. I'd like to shift it to multiprocessing to gain those benefits. The most relevant sections of my code are as follows:
def SendToFile(write_df):
    # first chunk creates the file and writes the header; later chunks append
    if i == 0:
        write_df.to_csv(writename, mode='w', index=False)
    else:
        write_df.to_csv(writename, mode='a', index=False, header=False)
    return 1
def CleanTheChunk(clean_df):
    df = clean_df.copy()
    # scrub every selected column with the pre-compiled terms
    for elem in clean_col_index:
        col_name = raw_cols[elem]
        df[col_name] = df[col_name].replace(scrub_comp, regex=True)
    return df
###
#read in data, pre-compile regex terms, select columns to scrub of terms etc.
###
if large_data == 0:
    # read in the data
    df = pd.read_csv(filename, dtype='str')
    # clean the file in every column indicated
    for elem in clean_col_index:
        col_name = raw_cols[elem]
        df[col_name] = df[col_name].replace(scrub_comp, regex=True)
    # save the cleaned version to file
    df.to_csv(writename, index=False)
else:  # this is how it handles large data
    i = 0  # i tracks whether the first chunk has been written ('w' vs 'a')
    # read the file in chunks
    for chunk in pd.read_csv(filename, chunksize=csize, dtype='str'):
        # clean the chunk
        chunk = CleanTheChunk(chunk)
        # save the chunk
        i = SendToFile(chunk)
print("Jobs done.")
Rows do not affect one another, but they do need to be written to the new CSV in the correct order. I just can't wrap my brain around how to read in multiple chunks, process them in parallel, and then write them to the new CSV in the right order.
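From the multiprocessing docs, Pool.imap yields results in the order the inputs were submitted, which sounds like exactly what I need. Below is a minimal sketch of the shape I have in mind (assuming clean_col_index, raw_cols, scrub_comp, and the file names are visible to the worker processes), though I haven't gotten it working:

from multiprocessing import Pool

import pandas as pd

def clean_chunk(chunk):
    # same per-chunk cleaning as CleanTheChunk above
    for elem in clean_col_index:
        col_name = raw_cols[elem]
        chunk[col_name] = chunk[col_name].replace(scrub_comp, regex=True)
    return chunk

if __name__ == '__main__':
    with Pool(cores) as pool:
        reader = pd.read_csv(filename, chunksize=csize, dtype='str')
        # imap preserves input order, so cleaned chunks come back in file order
        for n, cleaned in enumerate(pool.imap(clean_chunk, reader)):
            cleaned.to_csv(writename, mode='w' if n == 0 else 'a',
                           index=False, header=(n == 0))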
UPDATE: I've tried a new method. I collapsed all of the logic into a single function, and that function then gets passed to map. I've shortened the function to pseudocode here to get to the error I'm running into now.
import time
import numpy as np
import pandas as pd
from multiprocessing import Pool

def MP_Cleaner(chunk):
    # read in the banned terms
    # add escape characters to any control characters in the banned terms
    # create the regex pattern
    # iterate over the columns that need scrubbing, using
    # chunk['col_name'].replace(regexterm, regex=True, inplace=True)
    return chunk
def parallelize(data, func):
    # split the frame into one piece per core and clean the pieces in parallel
    data_split = np.array_split(data, cores)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data
if __name__ == '__main__':
    # read the data inside the main guard so worker processes don't re-run it
    df = pd.read_csv(filename, dtype='str')
    df_done = parallelize(df, MP_Cleaner)
    df_done.to_csv(writename, index=False)
    # that is it, all processing done and the file should be saved
    print("Job Complete, " + writename + " saved.")
    stop_time = time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime())
    print("Start time: " + start_time)
    print(" Stop time: " + stop_time)
    proceed = input("Press Enter to exit:")
    print(proceed)
I'm getting an AttributeError: 'list' object has no attribute 'replace'.
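In case it helps, spelled out, what MP_Cleaner is trying to do is roughly this (load_banned_terms and the "XXXX" replacement are hypothetical stand-ins, not my exact code):

import re

def MP_Cleaner(chunk):
    # load the banned terms (hypothetical helper; mine reads them from a file)
    banned = load_banned_terms()
    # escape regex control characters and map each pattern to its replacement
    pattern_map = {re.escape(term): "XXXX" for term in banned}
    # scrub every selected column
    for elem in clean_col_index:
        col_name = raw_cols[elem]
        chunk[col_name] = chunk[col_name].replace(pattern_map, regex=True)
    return chunk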