
I have 6 CSV files, averaging 10 GB each.

I run 6 processes via xargs from the shell, one per file:

merchants () {
    echo -n "merchant_1 merchant_2 merchant_3 merchant_4 merchant_5 merchant_6"
}
merchants | xargs -L1 -P6 -d' ' -I{} bash -c "python main.py '{}' || true"

My code has the following flow (for one file):

  • Initiate a pool of 2 processes # (so for 6 files = 12 processes)
  • Read the CSV in chunks with pandas, 200,000 rows per chunk # 1 chunk avg = 2 GB (so for 12 processes = 24 GB)
  • Pass each chunk to pool.apply_async(process, (df, transformer, output_filename))

The process function takes the dataframe and creates a transformer object. A transformer is a class with 12-15 methods; each method performs one transformation (e.g. remove certain blacklisted words, rename columns, lowercase text) and returns the transformed dataframe. The dataframe is then written to a new CSV file, so the result is multiple new CSV files of 200,000 rows each.
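For reference, here is a toy transformer in the shape I described; the real classes are bigger and these method bodies are made up:

class ExampleTransformer:
    """Toy stand-in for a real transformer: each public method applies
    one transformation and leaves the result on self.df."""

    def __init__(self, df):
        self.df = df

    def lowercase_text(self):
        self.df['title'] = self.df['title'].str.lower()

    def remove_blacklisted_words(self):
        self.df['title'] = self.df['title'].str.replace('badword', '', regex=False)

    def rename_columns(self):
        self.df = self.df.rename(columns={'desc': 'description'})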

Now the problem is the memory usage.

As per the calculation above, roughly 24 GB (already with a bit of buffer) should be used.

The machine has 64 GB of memory and 32 processors. Since I run multiprocessing, I see the number of processes vary from 10 to 20 in htop.

But memory usage gradually increases until it reaches the whole 60 GB, and everything gets killed. There are two possible reasons for this high memory use:

  1. My code has some problem in the transformation logic (which I haven't spotted so far)
  2. The way I am using multiprocessing is not correct.

Here is the related code

transformer = globals()[merchant['transformer']]  # class name comes from the configuration file
pool = Pool(processes=merchant['num_of_process'])  # 2
chunk_index = 1

for df in pd.read_csv(downloaded_file, chunksize=chunksize,
                      compression='gzip', skipinitialspace=True,
                      encoding='utf-8'):
    output_file_name = output_path + \
        merchant['output_file_format'].format(file_index, chunk_index)
    # submit the chunk and immediately go read the next one
    pool.apply_async(process, (df, transformer, output_file_name))
    chunk_index += 1
# (what happens to the pool after the loop is not shown here)
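While writing this up I tried to reason about reason (2): apply_async returns immediately, so the reading loop can outrun the two workers, and every chunk that has been read but not yet processed (~2 GB each) sits queued inside the pool. A bounded variant I could try looks roughly like this; the semaphore idea and the file naming are my own sketch, not my current code:

import multiprocessing as mp
import pandas as pd

def run_bounded(downloaded_file, transformer, output_path,
                chunksize=200_000, workers=2, max_pending=4):
    # never hold more than `max_pending` unfinished chunks at once
    slots = mp.BoundedSemaphore(max_pending)
    pool = mp.Pool(processes=workers)
    for chunk_index, df in enumerate(
            pd.read_csv(downloaded_file, chunksize=chunksize,
                        compression='gzip', encoding='utf-8'),
            start=1):
        slots.acquire()  # blocks until a worker finishes a chunk
        pool.apply_async(
            process,  # the process() function shown below
            (df, transformer, '{}chunk_{}.csv.gz'.format(output_path, chunk_index)),
            callback=lambda _: slots.release(),
            error_callback=lambda exc: slots.release())
        del df  # drop the parent's reference as soon as the chunk is queued
    pool.close()
    pool.join()  # wait for the remaining chunks to finish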

Processing

def process(df, transformer, output_file_name):
    """
    Process one csv chunk in a separate process
        :param df:
        :param transformer:
        :param output_file_name:
    """
    with TransformerContext(transformer(df)) as transformer:
        # call every public method of the transformer; note that dir()
        # yields them in alphabetical order, not declaration order
        methods = [method for method in dir(transformer)
                   if callable(getattr(transformer, method))
                   and not method.startswith('_')]
        for method in methods:
            getattr(transformer, method)()
    transformer.df.to_csv(output_file_name, index=False,
                          compression='gzip', sep='\t', quoting=1,
                          encoding='utf-8')
    del df
    del transformer

    return True

The reason I am using a context manager here is that I want to delete columns that are no longer necessary before writing to CSV. Since this has to run for different types of files, it serves as generic code.

class TransformerContext:
    def __init__(self, transformer):
        self.transformer = transformer
        self.original_columns = list(transformer.df.columns)

    def __enter__(self):
        return self.transformer

    def __exit__(self, exc_type, exc_val, exc_tb):
        # drop every original column that is still present, so only
        # columns created by the transformations end up in the output
        processed_columns = list(self.transformer.df.columns)
        for column in self.original_columns:
            if column in processed_columns:
                del self.transformer.df[column]
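
To make the effect concrete, here is what the context manager does with the toy ExampleTransformer from above (hypothetical data): every column that existed on entry and still exists on exit is deleted, so only columns created by the transformations survive.

import pandas as pd

df = pd.DataFrame({'title': ['Foo'], 'desc': ['Bar']})
with TransformerContext(ExampleTransformer(df)) as t:
    t.lowercase_text()
    t.remove_blacklisted_words()
    t.rename_columns()
print(list(t.df.columns))  # ['description'] -- 'title' still existed on
                           # exit, so it was dropped; 'desc' became 'description'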

I need help identifying and fixing anything related to multiprocessing here. If there is nothing wrong there, I will go on to check my transformation logic.

Thanks

Raheel
  • See if you are loading all of your `csv` data into memory and then processing it. If you haven't yet, you could consider using [generators](https://wiki.python.org/moin/Generators), which allow you to lazily evaluate such big data. – srikavineehari Dec 15 '17 at 12:10
  • I read in different answers that pandas `read_csv` with `chunksize` does this for us. It returns a `TextReader` object, but I think it's a generator. – Raheel Dec 15 '17 at 12:17
