
I am dealing with large data tables (100M+ rows). On a number of columns I need to perform a regex replace for a set of terms. I pre-compile the terms and store them in a dictionary for use with replace. The user selects the columns to be scrubbed, and after the replacements the data is saved to a different csv file.
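For context, the compiled terms look roughly like this (a simplified illustration with made-up terms, not my real list), used the same way the code below uses scrub_comp:

import re
import pandas as pd

#simplified illustration: each pre-compiled pattern maps to its replacement
terms = ["alpha", "beta"]
scrub_comp = {re.compile(r"\b" + re.escape(t) + r"\b", re.IGNORECASE): "XXX"
              for t in terms}

df = pd.DataFrame({"notes": ["Alpha report", "nothing to scrub"]})
df["notes"].replace(scrub_comp, regex=True, inplace=True)
print(df)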

I have a solution for tables that fit in memory, but it is not multiprocessing-enabled, so it only uses one core.

I'd like to shift this to multiprocessing to gain those benefits. The most relevant sections of my code are as follows:

def SendToFile(write_df):
    #i and writename are globals; i == 0 means this is the first chunk written
    if i == 0:
        write_df.to_csv(writename, mode='w', index=None)
    else:
        #header=False keeps the column names from being repeated on every append
        write_df.to_csv(writename, mode='a', header=False, index=None)
    return 1

def CleanTheChunk(clean_df):
    df = clean_df.copy()
    #scrub_comp, clean_col_index and raw_cols are globals set up earlier
    for elem in clean_col_index:
        col_name = raw_cols[elem]
        df[col_name].replace(scrub_comp, regex=True, inplace=True)
    return df

###
#read in data, pre-compile regex terms, select columns to scrub of terms etc.
###

if large_data==0:
    #read in the data
    df = pd.read_csv(filename, dtype='str')

    #clean the file in every column indicated:
    for elem in clean_col_index:
        col_name=raw_cols[elem]
        df[col_name].replace(scrub_comp, regex=True, inplace = True)
    #Save the cleaned version to file
    df.to_csv(writename, index=None)

else: #This is how it handles large data
    i=0 #i tracks whether the first chunk has been written yet ('w' vs 'a')
    #read the file in chunks
    for chunk in pd.read_csv(filename, chunksize=csize, dtype='str'):

        #Clean the file:
        chunk = CleanTheChunk(chunk)
        #Save the file
        i=SendToFile(chunk)
print("Jobs done.")

Rows do not affect one another, but they do need to be saved to the new csv in the correct order. I just can't wrap my brain around how to read in multiple chunks, get them processed in parallel, and then write them to the new csv in the right order.
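Is something like multiprocessing.Pool.imap the right direction? My understanding is that it yields results in submission order, so a sketch like this (untested, with placeholder file names and a placeholder cleaning function) might keep the writes ordered while the chunks are cleaned in parallel:

import multiprocessing as mp
import pandas as pd

def CleanTheChunk(chunk):
    #placeholder for the real regex scrubbing
    return chunk

if __name__ == '__main__':
    filename = "data.csv"               #placeholder names
    writename = "CLEANED_data.csv"
    csize = 100000

    reader = pd.read_csv(filename, chunksize=csize, dtype='str')
    with mp.Pool(processes=4) as pool:
        #imap hands chunks to free workers but yields the cleaned chunks back in
        #submission order, so appending them preserves the original row order
        for i, cleaned in enumerate(pool.imap(CleanTheChunk, reader)):
            cleaned.to_csv(writename, mode='w' if i == 0 else 'a',
                           header=(i == 0), index=None)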

UPDATE: I've tried a new method. I've collapsed all of the logic into a single function, and that function is what gets passed to map. I'll shorten the function here to get to the error I'm running into now.

def MP_Cleaner(chunk):
    #read in the banned terms
    #Add escape characters to any regex control characters in the banned terms
    #Create the regex pattern
    #Iterate over the columns that need scrubbing
    #use chunk['col_name'].replace(regexterm, regex=True, inplace=True)
    return chunk

def parallelize(data, func):
    data_split = np.array_split(data, cores)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data


df = pd.read_csv(filename, dtype='str')
if __name__ == '__main__':
    df_done=parallelize(df, MP_Cleaner)
    df_done.to_csv(writename, index=None)

    #That is it, all processing is done and the file should be saved
    print("Job Complete, "+ writename + " saved.")
    stop_time = time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime() )
    print("Start time: " + start_time)
    print(" Stop time: " + stop_time)
    proceed=input("Press Enter to exit:")
    print(proceed)

I'm getting an AttributeError: 'list' object has no attribute 'replace'
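In case it helps narrow it down, here is a stripped-down guess at where that error can come from (the real function is longer than the outline above): calling str.replace on the banned-terms list itself instead of on each term in it.

my_scrub = ["foo.bar", "baz+qux"]   #hypothetical banned terms

#calling .replace on the list itself fails:
#my_scrub.replace(".", "\\.")       #AttributeError: 'list' object has no attribute 'replace'

#applying it per element works:
my_scrub = [term.replace(".", "\\.") for term in my_scrub]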

  • I attempted a Dask based solution doing the following: ``` our_data = ddf.from_pandas(pd.read_csv(filename, dtype='str'), npartitions=10) our_data=our_data.map_partitions(CleanTheData, meta=df) writename="CLEANED_"+filename our_data.to_csv(writename, single_file = True, index=None) ``` I saw no difference in CPU usage, RAM increased quite a bit, and it took 34 mins on a 3M-record set vs 30 for the original implementation – JakeB Feb 02 '20 at 17:16
  • I have attempted the same code in both Spyder (run to terminal window) and Jupyter Notebooks. The code is actually faster in Jupyter, but in neither does CPU usage exceed 15% overall (it is maxing out one thread only) – JakeB Feb 03 '20 at 02:04
  • Does [this](https://stackoverflow.com/questions/36794433/python-using-multiprocessing-on-a-pandas-dataframe) help? – S.Au.Ra.B.H Feb 03 '20 at 07:14
  • Somewhat. I found out my major problem was that I needed to put the full logic in a single function. Once I did that, I just played around with optimization. It will do a 12GB, 100M-record scrub of a single column in about 5 minutes. – JakeB Feb 04 '20 at 02:59

1 Answer


Figured it out. I also used some code from a couple of places: Speed up millions of regex replacements in Python 3

and

http://blog.adeel.io/2016/11/06/parallelize-pandas-map-or-apply/

Final write up in case anyone has a similar problem to address:

This only works for files that fit in RAM; I still have to make it work for files too large for RAM without giving up any of the benefits (a possible chunked variant is sketched after the listing below).

import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import re
from multiprocessing import Pool
from trie import Trie  #Trie class from the "Speed up millions of regex replacements" answer linked above

#Enter the filename of the csv to be scrubbed
#After processing, the output will have the prefix "CLEANED_" added to the 
#    filename provided
filename = "longest-2019-Oct.csv"

#How many cores to use, make sure you save one for overhead. The entire file
#    must fit in RAM for this method to work
cores = 9

#This is the file name for the scrubterms, that file must be in the same 
#  directory as this script. It must be a single column whose name is "items"
scrubfile="terms.csv"

#Enter the desired term to cover redactions, default is XXX
redact = "XXX"

#Columns to clean; they must be typed exactly, in quotes, separated by commas
# to remove the columns earth, wind, and fire it would be
#  ["earth", "wind", "fire"]
cols=["massive"]

#***************DO NOT CHANGE ANYTHING BELOW THIS LINE*************************
writename="CLEANED_"+filename

def trie_regex_from_words(words):
    #build one compiled regex that matches any of the words, wrapped in word
    #    boundaries; the trie keeps the pattern efficient for thousands of terms
    trie = Trie()
    for word in words:
        trie.add(word)
    return re.compile(r"\b" + trie.pattern() + r"\b", re.IGNORECASE)

#The scrub terms are read and compiled inside MP_Cleaner, so each worker builds its own pattern

def MP_Cleaner(chunk):
    #read in the terms
    scrub_df= pd.read_csv(scrubfile, dtype='str')
    #Pull just the items
    my_scrub=scrub_df['items'].tolist()

    #The regex control characters we must protect: \ . ^ $ * + ? { } [ ] | ( ) " '
    SpecialCharacters = [chr(92), chr(46), chr(94), chr(36), chr(42), chr(43), 
                         chr(63), chr(123), chr(125), chr(91), chr(93), 
                         chr(124), chr(40), chr(41), chr(34), chr(39)]

    #walk through the terms and prefix each special character with the escape 
    #    character (backslash) so the terms are treated literally by the regex engine
    for i in range(len(SpecialCharacters)):
        replacement = chr(92) + SpecialCharacters[i]
        my_scrub = [term.replace(SpecialCharacters[i], replacement) for term in my_scrub]

    Trie_Scrub = trie_regex_from_words(my_scrub)

    for elem in cols:
        chunk[elem].replace(Trie_Scrub, value=redact, regex=True, inplace = True)

    return chunk

def parallelize(data, func):
    #split the frame into one piece per core, clean the pieces in parallel,
    #    then stitch them back together; pool.map preserves the input order
    data_split = np.array_split(data, cores)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

start_time=time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime() )

if __name__ == '__main__':
    #read the csv in the parent process only, then hand the pieces to the workers
    df = pd.read_csv(filename, dtype='str')
    df_done = parallelize(df, MP_Cleaner)
    df_done.to_csv(writename, index=None)

    #That is it, all processing is done and the file should be saved
    print("Job Complete, "+ writename + " saved.")
    stop_time = time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime() )
    print("Start time: " + start_time)
    print(" Stop time: " + stop_time)
    proceed=input("Press Enter then close the window to exit:")
    print(proceed)
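For files too large for RAM, the untested sketch below is the direction I would try next. It reuses the configuration variables, imports, and trie_regex_from_words from the listing above, builds the pattern once in the parent process, and streams the csv through Pool.imap, which returns chunks in the order they were read, so appending them keeps the rows in order. re.escape stands in here for the manual escaping loop above, and the chunk size is just a tuning knob.

from functools import partial

def CleanOneChunk(chunk, pattern):
    #scrub every requested column in this single chunk
    for elem in cols:
        chunk[elem].replace(pattern, value=redact, regex=True, inplace=True)
    return chunk

if __name__ == '__main__':
    #build the escaped terms and the trie pattern once, in the parent process
    scrub_df = pd.read_csv(scrubfile, dtype='str')
    my_scrub = [re.escape(term) for term in scrub_df['items'].tolist()]
    Trie_Scrub = trie_regex_from_words(my_scrub)

    worker = partial(CleanOneChunk, pattern=Trie_Scrub)
    reader = pd.read_csv(filename, chunksize=1000000, dtype='str')
    with Pool(cores) as pool:
        #imap yields cleaned chunks in the order they were read, so appending
        #them as they arrive keeps the original row order in the output
        for i, cleaned in enumerate(pool.imap(worker, reader)):
            cleaned.to_csv(writename, mode='w' if i == 0 else 'a',
                           header=(i == 0), index=None)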