I have 6 CSV files, averaging 10 GB in size each.
I run 6 processes through the shell (xargs), one per file:
merchants () {
    echo -n merchant_1 merchant_2 merchant_3 merchant_4 merchant_5 merchant_6
}

merchants | xargs -L1 -P6 -d' ' -I{} bash -c "python main.py '{}' || true"
My code has the following flow (for one file):
- Initiate a pool of 2 processes  # so for 6 files = 12 processes
- Read the CSV in chunks with pandas, 200,000 rows per chunk  # 1 chunk avg = 2 GB (so for 12 processes = 24 GB)
- Pass each chunk to pool.apply_async(process, (df, transformer, output_filename))
The process function takes the dataframe and creates a transformer object (the transformer is a class with 12-15 methods; each method is responsible for one transformation, e.g. remove certain blacklisted words, rename columns, lowercase, and returns the transformed dataframe). It then writes the dataframe to a new CSV file, so as a result I get multiple new CSV files of 200,000 rows each.
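For context, here is a minimal sketch of the kind of transformer class I mean (the class name, column names and method bodies are illustrative, not my real code):

import pandas as pd

class ExampleTransformer:
    """Illustrative only: each public method applies one transformation to self.df."""

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def lowercase_titles(self):
        # lowercase a text column
        self.df['title'] = self.df['title'].str.lower()

    def rename_columns(self):
        # map raw column names to the output schema
        self.df = self.df.rename(columns={'prod_id': 'product_id'})

    def remove_blacklisted_words(self):
        # drop rows whose description contains a blacklisted word
        blacklist = ['foo', 'bar']
        pattern = '|'.join(blacklist)
        self.df = self.df[~self.df['description'].str.contains(pattern, case=False, na=False)]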
Now the problem is the memory usage.
As per the calculation above, around 24 GB (already with a bit of buffer) should be used.
The machine has 64 GB of memory and 32 processors. Since I run multiprocessing, I see the number of running processes vary from 10 to 20 in htop.
But memory usage gradually increases until it reaches the whole 60 GB and everything gets killed. There are two possible reasons which could lead to this high memory use:
- My code has some problem in the transformation logic (which I have not spotted so far)
- The way I am using multiprocessing is not correct
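For reference, the 2 GB-per-chunk figure above is an estimate; the actual in-memory size of a single chunk can be checked like this (the file name is just an example):

import pandas as pd

# read one chunk the same way the main code does and print its footprint in GB
reader = pd.read_csv('merchant_1.csv.gz', chunksize=200000, compression='gzip',
                     skipinitialspace=True, encoding='utf-8')
chunk = next(iter(reader))
print(chunk.memory_usage(deep=True).sum() / 1024 ** 3)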
Here is the related code:

from multiprocessing import Pool
import pandas as pd

transformer = globals()[merchant['transformer']]  # the class name comes from a configuration file
pool = Pool(processes=merchant['num_of_process'])  # 2

chunk_index = 1
for df in pd.read_csv(downloaded_file, chunksize=chunksize, compression='gzip',
                      skipinitialspace=True, encoding='utf-8'):
    output_file_name = output_path + \
        merchant['output_file_format'].format(file_index, chunk_index)
    pool.apply_async(process, (df, transformer, output_file_name))
    chunk_index += 1
The process function:
def process(df, transformer, output_file_name):
    """
    Process one CSV chunk in a separate process
    :param df:
    :param transformer:
    :param output_file_name:
    """
    with TransformerContext(transformer(df)) as transformer:
        methods = [method for method in dir(transformer)
                   if callable(getattr(transformer, method))
                   and not method.startswith('_')]
        for method in methods:
            getattr(transformer, method)()

    handle = open(output_file_name, "w")
    transformer.df.to_csv(output_file_name, index=False, compression='gzip',
                          sep='\t', quoting=1, encoding='utf-8')
    del df
    del transformer
    handle.close()
    return True
The reason I am using a context manager here is that I want to delete columns which are not necessary before writing to CSV. Since this has to run for different types of files, it serves as generic code.
class TransformerContext:
    def __init__(self, transformer):
        self.transformer = transformer
        self.original_columns = list(transformer.df.columns)

    def __enter__(self):
        return self.transformer

    def __exit__(self, exc_type, exc_val, exc_tb):
        processed_columns = list(self.transformer.df.columns)
        for column in self.original_columns:
            if column in processed_columns:
                del self.transformer.df[column]
I need help in identifying/fixing whether there is something wrong related to multiprocessing here. If not, then I will go and check my transformation logic.

Thanks