I'm following the answer to this question: pandas multiprocessing apply
Usually, when I apply a function to each row of a DataFrame, I do something like this:
dataframe.apply(lambda row: process(row.attr1, row.attr2, ...), axis=1)
...
def process(attr1, attr2, ...):
    ...
But I want to run this function in parallel across multiple processes, so I implemented parallelize_on_rows from the question linked above. However, that solution works because the function passed in takes no parameters other than the row. For functions with extra parameters I tried to use functools.partial, but I can't figure out how to build a partial whose remaining arguments come from the row itself, which is what the lambda gave me access to.
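For reference, functools.partial fills positional parameters from the left, and DataFrame.apply(func, axis=1) then calls the partial with the row as one more positional argument. The stdlib-only sketch below (process_row, cache, and the dict standing in for a row are all hypothetical) shows the difference between binding the extra arguments positionally and binding them by keyword.

```python
from functools import partial


def process_row(row, processed_data, custom_option=False):
    # Hypothetical per-row function: the row is the first parameter.
    return row["some_attr"], len(processed_data), custom_option


row = {"some_attr": "a.csv"}  # a plain dict standing in for a pandas row
cache = {"b.csv": "download_successful"}

# Positional binding fills the leading parameters, so positional(row)
# means process_row(cache, True, row): the row lands in the last slot.
positional = partial(process_row, cache, True)
try:
    positional(row)
except KeyError as exc:
    print("positional binding failed:", exc)

# Keyword binding leaves the first positional slot free for the row.
by_keyword = partial(process_row, processed_data=cache, custom_option=True)
print(by_keyword(row))  # ('a.csv', 1, True)
```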
Here is my code:
import numpy as np
import pandas as pd
from functools import partial
from multiprocessing import Pool

def parallelize_function_on_df(self, data, func, num_of_processes=5):
    # split the rows into num_of_processes roughly equal chunks
    data_split = np.array_split(data, num_of_processes)
    # run func on each chunk in a worker process, then stitch the results back together
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))
    # must call close before join, research why
    pool.close()
    pool.join()
    return data
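On the "research why" comment: Pool.join() waits for the workers to exit, and workers only exit after close() (or terminate()) signals that no more tasks are coming, so in CPython 3.8+ calling join() on a pool that is still running raises ValueError. The sketch below uses multiprocessing.dummy.Pool, the thread-backed variant with the same interface, only so it runs without pickling or spawning processes; multiprocessing.Pool follows the same close/join rule. The square function is a hypothetical placeholder.

```python
from multiprocessing.dummy import Pool  # thread-backed Pool with the same API


def square(x):
    return x * x


pool = Pool(2)
results = pool.map(square, [1, 2, 3])  # blocks until every result is back

try:
    pool.join()  # raises ValueError: the pool is still accepting tasks
except ValueError as exc:
    print("join() before close():", exc)

pool.close()  # stop accepting tasks; workers exit once the queue drains
pool.join()   # now wait for the workers to exit
print(results)  # [1, 4, 9]
```

The `with Pool(...) as pool:` form is an alternative; its exit calls terminate() rather than close()/join(), which is safe after map() because map() has already collected every result.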
def run_on_df_subset(self, func, data_subset):
    return data_subset.apply(func, axis=1)

def parallelize_on_rows(self, data, func, num_of_processes=5):
    return self.parallelize_function_on_df(data, partial(self.run_on_df_subset, func), num_of_processes)
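A self-contained sketch of the same split/apply/concat pipeline, written with module-level functions instead of methods. This is an assumption on my part: pickling a bound method also pickles its instance, which can fail if the instance holds an unpicklable attribute such as a database connection. The sketch binds func by keyword, splits row positions rather than handing the DataFrame itself to np.array_split, and skips empty chunks. row_total and the sample DataFrame are hypothetical.

```python
from functools import partial
from multiprocessing import Pool

import numpy as np
import pandas as pd


def run_on_df_subset(data_subset, func):
    # Runs inside a worker: apply the per-row function to one chunk.
    return data_subset.apply(func, axis=1)


def parallelize_on_rows(data, func, num_of_processes=5):
    # Split row positions into roughly equal chunks, dropping empty ones.
    chunks = [
        data.iloc[idx]
        for idx in np.array_split(np.arange(len(data)), num_of_processes)
        if len(idx)
    ]
    # Bind func by keyword so Pool.map passes each chunk as the first argument.
    worker = partial(run_on_df_subset, func=func)
    with Pool(num_of_processes) as pool:
        # map() returns only after every chunk is done, so leaving the
        # block (which terminates the pool) is safe here.
        return pd.concat(pool.map(worker, chunks))


def row_total(row):
    # Hypothetical per-row function; module-level so workers can unpickle it.
    return row["a"] + row["b"]


if __name__ == "__main__":
    df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [10, 20, 30, 40]})
    print(parallelize_on_rows(df, row_total, num_of_processes=2).tolist())
```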
def mass_download(self, some_sql):
    download_table_df = pd.read_sql(some_sql, con=MYSQL.CONNECTION)
    processed_data = {}
    custom_option = True
    process_row_partial = partial(self.process_candidate_row_parallel, processed_data, custom_option)
    self.parallelize_on_rows(download_table_df, process_row_partial)
def process_candidate_row_parallel(self, row, processed_data, custom_option=False):
    if custom_option and processed_data.get(row['some_attr']) == 'download_successful':
        do_some_other_processing()
    download_single_file(row['some_attr1'], row['some_attr2'], processed_data)
This doesn't work because, as mentioned, the row attributes (row['some_attr'] and so on) never actually reach the worker process: my partial only wraps the function and its extra options, not the row. How can I pass each row to the function along with these extra arguments?
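A minimal sketch of wiring the row function for DataFrame.apply, assuming process_candidate_row_parallel is moved to module level. The stand-in body (it returns a label instead of downloading) and the sample columns are hypothetical. The lambda version cannot be sent to Pool workers because lambdas are not picklable, whereas a functools.partial over a module-level function is. Running the partial serially with df.apply first is a cheap way to check the wiring before handing the same object to a parallel helper.

```python
import pickle
from functools import partial

import pandas as pd


def process_candidate_row_parallel(row, processed_data, custom_option=False):
    # Hypothetical stand-in body: report what would be done for this row.
    status = processed_data.get(row["some_attr"])
    if status == "download_successful" and custom_option:
        return "other_processing"
    return "download " + row["some_attr1"] + "/" + row["some_attr2"]


processed_data = {"x": "download_successful"}

# Bind the extras by keyword; apply() still passes the row positionally.
process_row_partial = partial(
    process_candidate_row_parallel,
    processed_data=processed_data,
    custom_option=True,
)

# Pool.map pickles what it sends to workers: this partial pickles fine,
# while a lambda does not.
pickle.dumps(process_row_partial)
try:
    pickle.dumps(lambda row: row)
except (pickle.PicklingError, AttributeError) as exc:
    print("lambda not picklable:", type(exc).__name__)

df = pd.DataFrame(
    {"some_attr": ["x", "y"], "some_attr1": ["a", "b"], "some_attr2": ["c", "d"]}
)
print(df.apply(process_row_partial, axis=1).tolist())
# ['other_processing', 'download b/d']
```

One caveat with the parallel version: processed_data is pickled and copied into each worker, so entries a worker adds are not seen by the parent process or by other workers. Returning a status per row and building the dictionary from the concatenated result avoids relying on shared state.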