
I have a huge dataset with contents like the following:

+------+------------------------------------------------------------------+----------------------------------+--+
| HHID |                             VAL_CD64                             |             VAL_CD32             |  |
+------+------------------------------------------------------------------+----------------------------------+--+
|  203 | 8c5bfd9b6755ffcdb85dc52a701120e0876640b69b2df0a314dc9e7c2f8f58a5 | 373aeda34c0b4ab91a02ecf55af58e15 |  |
|  203 | 0511dc19cb09f8f4ba3d140754dafb1471dacdbb6747cdb5a2bc38e278d229c8 | 6f3606577eadacef1b956307558a1efd |  |
|  203 | a18adc1bcae1b570a610b13565b82e5647f05fef8a4680bd6ccdd717cdd34af7 | 332321ab150879e930869c15b1d10c83 |  |
|  720 | f6c581becbac4ec1291dc4b9ce566334b1cb2c85e234e489e7fd5e1393bd8751 | 2c4f97a04f02db5a36a85f48dab39b5b |  |
|  720 | abad845107a699f5f99575f8ed43e0440d87a8fc7229c1a1db67793561f0f1c3 | 2111293e946703652070968b224875c9 |  |
|  348 | 25c7cf022e6651394fa5876814a05b8e593d8c7f29846117b8718c3dd951e496 | 5c80a555fcda02d028fc60afa29c4a40 |  |
|  348 | 67d9c0a4bb98900809bcfab1f50bef72b30886a7b48ff0e9eccf951ef06542f9 | 6c10cd11b805fa57d2ca36df91654576 |  |
|  348 | 05f1e412e7765c4b54a9acfd70741af545564f6fdfe48b073bfd3114640f5e37 | 6040b29107adf1a41c4f5964e0ff6dcb |  |
|  403 | 3e8da3d63c51434bcd368d6829c7cee490170afc32b5137be8e93e7d02315636 | 71a91c4768bd314f3c9dc74e9c7937e8 |  |
+------+------------------------------------------------------------------+----------------------------------+--+

I'm processing the file to produce output in the following format:

+------+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+----------------------------------+----------------------------------+----------------------------------+--+
| HHID |                            VAL1_CD64                             |                            VAL2_CD64                             |                            VAL3_CD64                             |            VAL1_CD32             |            VAL2_CD32             |            VAL3_CD32             |  |
+------+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+----------------------------------+----------------------------------+----------------------------------+--+
|  203 | 8c5bfd9b6755ffcdb85dc52a701120e0876640b69b2df0a314dc9e7c2f8f58a5 | 0511dc19cb09f8f4ba3d140754dafb1471dacdbb6747cdb5a2bc38e278d229c8 | a18adc1bcae1b570a610b13565b82e5647f05fef8a4680bd6ccdd717cdd34af7 | 373aeda34c0b4ab91a02ecf55af58e15 | 6f3606577eadacef1b956307558a1efd | 332321ab150879e930869c15b1d10c83 |  |
|  720 | f6c581becbac4ec1291dc4b9ce566334b1cb2c85e234e489e7fd5e1393bd8751 | abad845107a699f5f99575f8ed43e0440d87a8fc7229c1a1db67793561f0f1c3 |                                                                  | 2c4f97a04f02db5a36a85f48dab39b5b | 2111293e946703652070968b224875c9 |                                  |  |
|  348 | 25c7cf022e6651394fa5876814a05b8e593d8c7f29846117b8718c3dd951e496 | 67d9c0a4bb98900809bcfab1f50bef72b30886a7b48ff0e9eccf951ef06542f9 | 05f1e412e7765c4b54a9acfd70741af545564f6fdfe48b073bfd3114640f5e37 | 5c80a555fcda02d028fc60afa29c4a40 | 6c10cd11b805fa57d2ca36df91654576 | 6040b29107adf1a41c4f5964e0ff6dcb |  |
|  403 | 3e8da3d63c51434bcd368d6829c7cee490170afc32b5137be8e93e7d02315636 |                                                                  |                                                                  | 71a91c4768bd314f3c9dc74e9c7937e8 |                                  |                                  |  |
+------+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+----------------------------------+----------------------------------+----------------------------------+--+
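
In other words, each HHID's values need to be pivoted from rows into columns. As a minimal pandas illustration of that reshape (the column names follow my code below; the values here are just placeholders):

import pandas as pd

# Placeholder rows illustrating the input layout; column names follow the code below.
df = pd.DataFrame({
    "HH_ID":    ["203", "203", "720"],
    "VAL_CD64": ["hash64_a", "hash64_b", "hash64_c"],
    "VAL_CD32": ["hash32_a", "hash32_b", "hash32_c"],
})

# Number the rows within each household, then pivot so the n-th value of each
# column becomes its own VAL_CD64_n / VAL_CD32_n column.
df["n"] = df.groupby("HH_ID").cumcount() + 1
wide = df.pivot(index="HH_ID", columns="n", values=["VAL_CD64", "VAL_CD32"])
wide.columns = [f"{col}_{n}" for col, n in wide.columns]
print(wide.reset_index().fillna(""))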

My current code is:

import pandas as pd
import numpy as np
import os
import shutil
import glob
import time

start=time.time()

print('\nFile Processing Started\n')

path=r'C:\Users\xyz\Sample Data'
input_file=r'C:\Users\xyz\Sample Data\test'
output_file=r'C:\Users\xyz\Sample Data\test_MOD'
chunk=pd.read_csv(input_file+'.psv',sep='|',chunksize=10000,dtype={"HH_ID":"string","VAL_CD64":"string","VAL_CD32":"string"})

chunk_list=[]
for c_no in chunk:
    chunk_list.append(c_no)

file_no=1
rec_cnt=0
for i in chunk_list:
    start2=time.time()
    rec_cnt=rec_cnt+len(i)
    rec_cnt2=0
    rec_cnt2=len(i)
    df=pd.DataFrame(i)
    df_ = df.groupby('HH_ID').agg({'VAL_CD64': list, 'VAL_CD32': list})
    
    data = []
    for col in df_.columns:
        d = pd.DataFrame(df_[col].values.tolist(), index=df_.index)
        d.columns = [f'{col}_{i}' for i in map(str, range(1, len(d.columns)+1))]
        data.append(d)
    
    res = pd.concat(data, axis=1)
    # res.columns=['MAID1_SHA256', 'MAID2_SHA256', 'MAID3_SHA256', 'MAID1_MD5','MAID2_MD5', 'MAID3_MD5']
    res.to_csv(output_file+str(file_no)+'.psv',index=True,sep='|')
    
    with open(output_file+str(file_no)+'.psv','r') as istr:
        with open(input_file+str(file_no)+'.psv','w') as ostr:
            for line in istr:
                line=line.strip('\n')+'|'
                print(line,file=ostr)
    os.remove(output_file+str(file_no)+'.psv')
    file_no+=1
    end2=time.time()
    duration2=end2-start2
    print("\nProcessed "+ str(rec_cnt2)+ " records in "+ str(round((duration2),2))+ " seconds. \nTotal Processed Records: "+str(rec_cnt))

os.remove(input_file+'.psv')
allFiles = glob.glob(path + "/*.psv")
allFiles.sort() 
with open(os.path.join(path,'someoutputfile.csv'), 'wb') as outfile:
    for i, fname in enumerate(allFiles):
        with open(fname, 'rb') as infile:
            if i != 0:
                infile.readline()  
            shutil.copyfileobj(infile, outfile)

test=os.listdir(path)
for item in test:
    if item.endswith(".psv"):
        os.remove(os.path.join(path,item))

final_file_name=input_file+'.psv'

os.rename(os.path.join(path,'someoutputfile.csv'),final_file_name)

end=time.time()
duration=end-start
print("\n"+ str(rec_cnt)+ " records added in "+ str(round((duration),2))+ " seconds. \n")

However, this code takes a very long time to process a 400 million record file (approximately 18-19 hours, running on Unix), and the whole script gets killed when I try to process a 700 million record file. From my Google searching, I believe it is being killed due to the high memory usage of the groupby operation.

Is there any way I can reduce the memory footprint of this program so that a 700 million record file can be processed with it?

Abhinav Dhiman
  • Have you tried dask.dataframe (https://stackoverflow.com/questions/50051210/avoiding-memory-issues-for-groupby-on-large-pandas-dataframe) or the "observed=True" option in the groupby function? – Tom McLean Apr 28 '21 at 11:25
  • Will try this, and update. Thanks. – Abhinav Dhiman Apr 28 '21 at 11:38
  • This doesn't help. Memory usage in both cases, my original code and the dask dataframe version, is around 23 MB (calculated using tracemalloc) for a 16 MB file (100,000 rows). – Abhinav Dhiman Apr 28 '21 at 13:22

1 Answer


I'm not sure how to do it with pandas, but you can do this without ever keeping more than a few rows in memory.

First, make sure the dataset is sorted by the column you want to group by. If it isn't, sort it first with an external merge sort (the Unix sort utility, for example, can sort files larger than memory).

Then, just follow this simple algorithm (a code sketch follows the list):

  • read the first HHID and start new lists for its VAL_CD64 and VAL_CD32 values
  • while there are more lines
    • read the next line
    • if its HHID is the same as the previous one, append its VAL_CD64 and VAL_CD32 to the current lists
    • else
      • write out the previous HHID and its accumulated values
      • start new lists for the new HHID
  • write out the last HHID and its accumulated values
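
A minimal Python sketch of that streaming pass, assuming the input is already sorted by HH_ID and has three pipe-separated columns with a header row; the file names are placeholders, and MAX_VALS = 3 is an assumption taken from the sample output:

import csv

MAX_VALS = 3  # assumption: at most three values per household, as in the sample output

def flush(writer, hhid, cd64, cd32):
    # Pad each list to MAX_VALS so every output row has the same number of columns.
    cd64 = (cd64 + [""] * MAX_VALS)[:MAX_VALS]
    cd32 = (cd32 + [""] * MAX_VALS)[:MAX_VALS]
    writer.writerow([hhid] + cd64 + cd32)

with open("test_sorted.psv", newline="") as fin, \
     open("test_MOD.psv", "w", newline="") as fout:
    reader = csv.reader(fin, delimiter="|")
    writer = csv.writer(fout, delimiter="|")

    next(reader)  # skip the input header
    writer.writerow(["HHID"]
                    + [f"VAL{i}_CD64" for i in range(1, MAX_VALS + 1)]
                    + [f"VAL{i}_CD32" for i in range(1, MAX_VALS + 1)])

    prev, cd64, cd32 = None, [], []
    for hhid, val64, val32 in reader:
        if hhid != prev and prev is not None:
            flush(writer, prev, cd64, cd32)  # the previous group is complete
            cd64, cd32 = [], []
        prev = hhid
        cd64.append(val64)
        cd32.append(val32)

    if prev is not None:
        flush(writer, prev, cd64, cd32)      # write out the last group

Because only the current household's values are held in memory at any time, the memory footprint stays constant regardless of how many records the file contains.
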
GeertPt