I have a (somewhat) minimal test example of multiprocessing where the expected output is a shared pandas DataFrame. However, the shared DataFrame is never updated. In my example, 10 text files are first created for testing purposes, each containing a single integer corresponding to its file name. Each worker process is given a subset of the 10 file paths along with the namespace for sharing the DataFrame; it then analyzes each of its files and enters the "result" into the appropriate place in the DataFrame (which, for testing purposes, is the sum of the integer value read from the file and each of the constants in the list called "constants").
Any ideas on how to get the DataFrame to update after each task, and how to get the variable sharing to work? Am I making a simple mistake? Several posts suggest this method of sharing a DataFrame, but their examples are generally simpler than mine, and something about my structure is making the sharing fail. For example, I am trying to follow the method given here: How to share pandas DataFrame object between processes?
from multiprocessing import Manager
import multiprocessing as mp
import pandas as pd
import os

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

# Create the ten test files; each contains a single integer.
ct = 1
for filename in test_filenames:
    with open(test_folder + '\\' + filename + '.txt', 'w') as f:
        f.write(str(ct))
    ct += 1

def worker_function(file_paths, ns):
    # Pull the DataFrame out of the shared namespace, fill in this worker's
    # columns, and push it back.
    dataframe = ns.df
    for file_path in file_paths:
        with open(file_path) as f:
            value = int(f.readline())
        filename = file_path.split('\\')[-1]
        for constant in constants:
            result = value + constant
            dataframe.at[constant, filename] = result
    ns.df = dataframe

def run_parallel(file_paths, number_procs, ns):
    procs = []
    for i in range(number_procs):
        paths_load = file_paths[i::number_procs]   # every number_procs-th path
        proc = mp.Process(target=worker_function, args=(paths_load, ns))
        procs.append(proc)
        procs[i].start()
    for p in procs:
        p.join()

if __name__ == '__main__':
    num_procs = 4
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=files, index=constants)

    mgr = Manager()
    ns = mgr.Namespace()
    ns.df = output_df

    run_parallel(file_paths, num_procs, ns)
    output_df = ns.df   # expected to hold the workers' results, but never does
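As far as I can tell, the core problem is that each worker pulls its own copy of the DataFrame out of the namespace, fills in only its own files, and then reassigns ns.df, so the workers overwrite one another rather than building up a single shared frame. If you want to keep the Manager/Process structure, one workaround that seems to behave is to share a manager-backed list instead, have each worker append a partial DataFrame covering just its files, and let the main process merge the pieces at the end. A rough sketch of that idea (the restructuring below is mine, not taken from the linked answer):

from multiprocessing import Manager
import multiprocessing as mp
import pandas as pd
import os

test_folder = r'C:\test_files'
constants = [10, 15, 30, 60, 1440]

def worker_function(file_paths, results):
    # Build a partial DataFrame covering only this worker's files and
    # append it to the shared list; nothing is modified in place.
    columns = [p.split('\\')[-1] for p in file_paths]
    partial = pd.DataFrame(index=constants, columns=columns)
    for file_path in file_paths:
        with open(file_path) as f:
            value = int(f.readline())
        filename = file_path.split('\\')[-1]
        for constant in constants:
            partial.at[constant, filename] = value + constant
    results.append(partial)

def run_parallel(file_paths, number_procs, results):
    procs = []
    for i in range(number_procs):
        paths_load = file_paths[i::number_procs]
        proc = mp.Process(target=worker_function, args=(paths_load, results))
        procs.append(proc)
        proc.start()
    for p in procs:
        p.join()

if __name__ == '__main__':
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=files, index=constants)

    mgr = Manager()
    results = mgr.list()          # shared list of partial DataFrames
    run_parallel(file_paths, 4, results)

    for partial in results:
        output_df.update(partial)  # merge each worker's columns into the full frame

In the end, though, dropping the shared state entirely (as in the accepted answer, reworked below) turned out to be simpler.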
*** I edited the title to reflect the solution, which no longer uses a namespace. I took the accepted answer and reworked it (below) to use as little code as possible and no exception handling. You can import ProcessPoolExecutor instead if you want multiprocessing (a sketch of that variant follows the code below).
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import os

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

# Create the ten test files; each contains a single integer.
ct = 1
for filename in test_filenames:
    with open(test_folder + '\\' + filename + '.txt', 'w') as f:
        f.write(str(ct))
    ct += 1

def worker_function(file_path):
    # Return [filename, value + constant for each constant]; the main
    # process writes this row into the DataFrame.
    with open(file_path) as f:
        value = int(f.readline())
    result_list = []
    filename = file_path.split('\\')[-1]
    result_list.append(filename)
    for constant in constants:
        result = value + constant
        result_list.append(result)
    return result_list

if __name__ == '__main__':
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=constants, index=files)

    with ThreadPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}
        for future in as_completed(pool):
            worker_result = future.result()
            # The first element is the row label (filename); the rest are the
            # values for each constant column.
            output_df.loc[worker_result[0]] = worker_result[1:]
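For the ProcessPoolExecutor variant mentioned above, only the import and the context manager should need to change; I'm assuming the rest of the script (the test-file setup and worker_function) stays exactly as in the block above, since the child processes re-import the module. A sketch of just the parts that differ:

from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
import os

# test_folder, constants, and worker_function are unchanged from the example above.

if __name__ == '__main__':
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=constants, index=files)

    with ProcessPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}
        for future in as_completed(pool):
            worker_result = future.result()
            output_df.loc[worker_result[0]] = worker_result[1:]

For a small, I/O-bound job like this one, threads are generally sufficient; the process pool mainly matters once worker_function does heavy CPU work.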