I have a script that uses multiprocessing to open and perform calculations on ~200k .csv files. Here's the workflow:
1) Consider a folder with ~200k .csv files. Each .csv file contains the following:
.csv file example:
0, 1
2, 3
4, 5
...
~500 rows
2) The script collects the paths of all .csv files into a list.
3) The script splits the list of ~200k .csv files into 8 sublists, since I have 8 cores available.
4) The script calls do_something_with_csv() 8 times, once per sublist, and performs the calculations in parallel.
In serial mode, the execution takes around 4 minutes.
Both in parallel and in series, the first execution of the script takes much longer; the second, third, etc. executions take around 1 minute. It seems like Python is caching the I/O operations somehow? This is visible thanks to the progress bar: if, for example, I execute until the bar reaches 5k/200k and terminate the program, the next execution goes through the first 5k files very quickly and then slows down.
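The usual suspect here is the operating system's page cache rather than Python itself: once a file has been read, its contents stay cached in RAM, so later runs never touch the disk. A minimal sketch to test this, assuming Linux (os.posix_fadvise is available there from Python 3.3 on; drop_from_page_cache is a hypothetical helper name):

import os

def drop_from_page_cache(path):
    # Ask the kernel to evict this file's cached pages (Linux only),
    # so the next read must hit the disk again, like a cold first run.
    fd = os.open(path, os.O_RDONLY)
    try:
        # offset=0, length=0 means "the whole file"
        os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)
    finally:
        os.close(fd)

Evicting every input file this way and re-running should bring back the first-run slowness if the OS cache, not Python, is the cause of the speedup.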
Python version: 3.6.1
Pseudo Python code:
import csv
import glob
import os
from multiprocessing import Manager, Process

def get_list_of_files(path):
    # Helper: collect the paths of all .csv files in the folder
    return glob.glob(os.path.join(path, '*.csv'))

def split_list_chunks(lst, n):
    # Helper: split lst into n roughly equal chunks, one per worker
    chunk_size = (len(lst) + n - 1) // n
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]

def multiproc_dispatch():
    lst_of_all_csv_files = get_list_of_files('/path_to_csv_files')
    divided_lst_of_all_csv_files = split_list_chunks(lst_of_all_csv_files, 8)
    manager = Manager()
    shared_dict = manager.dict()
    jobs = []
    for chunk in divided_lst_of_all_csv_files:
        p = Process(target=do_something_with_csv, args=(shared_dict, chunk))
        jobs.append(p)
        p.start()
    # Wait for all workers to finish
    for job in jobs:
        job.join()

def read_csv_file(csv_file):
    lst_a = []
    lst_b = []
    with open(csv_file, 'r') as f_read:
        csv_reader = csv.reader(f_read, delimiter=',')
        for row in csv_reader:
            lst_a.append(float(row[0]))
            lst_b.append(float(row[1]))
    return lst_a, lst_b

def do_something_with_csv(shared_dict, lst_of_csv_files):
    # Accumulate results locally, then push to the Manager dict in one update
    temp_dict = {}
    for csv_file in lst_of_csv_files:
        lst_a, lst_b = read_csv_file(csv_file)
        temp_dict[csv_file] = (lst_a, lst_b)
    shared_dict.update(temp_dict)

if __name__ == '__main__':
    multiproc_dispatch()
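For comparison, the same workload can be expressed with multiprocessing.Pool, which hands files out to workers on demand instead of pre-splitting the list into 8 chunks, and collects results in the parent so no Manager proxy is needed. A sketch under those assumptions, reusing read_csv_file and get_list_of_files from above (load_one and pool_dispatch are hypothetical names):

from multiprocessing import Pool

def load_one(csv_file):
    # Worker: return (key, value) so the parent assembles the dict itself
    return csv_file, read_csv_file(csv_file)

def pool_dispatch():
    all_csv_files = get_list_of_files('/path_to_csv_files')
    results = {}
    with Pool(processes=8) as pool:
        # chunksize batches tasks to cut IPC overhead on 200k small jobs
        for csv_file, data in pool.imap_unordered(load_one, all_csv_files, chunksize=256):
            results[csv_file] = data
    return results

This sidesteps the Manager dict entirely; the per-file results travel back over the Pool's own result queue instead of through a shared proxy object.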