
I have a loop that creates a new dataframe and writes it to a parquet file in S3. It's a batch process that handles about 100,000 rows per iteration.

Once a dataframe has been saved as a parquet file I don't need it in memory anymore, so I want to delete it and release the space. However, the commands I'm running below don't seem to free anything: in every iteration my available memory goes down and my consumed memory goes up.

import gc
import os

import awswrangler as wr
import pandas as pd
import psutil

while True:
    #code snippet to pull data from source

    df_to_write = get_pandas_data_frame(col_name_data_type_map, records) #create dataframe
    wr.s3.to_parquet(df=df_to_write) #write to s3
    
    #check memory usage
    with_df = psutil.virtual_memory().available * 100 / psutil.virtual_memory().total
    print("memory with df: {}".format(with_df))
    print_memory_usage()
    
    #delete and release memory
    lst = [df_to_write]
    del df_to_write
    del lst
    gc.collect()
    df_to_write = pd.DataFrame() #saw an answer that said this can help with releasing memory (doesn't in my case)
    
    #check memory usage after deleting
    print_memory_usage()
    without_df = psutil.virtual_memory().available * 100 / psutil.virtual_memory().total
    print("memory without df: {}".format(without_df))


def get_pandas_data_frame(col_name_data_type_map, records):
    table_columns = [item[0] for item in col_name_data_type_map]
    df = pd.DataFrame(records, columns=table_columns)
    for item in col_name_data_type_map:
        if item[1] == 'json':
            df[item[0]] = df[item[0]].astype(dtype=str) # jsonb columns are converted to strings here; one could also flatten them
    return df

def print_memory_usage():
    process = psutil.Process(os.getpid())
    memory_info = process.memory_info()
    print(f'Memory used: {memory_info.rss / 1024 ** 2} MB')

I referenced this question: "Delete and release memory of a single pandas dataframe"

The most common approach seems to be some combination of `del df` and `gc.collect()`, but I'm seeing no effect on my end.

Is there something else I should be doing?
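
For reference, here's a stripped-down, self-contained version of the pattern I'm relying on, with random data standing in for my real batches (the sizes and names below are just illustrative, not my actual code):

import gc
import os

import numpy as np
import pandas as pd
import psutil

def rss_mb():
    # Resident set size of the current process in MB.
    return psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2

print("baseline:                 {:.1f} MB".format(rss_mb()))

# Build a throwaway frame roughly the size of one batch (100,000 rows).
df = pd.DataFrame(np.random.rand(100_000, 50))
print("with df:                  {:.1f} MB".format(rss_mb()))

# Drop the only reference and force a collection.
del df
gc.collect()
print("after del + gc.collect(): {:.1f} MB".format(rss_mb()))

When df really is the last reference to the data, I'd expect that final reading to come back close to the baseline, since the large NumPy buffers behind the frame get freed. In my actual loop it doesn't, which is why I suspect something else is still holding on to the data.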

abhinavkm
  • If you've already established that the data frame is deleted, might it be persisted in memory somewhere in your `s3.to_parquet` call? – ifly6 Aug 24 '23 at 04:05
  • @ifly6 that's what I thought too, but even when I comment out that line and run the code, I don't see memory clearing up. I also removed the call to get_pandas_data_frame() and put that code directly in the main function, in case the dataframe was being persisted at df = pd.DataFrame(records, columns=table_columns), but still no change. – abhinavkm Aug 24 '23 at 13:27

1 Answer


It's not shown in the original problem statement, but I had a couple of progress-tracking statements in my loop that were causing the issue:

def main():
    db_name = DATABASE
    postgres_user = USER
    postgres_password = PASSWORD
    db_host = HOST

    all_records = [] # to track how many records I've processed
    while True:
        cur.itersize = 10000 # how many records to buffer on the client
        cur.execute("SELECT * FROM TABLE_NAME;")
        records = cur.fetchmany(10000)
        col_name_data_type_map = []
        with conn.cursor() as curs:
            curs.execute("SELECT column_name, data_type FROM information_schema.columns WHERE table_name='{}';".format(TABLE_NAME))
            col_name_data_type_map = curs.fetchmany(10)

        print("creating dataframe")
        df = get_pandas_data_frame(col_name_data_type_map, records)
        print("writing data to S3")
        wr.s3.to_parquet(df=df, path=S3_PATH, dataset=True, sanitize_columns=False, mode="append")
        del df
        collected = gc.collect()
        print("garbage collected: {}".format(collected))
        print("processed {:,} records this loop".format(len(records)))
        all_records.extend(records) #I was saving the records here to track my total
        print("processed {:,} records total".format(len(all_records)))

Instead of doing

all_records = []
all_records.extend(records)

I'm doing:

all_records = 0
all_records += len(records)

An oversight on my part; I should've realized that accumulating all of these records in a list would keep consuming memory, since the list holds a reference to every row from every batch.
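
To illustrate the difference with made-up rows (not my actual table, and the variable names here are just for illustration): keeping the fetched rows in a list holds a reference to every tuple from every batch, so nothing from earlier iterations can ever be freed, while a plain counter keeps only an integer alive:

# Rows as they might come back from cur.fetchmany(): a list of tuples.
records = [(i, "value-{}".format(i)) for i in range(100_000)]

# What I was doing: the list keeps every row of every batch alive.
all_records = []
all_records.extend(records)
print("tracked rows: {:,}".format(len(all_records))) # 100,000 tuples still referenced

# What I do now: only the running count survives the iteration; the rows
# themselves become garbage once records (and the dataframe) are dropped.
total_processed = 0
total_processed += len(records)
print("tracked rows: {:,}".format(total_processed)) # same number, nothing kept alive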

abhinavkm