I currently have a working solution, but it is slower than I need: roughly 1 hour per 500k samples, and with ~100M samples in the full dataset that works out to ~200 hours for the whole run.
Hardware/software specs: 8GB RAM, Windows 11 64-bit, Python 3.8.8
The problem:
I have a .csv dataset (~13GB) where each sample has a value and a start-end period spanning a few months. I want to create a dataset where each sample keeps the same value but refers to one specific month, i.e. one output row per month in the period.
For example:
from:
idx | start date | end date | month | year | value
0 | 20/05/2022 | 20/07/2022 | 0 | 0 | X
to:
idx | start date | end date | month | year | value
0 | 20/05/2022 | 20/07/2022 | 5 | 2022 | X
1 | 20/05/2022 | 20/07/2022 | 6 | 2022 | X
2 | 20/05/2022 | 20/07/2022 | 7 | 2022 | X
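(To make the expansion explicit: the months covered by the example row can be listed with pandas' period_range. This is just an illustration of the target, not part of my pipeline.)

import pandas as pd

# months covered by 20/05/2022 - 20/07/2022, inclusive
print([(p.month, p.year) for p in pd.period_range('2022-05-20', '2022-07-20', freq='M')])
# -> [(5, 2022), (6, 2022), (7, 2022)]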
Ideas: do it in parallel (e.g. with Dask, but I am not sure how to set that up for this task; a rough sketch of what I have in mind is below).
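Roughly, I imagine a Dask version reading the CSV into partitions and applying a DataFrame-returning variant of my augmentation function to each one with map_partitions. The sketch below is untested; augment_partition is a hypothetical stand-in for my function further down, and it assumes the dates are encoded as YYYYMMDD:

import dask.dataframe as dd
import pandas as pd

def augment_partition(pdf):
    # hypothetical DataFrame-returning variant of my augmentation function
    rows = []
    for _, r in pdf.iterrows():
        # assumes YYYYMMDD-encoded start/end columns
        for p in pd.period_range(str(r['start']), str(r['end']), freq='M'):
            d = r.to_dict()
            d['month'], d['year'] = p.month, p.year
            rows.append(d)
    return pd.DataFrame(rows, columns=pdf.columns)

ddf = dd.read_csv('enc_star_logar_ek.csv', sep=';', blocksize='64MB')
# without an explicit meta, dask infers the output schema from an empty partition
augmented = ddf.map_partitions(augment_partition)
augmented.to_csv('augmented-*.csv', sep=';', index=False)  # one file per partition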
My implementation:
Read the CSV in chunks with pandas, augment each chunk into dictionaries, and append the result to a new CSV. A function takes a df, computes for each sample the list of months from start date to end date, and creates one copy of the sample per month, collecting the copies in a dictionary which it returns.
The work is done in dictionaries because that proved much faster than doing it in pandas. I then iterate through the original CSV in chunks, apply the function to each one, and append the resulting augmented rows to another csv.
The function:
import pandas as pd
import csv

def augment_to_monthly_dict(chunk):
    '''
    Takes a df (or sub-df) of data and returns the augmented dataset with
    monthly rows, in dictionary form (found to be faster than pandas here).
    '''
    rows = {}  # renamed from 'dict', which shadowed the builtin
    l = 1
    for i in range(len(chunk)):  # iterate through every sample
        # Dates are encoded as YYYYMMDD, so slice out the months and years
        mst  = int(str(chunk.iloc[i].start)[4:6])  # start month
        mend = int(str(chunk.iloc[i].end)[4:6])    # end month
        yst  = int(str(chunk.iloc[i].start)[:4])   # start year
        yend = int(str(chunk.iloc[i].end)[:4])     # end year
        if yend == yst:
            months = list(range(mst, mend + 1))
            years = [yst] * len(months)
        elif yend == yst + 1:  # the period crosses one year boundary
            months = list(range(mst, 13)) + list(range(1, mend + 1))
            years = [yst] * (13 - mst) + [yend] * mend
        else:  # periods spanning more than two calendar years are skipped
            continue
        # months lists every month in the sample's period and years is a
        # same-length list of the respective years,
        # e.g. months=[11,12,1,2], years=[2021,2022,2022,2022]

        # Convert the sample to a dict once per sample, not once per month
        base = pd.DataFrame(chunk.iloc[i]).transpose().to_dict(orient='records')[0]
        for j in range(len(months)):  # one copy of the sample per month
            tmp = base.copy()
            # change the month and year values (they were 0 for initiation)
            tmp['month'] = months[j]
            tmp['year'] = years[j]
            # Here could add more calcs, e.g. drop irrelevant columns,
            # change datatypes etc. to reduce size
            # Append the new row to the augmented data
            rows[l] = tmp
            l += 1
    return rows
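To show the expected in/out shape, a quick check on a one-row frame (dates as YYYYMMDD integers, which is what the string slicing above assumes):

demo = pd.DataFrame([{'start': 20220520, 'end': 20220720,
                      'month': 0, 'year': 0, 'value': 'X'}])
for k, v in augment_to_monthly_dict(demo).items():
    print(k, v['month'], v['year'], v['value'])
# 1 5 2022 X
# 2 6 2022 X
# 3 7 2022 X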
Reading the original dataset (~13GB .csv) in chunks, augmenting each chunk with the function, and appending the result to a new .csv:
chunk_count = 0
for chunk in pd.read_csv('enc_star_logar_ek.csv', delimiter=';', chunksize=10000):
    chunk.index = chunk.reset_index().index
    aug_dict = augment_to_monthly_dict(chunk)  # work on the chunk as dictionaries
    chunk_count += 1
    if chunk_count == 1:  # first chunk: get the column names, write header + data
        # The fieldnames come from the first row dict (no need to scan all data)
        key_names = list(next(iter(aug_dict.values())).keys())
        print(key_names)
        # Open csv file and write ';'-separated data
        with open('dic_to_csv2.csv', 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, delimiter=';', fieldnames=key_names)
            writer.writeheader()
            writer.writerows(aug_dict.values())
    else:  # append the remaining data chunks
        print('added chunk:', chunk_count)
        with open('dic_to_csv2.csv', 'a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, delimiter=';', fieldnames=key_names)
            writer.writerows(aug_dict.values())
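One more idea I have not benchmarked: vectorising the expansion inside each chunk with period_range + explode instead of a Python loop per row. A sketch (again assuming YYYYMMDD dates; note it would also keep periods spanning more than two calendar years, which my loop skips):

def augment_chunk_vectorised(chunk):
    # parse the YYYYMMDD-encoded dates once per chunk
    start = pd.to_datetime(chunk['start'].astype(str), format='%Y%m%d')
    end = pd.to_datetime(chunk['end'].astype(str), format='%Y%m%d')
    # one list of monthly periods per row, then one output row per period
    out = chunk.assign(period=[list(pd.period_range(s, e, freq='M'))
                               for s, e in zip(start, end)]).explode('period')
    out['month'] = [p.month for p in out['period']]
    out['year'] = [p.year for p in out['period']]
    return out.drop(columns='period')

Each augmented chunk could then be appended with out.to_csv('dic_to_csv2.csv', sep=';', mode='a', header=False, index=False), writing the header only for the first chunk as above.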