
I have a (somewhat) minimal test example of multiprocessing where the expected output is a shared Pandas dataframe, but the shared dataframe is never updated. In my example, 10 text files are first created for testing purposes, each containing a single integer that corresponds to the file name. The worker function is given each of the 10 file paths and the namespace for sharing the dataframe; it then analyzes each file and enters the "result" into the appropriate place in the dataframe (for testing purposes, the result is simply the sum of the integer value in the file and each of the constants in the list called "constants").

Any ideas about getting the dataframe to update after each task, and getting variable sharing to work? Am I making a simple mistake? Several posts suggest this method of sharing a dataframe, but they generally have a simple structure, and something about my structure is making the sharing fail. For example, I am trying to follow the method given here: How to share pandas DataFrame object between processes?

from multiprocessing import Manager
import multiprocessing as mp
import pandas as pd
import os

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

ct = 1

for filename in test_filenames:
    with open(test_folder + '\\' + filename + '.txt', 'w') as f:
        f.write(str(ct))
    f.close()    

    ct += 1

def worker_function(file_paths, ns):

    dataframe = ns.df

    for file_path in file_paths:

        with open(file_path) as f:
            value = int(f.readline())
        f.close()

        filename = file_path.split( '\\' )[-1]    
        for constant in constants:
            result = value + constant 
            dataframe.at[constant, filename] = result

    ns.df = dataframe

def run_parallel(file_paths, number_procs, ns):    
    procs = []
    for i in range(number_procs):
        paths_load = file_paths[i::number_procs]
        proc = mp.Process(target=worker_function, args=(paths_load, ns))
        procs.append(proc)
        procs[i].start()
    for p in procs:
        p.join()

if __name__ == '__main__':        
    num_procs = 4
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=files, index=constants)   
    mgr = Manager()
    ns = mgr.Namespace()
    ns.df = output_df

    run_parallel(file_paths, num_procs, ns)

    output_df = ns.df

*** I edited the title to reflect the solution, which no longer uses a namespace. I took the accepted answer and reworked it (below) to use as little code as possible and no exception handling. You can import ProcessPoolExecutor instead if you want multiprocessing (a sketch of that variant follows the code).

from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import os

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

ct = 1

for filename in test_filenames:
    with open(test_folder + '\\' + filename + '.txt', 'w') as f:
        f.write(str(ct))

    ct += 1

def worker_function(file_path):

    with open(file_path) as f:
        value = int(f.readline())

    result_list = []
    filename = file_path.split( '\\' )[-1]    
    result_list.append(filename)
    for constant in constants:
        result = value + constant
        result_list.append(result)

    return result_list

if __name__ == '__main__':

    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=constants, index=files)

    with ThreadPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}

        for future in as_completed(pool):
            worker_result = future.result()
            output_df.loc[worker_result[0]] = worker_result[1:]
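
A minimal sketch of the ProcessPoolExecutor swap mentioned above, assuming worker_function is unchanged: only the import and the executor class differ. The worker must stay a module-level function so it can be pickled, and the executor must be created under the if __name__ == '__main__' guard (already the case here), since process-based pools on Windows re-import the module in each child.

from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
import os

# test_folder, constants, and worker_function are the same as in the code above

if __name__ == '__main__':
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=constants, index=files)

    with ProcessPoolExecutor(max_workers=4) as executor:
        # each file path becomes one task run in a separate worker process
        pool = {executor.submit(worker_function, p): p for p in file_paths}

        for future in as_completed(pool):
            worker_result = future.result()
            output_df.loc[worker_result[0]] = worker_result[1:]
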
  • I made an edit to create the "dataframe" variable from the namespace inside of the worker function. This change resulted in some of the dataframe columns being correctly filled in, but some of them remain blank. – AF2k15 Apr 03 '19 at 19:19
  • Rerunning the code gives different filled-in columns and different blank columns each time I run it. Sometimes they are even all filled in correctly. Are the processes interfering with each other? – AF2k15 Apr 03 '19 at 19:29
  • "...when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes." [[Sharing state between processes](https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes)] Is there a reason that you cannot collect the results in the parent process and then append to the DataFrame? – ralex Apr 03 '19 at 20:51
  • There is no reason in particular. Would that entail having the worker function return a list of results? Then I update the dataframe with the list of results outside of the worker function? I'm having trouble conceptualizing this because I'm not sure how to get the output of the worker function if that's true. – AF2k15 Apr 03 '19 at 21:02
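
A minimal sketch of the pattern ralex describes, reusing constants, file_paths, and output_df (indexed by constants, with filenames as columns) from the question's setup: each worker returns its results and only the parent process writes to the dataframe, so no state is shared between processes. multiprocessing.Pool is just one way to collect a worker's return value; the answer below uses concurrent.futures instead. The worker name here is hypothetical.

from multiprocessing import Pool

def pool_worker(file_path):
    # hypothetical worker: read the file and return the results
    # instead of mutating shared state
    with open(file_path) as f:
        value = int(f.readline())
    filename = file_path.split('\\')[-1]
    return filename, [value + constant for constant in constants]

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Pool.map hands each worker's return value back to the parent
        for filename, results in pool.map(pool_worker, file_paths):
            output_df[filename] = results  # one column per file, in constants order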

1 Answer


The concurrent.futures module is helpful for workflows where you are CPU- or I/O-bound by an embarrassingly parallel data lookup or processing step.

For your case, it should look like the following. I am not on Windows so I have not tried to recreate the filenames to test it, but I hope the structure gives you a sense of the pattern. Note that I am using multiple threads rather than processes because the worker function is primarily engaged in I/O rather than processing.

from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import pandas as pd

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

#ct = 1

def file_counter(ct=1):
    for filename in test_filenames:
        with open(test_folder + '\\' + filename + '.txt', 'w') as f:
            f.write(str(ct))
        # no need to use f.close() with a context manager
        ct += 1

def worker_function(file_path):
    result_list = []
    with open(file_path) as f:
        value = int(f.readline())
    # no need to use f.close() with a context manager
    filename = file_path.split( '\\' )[-1]    
    for constant in constants:
        result = value + constant
        result_list.append((constant, filename, result))
    return result_list


if __name__ == '__main__':
    file_counter() # keep execution below the if...main
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    dataframe_collection = []

    # for I/O you should prefer threads over processes
    with ThreadPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}

        for future in as_completed(pool):
            try:
                # future.result() re-raises any exception from the worker
                worker_result = future.result()
            except Exception:  # choose your own exception types
                # handle the exception
                pass
            else:
                output_df = pd.DataFrame(data=worker_result, columns=files, index=constants)
                dataframe_collection.append(output_df)

    # now concatenate all the DataFrames
    single_df = pd.concat(dataframe_collection)
  • Thanks, I'm pondering this structure. In the meantime, I'm getting a new error when I run this. `AssertionError: 10 columns passed, passed data had 3 columns` referring to line 46, I believe – AF2k15 Apr 03 '19 at 22:05
  • As I mentioned, I have not tested. It should be line 26 where I structure the data being appended as `(constant, filename, result)`. Previously you were setting values directly in a DataFrame that had already been instantiated. Now, you have to pass an acceptable structure into the `DataFrame` constructor. You should be able to fix this by correctly defining `worker_function`'s return value to fit the DataFrame. – ralex Apr 03 '19 at 22:10
  • It works once you get the results in the right place in the dataframe and the dimensions are handled correctly. Thanks! – AF2k15 Apr 03 '19 at 22:40
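
For reference, a minimal sketch of the fix ralex's comment points at: give the DataFrame constructor three column names that match the (constant, filename, result) tuples returned by worker_function, then pivot the concatenated frame at the end. The column names 'constant', 'filename', and 'result' are illustrative, not part of the original answer.

        for future in as_completed(pool):
            worker_result = future.result()
            # three columns to match the (constant, filename, result) tuples
            output_df = pd.DataFrame(data=worker_result,
                                     columns=['constant', 'filename', 'result'])
            dataframe_collection.append(output_df)

    # one row per (file, constant) pair; pivot to a filenames-by-constants table
    single_df = pd.concat(dataframe_collection, ignore_index=True)
    single_df = single_df.pivot(index='filename', columns='constant', values='result')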