I tried the pool.map approach suggested in similar answers, but I ended up with 8 files of ~23 GB each, which is worse.
import os
import pandas as pd
from tqdm import tqdm
from multiprocessing import Pool

# CSV file name to be read in
in_csv = 'mycsv53G.csv'

# get the number of lines of the CSV file to be read
number_lines = sum(1 for row in tqdm(open(in_csv, encoding='latin1'),
                                     desc='Reading number of lines....'))
print(number_lines)

# number of rows to write to each output CSV;
# you can change the row size according to your need
rowsize = 11367260  # decided based on the CPU core count

# loop through the data, writing each set of rows to a new file
def reading_csv(filename):
    for i in tqdm(range(1, number_lines, rowsize), desc='Reading CSVs...'):
        print('in reading csv')
        df = pd.read_csv(in_csv, encoding='latin1',
                         low_memory=False,
                         header=None,
                         nrows=rowsize,  # number of rows to read on each loop
                         skiprows=i)     # skip rows that have already been read
        # write the data to a new file with an indexed name: input1.csv etc.
        out_csv = './csvlist/input' + str(i) + '.csv'
        df.to_csv(out_csv,
                  index=False,
                  header=False,
                  mode='a',           # append data to the CSV file
                  chunksize=rowsize)  # size of data to append on each loop

def main():
    # get a list of file names
    files = os.listdir('./csvlist')
    file_list = [filename for filename in tqdm(files) if filename.split('.')[1] == 'csv']
    # set up the pool
    with Pool(processes=8) as pool:  # or whatever your hardware can support
        print('in Pool')
        # have the pool map the file names to dataframes
        try:
            df_list = pool.map(reading_csv, file_list)
        except Exception as e:
            print(e)

if __name__ == '__main__':
    main()
The above approach took 4 hours just to split the file concurrently, and parsing every resulting CSV will take even longer. I am not sure multiprocessing helped at all, since each worker re-reads the input file with skiprows on every iteration (a single-pass alternative is sketched below).
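As a point of comparison, a single sequential pass with pandas' own chunksize iterator only scans the file once; a rough sketch of what I mean (untested at this scale, reusing the names from the script above):

import pandas as pd

# Single sequential pass: each chunk of the 53G file is read exactly once
# and written straight to its own output file, instead of re-scanning the
# file with skiprows on every iteration.
in_csv = 'mycsv53G.csv'
rowsize = 11367260
reader = pd.read_csv(in_csv, encoding='latin1', header=None,
                     low_memory=False, chunksize=rowsize)
for i, chunk in enumerate(reader):
    chunk.to_csv('./csvlist/input' + str(i) + '.csv', index=False, header=False)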
Currently, I read the CSV file with this code:
import pandas as pd
import datetime
import numpy as np

final_df = pd.DataFrame()
for chunk in pd.read_csv(filename, chunksize=10**5, encoding='latin-1',
                         skiprows=1, header=None):
    # chunk processing that produces `agg`
    final_df = final_df.append(agg, ignore_index=True)

final_df.to_csv('Final_Output/' + output_name, encoding='utf-8', index=False)
It takes close to 12 hours to process the large CSV at once.
What improvements can be made here? Any suggestions? I am willing to try out dask now, since I have no other options left; a rough sketch of what I have in mind is below.
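Something along these lines (untested; the groupby aggregation and the output file name are just placeholders for my actual chunk processing):

import dask.dataframe as dd

# Untested sketch of the dask approach. blocksize controls how dask splits the
# file into partitions (by bytes, not rows); the groupby below is only a
# placeholder for the real per-chunk aggregation.
ddf = dd.read_csv('mycsv53G.csv', encoding='latin-1', skiprows=1, header=None,
                  blocksize='256MB')
agg = ddf.groupby(0).size()   # placeholder aggregation
final_df = agg.compute()      # executes the partitions in parallel
final_df.to_csv('Final_Output/output.csv', encoding='utf-8', header=True)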