I have a function that takes a dataframe as input, loops through each row to perform a series of operations, and appends the final output to a file. I want to parallelize this function so that, instead of iterating over all the rows one by one, it processes slices (chunks) of the original dataframe in parallel. To make slicing easy, col1 of the dataframe holds a number identifying the slice each row belongs to. I use this column both to build the array of dataframes that I pass to executor.submit and, inside the function, to build slice-specific output file names, so that each process writes to the file for its own slice without contention.
The problem is that I keep running into this error.
AttributeError: Can't get attribute 'my_df_func' on <module '__main__' (built-in)>
Here's what a simplified version of my code looks like.
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import os

output_dir = '.'  # in the real script this points at my output directory

def my_df_func(my_df_slice):
    for col1, col2, col3, col4 in zip(my_df_slice.col1, my_df_slice.col2,
                                      my_df_slice.col3, my_df_slice.col4):
        # build a one-row dataframe for this row's output
        file_df = pd.DataFrame({'info': ['info_' + str(col2) + '&id=' + col3 + '&abc=' + col4]})
        output_file = os.path.join(output_dir, f'out_{col1}.csv')
        # write the header only if the file doesn't exist yet
        file_df.to_csv(output_file, mode='a',
                       header=not os.path.isfile(output_file), index=False)
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        # my_df is loaded earlier in the real script
        my_df_array = [x for __, x in my_df.groupby('col1')]
        [executor.submit(my_df_func, my_df_slice) for my_df_slice in my_df_array]
Oddly, the version below runs without the error, but it executes the function for each slice sequentially.
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        my_df_array = [x for __, x in my_df.groupby('col1')]
        [executor.submit(my_df_func(my_df_slice)) for my_df_slice in my_df_array]
Any idea what might be going on?