
I have a function that takes a dataframe as input, loops through each row to perform a bunch of operations, and writes the final output to a file in append mode. I want to parallelize the execution of this function so that, instead of iterating through all the rows of the dataframe one by one, it loops over slices or chunks of my original dataframe in parallel. To help with creating slices, col1 of the dataframe contains numbers indicating which slice each row belongs to. I use this column to create an array of dataframes to pass to executor.submit, and also inside my function to build slice-specific output file names. That way each process writes to the file associated with its own slice, without contention.

The problem is that I keep running into this error.

AttributeError: Can't get attribute 'my_df_func' on <module '__main__' (built-in)>

Here's what a simplified version of my code looks like.

from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import os

def my_df_func(my_df_slice):
    for col1, col2, col3, col4 in zip(my_df_slice.col1, my_df_slice.col2,
                                      my_df_slice.col3, my_df_slice.col4):
        file_df = pd.DataFrame([['info_' + str(col2) + '&id=' + col3 + '&abc=' + col4]])
        output_file = os.path.join(output_dir, f'out_{col1}.csv')
        file_df.to_csv(output_file, mode='a',
                       header=not os.path.isfile(output_file), index=False)


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        my_df_array = [x for __, x in my_df.groupby('col1')]
        [executor.submit(my_df_func, my_df_slice) for my_df_slice in my_df_array]

Oddly, this version runs without the error, but it executes the function for each slice sequentially.

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        my_df_array = [x for __, x in my_df.groupby('col1')]
        [executor.submit(my_df_func(my_df_slice)) for my_df_slice in my_df_array]

Any idea what might be going on?

Radagast
  • I hope this post answers your requirements: https://stackoverflow.com/questions/45545110/make-pandas-dataframe-apply-use-all-cores – bk7159 Sep 01 '23 at 20:39

1 Answer


The reason the second example runs without the error is that executor.submit(my_df_func(my_df_slice)) calls my_df_func(my_df_slice) immediately, in the main process, and only then passes its return value (here None) to executor.submit. Nothing is actually executed in the worker processes, which is why the slices run sequentially.

As for the AttributeError in the first block: the (built-in) in the message shows that __main__ is not a regular script file but an interactive session (a REPL or notebook). ProcessPoolExecutor pickles the submitted function by reference, and each worker process then tries to look it up again on the __main__ module; since an interactive __main__ has no source file the workers can load, the lookup fails. The fix is to move def my_df_func() into its own file and import it, so the workers can resolve it by module and name. Multiprocessing example giving AttributeError