A multithreading pool would be ideal for sharing the y dataframe among threads (obviating the need for shared memory) but is not so good at running the more CPU-intensive processing in parallel. A multiprocessing pool is great for doing CPU-intensive processing in parallel but not so great at sharing data across processes without coming up with a shared-memory representation of your y dataframe.
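As a toy illustration of that trade-off (this is not part of the solution, and the dataframe and worker function here are invented for the example), every thread in a thread pool reads the very same y object created by the main thread, whereas each Pool worker process would have to be sent its own pickled copy of whatever data it needs:

import pandas as pd
from multiprocessing.pool import ThreadPool

y = pd.DataFrame({"value": range(1_000_000)})  # hypothetical stand-in for your dataframe

def partial_sum(month):
    # All threads share this one y object; nothing is copied or pickled.
    return y["value"].iloc[month::12].sum()

with ThreadPool(4) as thread_pool:
    print(thread_pool.map(partial_sum, range(12)))
# With multiprocessing.pool.Pool the workers are separate processes, so y (or a slice
# of it) must be pickled to them or placed in shared memory -- but only then does the
# CPU-bound work escape the GIL and run truly in parallel.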
Here I have rearranged your code so that I use a multithreading pool to create filtered_y for each period (which is a CPU-intensive operation, but pandas does release the Global Interpreter Lock for certain operations -- hopefully this one). Then we are only passing one month's worth of data to a multiprocessing pool, rather than the entire y dataframe, to process that month with worker function process_month. But since each pool process does not have access to the y dataframe, it just returns the indices that need to be updated together with their new values.
import pandas as pd
from multiprocessing import cpu_count
from multiprocessing.pool import Pool, ThreadPool


def process_month(period, filtered_y):
    """
    Returns a list of tuples consisting of (index, value) pairs.
    """
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # e.g. data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():
        # Find the row of month_df whose timestamp is nearest to this y index:
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
            up_delta = 200
            down_delta = 200
            up_value = value + up_delta
            down_value = value - down_delta
            if value > up_value:
                results.append((index, 1))
                break
            if value < down_value:
                results.append((index, 0))
                break
    return results


def process(period):
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month's records
    # Pool.apply blocks this thread until the worker process has returned its results:
    for index, value in multiprocessing_pool.apply(process_month, (period, filtered_y)):
        y.loc[index, "result"] = value


def main():
    global y, multiprocessing_pool

    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB

    MAX_THREAD_POOL_SIZE = 100
    thread_pool_size = min(MAX_THREAD_POOL_SIZE, len(periods))
    multiprocessing_pool_size = min(thread_pool_size, cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool, \
            ThreadPool(thread_pool_size) as thread_pool:
        thread_pool.map(process, periods)
    # Presumably y gets written out again as a CSV file here?


# Required for Windows:
if __name__ == '__main__':
    main()
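If y does need to be persisted at the point marked by the comment above, a single call at the end of main would do it; the output filename here is just a placeholder, since the question doesn't say where the updated dataframe should go:

y.to_csv("y_updated.csv")  # hypothetical output path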
Version Using Just a Single Multiprocessing Pool
import pandas as pd
from multiprocessing import cpu_count
from multiprocessing.pool import Pool


def process_month(period):
    """
    Returns a list of tuples consisting of (index, value) pairs.
    """
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month's records
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # e.g. data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
            up_delta = 200
            down_delta = 200
            up_value = value + up_delta
            down_value = value - down_delta
            if value > up_value:
                results.append((index, 1))
                break
            if value < down_value:
                results.append((index, 0))
                break
    return results


def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    multiprocessing_pool_size = min(len(periods), cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        results_list = multiprocessing_pool.map(process_month, periods)

    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    for results in results_list:
        for index, value in results:
            y.loc[index, "result"] = value
    # Write out new csv file:
    ...


# Required for Windows:
if __name__ == '__main__':
    main()
And now for a variation of this that uses a bit more memory but allows the main process to overlap its processing with the multiprocessing pool. This could be beneficial if the number of indices needing to be updated is quite large:
...

def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    multiprocessing_pool_size = min(len(periods), cpu_count() - 1)  # Save a core for the main process
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        # Process values as soon as they are returned:
        for results in multiprocessing_pool.imap_unordered(process_month, periods):
            for index, value in results:
                y.loc[index, "result"] = value
    # Write out new csv file:
    ...
This last version could be superior since it reads y.csv before submitting tasks to the pool and, depending on the platform and how it caches I/O operations, the worker function may not have to do any physical I/O to read in its own copies of the file. But that is one more ~10 MB file that has been read into memory.
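If re-reading y.csv in every task ever becomes a concern (say, with many more periods than cores), one possible refinement -- only a sketch under the same assumptions as above, with init_worker being a name I have made up -- is to use a pool initializer so each worker process reads the file at most once and process_month builds filtered_y from that per-process copy:

import pandas as pd
from multiprocessing import cpu_count
from multiprocessing.pool import Pool

y = None  # Set in each worker process by init_worker

def init_worker():
    # Runs once per worker process, so y.csv is read at most pool-size times
    # instead of once per submitted period.
    global y
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)

def process_month(period):
    # Same body as before, except filtered_y comes from the per-process y:
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]
    ...

def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    with Pool(min(len(periods), cpu_count()), initializer=init_worker) as multiprocessing_pool:
        results_list = multiprocessing_pool.map(process_month, periods)
    ...

if __name__ == '__main__':
    main()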