1

I have a y.csv file. The file size is 10 MB and it contains data from Jan 2020 to May 2020.

I also have a separate file for each month. e.g. data-2020-01.csv. It contains detailed data. The file size of each month file is around 1 GB.

I'm splitting the y.csv by month and then process the data by loading the relevant month file. This process is taking too long when I go for large number of months. e.g. 24 months.

I would like to process the data faster. I have access to AWS m6i.8xlarge instance which has 32 vCPU and 128 GB memory.

I'm new to multiprocessing. So can someone guide me here?

This is my current code.

import pandas as pd

periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB


def process(_month_df, _index):
    idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
    for _, value in _month_df.loc[idx:].itertuples():

        up_delta = 200
        down_delta = 200

        up_value = value + up_delta
        down_value = value - down_delta

        if value > up_value:
            y.loc[_index, "result"] = 1
            return

        if value < down_value:
            y.loc[_index, "result"] = 0
            return


for x in periods:
    filename = "data-" + str(x[0]) + "-" + str(x[1]).zfill(2)  # data-2020-01
    filtered_y = y[(y.index.month == x[1]) & (y.index.year == x[0])]  # Only get the current month records
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)

    for index, row in filtered_y.iterrows():
        process(month_df, index)
John
  • 129
  • 12
  • Interested in the same topic, sadly cannot advise as not yet experiences in multiprocesses also. just one observation, the last block with `.iterrows():` is slowing your process drastically. https://stackoverflow.com/a/65356169/8805842 investigate that part also – NoobVB May 22 '22 at 08:42
  • The problem here is that you can't really share the dataframe (referenced by *y*) across multiple processes. It could be shared across multiple threads but that's a moot point for two reasons 1) This is CPU bound so multithreading isn't appropriate 2) pandas dataframes are not thread safe – DarkKnight May 22 '22 at 08:46
  • @NoobVB Since my `filtered_y` is small in size, it is not the bottleneck here. But since I'm only interested in index here, I'll switch it `itertuples`. Thanks for pointing it out. – John May 22 '22 at 08:46
  • @LancelotduLac I can optimise the code to not share y. My y has unique index. – John May 22 '22 at 08:49
  • @John just keep in mind, 10Mb does not really matters, for .iterrows() or itertuples(), the amount of rows is the main issue, so just check the shape of your filtered_y for curiousity. And of course, please update this thread with your multiP solution, - curious :) – NoobVB May 22 '22 at 08:58
  • @NoobVB Each month `filtered_y` has 1440 rows. – John May 22 '22 at 09:06
  • @NoobVB But my `month_df` has ~100 million rows. So even itertuples is slow. – John May 22 '22 at 09:07

2 Answers2

1

As commented in multiple pandas/threading questions, CSV files being IO bound, you can get some benefit from using a ThreadPoolExecutor.

At the same time, if you are going to perform aggregating operations, consider performing the read_csv also inside of your processor and use ProcessPoolExecutor instead.

If you are going to pass a lot of data between your multiprocesses you will also need a proper memory sharing method.

However I see the use of iterrows and itertuples In general those two instructions make my eyes bleed. Are you sure you cannot process the data in a vectorised mode?

This particular section I am not sure what it is supposed to do, and having M rows will make it very slow.

def process(_month_df, _index):
    idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
    for _, value in _month_df.loc[idx:].itertuples():

        up_delta = 200
        down_delta = 200

        up_value = value + up_delta
        down_value = value - down_delta

        if value > up_value:
            y.loc[_index, "result"] = 1
            return

        if value < down_value:
            y.loc[_index, "result"] = 0
            return

Below a vectorized code to find if it is going up or down, and in what row

df=pd.DataFrame({'vals': np.random.random(int(10))*1000+5000}).astype('int64')
print(df.vals.values)

up_value = 6000
down_value = 3000
valsup = df.vals.values + 200*np.arange(df.shape[0])+200
valsdown = df.vals.values - 200*np.arange(df.shape[0])-200

#! argmax returns 0 if all false
# idx_up = np.argmax(valsup > up_value)
# idx_dwn= np.argmax(valsdown < down_value)

idx_up = np.argwhere(valsup > up_value)
idx_dwn= np.argwhere(valsdown < down_value)
idx_up = idx_up[0][0] if len(idx_up) else -1
idx_dwn = idx_dwn[0][0] if len(idx_dwn) else -1


if idx_up < 0 and idx_dwn<0:
    print(f" Not up nor down")
if idx_up < idx_dwn or idx_dwn<0:
    print(f" Result is positive, in position {idx_up}")
else: 
    print(f" Result is negative, in position {idx_dwn}")

For the sake of completeness, benchmarking itertuples() and the argwhere approach for 1000 elements:

  • .itertuples(): 757µs
  • arange + argwhere: 60µs
Zaero Divide
  • 699
  • 2
  • 10
  • I definitely prefer vectorised mode. However, I believe it is not possible in my use case since I'm checking whether up_value or down_value hits first. So the order is important. – John May 22 '22 at 13:35
  • what about using a `cumsum` and getting the first index? If you provide some sample data we can also test – Zaero Divide May 22 '22 at 13:36
  • For that I should be able to pd.cut my data in an exact order from the value. I believe that is not possible at the moment in pandas. If you have any ideas do let me know. – John May 22 '22 at 13:40
  • See here. https://github.com/pandas-dev/pandas/issues/5494 and here https://github.com/pandas-dev/pandas/issues/4059 – John May 22 '22 at 13:42
  • Hey, would this solution only works for fixed value like `200`?. I mean can I use this solution for dynamic size? e.g. up_value is `+1%` from current_value and down_value is `-1%` from the current_value. – John May 22 '22 at 16:00
  • 1
    Yes, it is very fine, the question was about MP. **My point is that more often than not the code is parallelized without being optimized** – Zaero Divide May 23 '22 at 07:57
  • Hi, I'm still struggling. I'm trying to make your vectorized solution work since other solutions are very slow when the data size becomes huge. You have used a fixed value 200 here. Could you tell me how can I use percentage based value for up_delta (1%) and down_delta (0.5%) ? I mean `valsup = df.vals.values + (1%)*np.arange(df.shape[0])+(1%)` and `valsdown = df.vals.values - (0.5%)*np.arange(df.shape[0])-(0.5%)` – John Jun 17 '22 at 15:21
  • I have asked a follow-up question here. Please help me if you can. Thanks. https://stackoverflow.com/questions/72663020/numpy-vectorized-calculation-of-a-large-csv-file – John Jun 17 '22 at 18:15
1

A multithreading pool would be ideal for sharing the y dataframe among threads (obviating the need for using shared memory) but is not so good at running the more CPU-intensive processing in parallel. A multiprocessing pool is great for doing CPU-intensive processing but not so great in sharing data across processes without coming up with a shred memory representation of your y dataframe.

Here I have rearranged your code so that I use a multithreading pool to create filtered_y for each period (which is a CPU-intensive operation, but pandas does release the Global Interpreter Lock for certain operations -- hopefully this one). Then we are only passing one-months worth of data to a multiprocessing pool, rather than the entire y dataframe, to process that month with worker function process_month. But since each pool process does not have access to the y dataframe, it just returns the indices that need to be updated with the values to be replaced.

import pandas as pd
from multiprocessing.pool import Pool, ThreadPool, cpu_count

def process_month(period, filtered_y):
    """
    returns a list of tuples consisting of (index, value) pairs
    """
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():   
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
    
            up_delta = 200
            down_delta = 200
    
            up_value = value + up_delta
            down_value = value - down_delta
    
            if value > up_value:
                results.append((index, 1))
                break
    
            if value < down_value:
                results.append((index, 0))
                break
    return results

def process(period):
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month records
    for index, value in multiprocessing_pool.apply(process_month, (period, filtered_y)):
        y.loc[index, "result"] = value

def main():
    global y, multiprocessing_pool

    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB

    MAX_THREAD_POOL_SIZE = 100
    thread_pool_size = min(MAX_THREAD_POOL_SIZE, len(periods))
    multiprocessing_pool_size = min(thread_pool_size, cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool, \
    ThreadPool(thread_pool_size) as thread_pool:
        thread_pool.map(process, periods)
        
    # Presumably y gets written out again as a CSV file here?

# Required for Windows:
if __name__ == '__main__':
    main()

Version Using Just a Single Multiprocessing Pool

import pandas as pd
from multiprocessing.pool import Pool, ThreadPool, cpu_count

def process_month(period):
    """
    returns a list of tuples consisting of (index, value) pairs
    """
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month records
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():   
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
    
            up_delta = 200
            down_delta = 200
    
            up_value = value + up_delta
            down_value = value - down_delta
    
            if value > up_value:
                results.append((index, 1))
                break
    
            if value < down_value:
                results.append((index, 0))
                break
    return results

def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

    multiprocessing_pool_size = min(len(periods), cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        results_list = multiprocessing_pool.map(process_month, periods)
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    for results in results_list:
        for index, value in results:
            y.loc[index, "result"] = value
    # Write out new csv file:
    ...

# Required for Windows:
if __name__ == '__main__':
    main()

And now for a variation of this that uses a bit more memory but allows the main process to overlap its processing with the multiprocessing pool. This could be beneficial if the number of indices needing to be updated is quite large:

...
def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

    multiprocessing_pool_size = min(len(periods), cpu_count() - 1) # save a core for the main process
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        # Process values as soon as they are returned:
        for results in multiprocessing_pool.imap_unordered(process_month, periods):
            for index, value in results:
                y.loc[index, "result"] = value
    # Write out new csv file:
    ...

This last version could be superior since it first reads the csv file before submitting tasks to the pool and depending on the platform and how it caches I/O operations it could result in the worker function not having to do any physical I/O to read in its copies of the file. But that is one more 10M file that has been read into memory.

Booboo
  • 38,656
  • 3
  • 37
  • 60
  • In `main()` function, I don't see the `results` variable. How do I access that variable? – John May 22 '22 at 16:08
  • The `results` variable is only returned to the `process` worker function who uses the `(index, value)` tuples to update `y`, which is what you ultimately want to do. Why would `main` need this list of tuples? – Booboo May 22 '22 at 16:25
  • Ok I understood now. So when this line get executed `y.loc[index, "result"] = value`, it is outside the process? I read somewhere that it is not possible to access global variable inside the process. – John May 22 '22 at 16:52
  • The code `y.loc[index, "result"] = value` is being executed by a worker function `process` running in a multithreading pool, which runs in the same process as the main process where `y` is defined as global. Worker function `process_month` is running in the multiprocessing pool (separate processes) and generates these tuples using the passed filtered month and because `y` is not visible to it, must return a list of what needs to be updated. Is that clear? Have you actually run this because I don't have the data and therefore I wasn't able to – Booboo May 22 '22 at 17:00
  • any luck testing? curious how it went with these `.itertuples` and multiP – NoobVB May 22 '22 at 20:03
  • @Booboo Just tested the code. The code seem like it starts the threads/processes correctly. But it doesn't look like its finishing the task. It's got stuck for more than an hour. Each month has only 1440 rows. So for 7200 rows, I don't think this is normal. Thanks – John May 23 '22 at 06:13
  • @Booboo Apologies. It was my mistake. I had an improper indent. So the code loops through the entire dataframe. Just fixed it. – John May 23 '22 at 06:22
  • @Booboo I'm getting this error. `TypeError: 'list' object is not callable in python`. In this line `multiprocessing_pool.apply` – John May 23 '22 at 06:45
  • @Booboo I had to use `multiprocessing_pool.apply_async(process_month, (period, filtered_y)).get()` to fix the error. Hope, I'm doing this correctly. I have a question. I don't see any `p.close()` and `p.join()` in your code. Is this normal? – John May 23 '22 at 07:13
  • **If little data is passed back, and little data is passed to, `processPool` may be better than `threadPool`** as they are truly parallel – Zaero Divide May 23 '22 at 08:00
  • @ZaeroDivide I believe few MBs of data transferred via `process_month(period, filtered_y)` and the `results` returned has only an additional binary value. is 10 MB considered as `little data`? – John May 23 '22 at 09:52
  • @John Egads! I omitted an all-important comma! That should have been `multiprocessing_pool.apply(process_month**,** (period, filtered_y)):` The use of method `apply` instead of `apply_async` is sufficient because each thread is only submitting a single task to the multiprocessing pool and wants to block until the result is returned. The parallelism in processing occurs because *each thread* is submitting a task to the common multiprocessing pool. I have updated the line of code above. – Booboo May 23 '22 at 10:20
  • You *must* have the main process (currently using multiple threads) submitting the CPU-intensive tasks to a multiprocessing pool and having those tasks return the values to be updated back to the main process. That is the only way that the same copy of `y` is gets updated. You could do away with the threading pool and have just a multiprocessing pool that returns their results back to the main thread. But this would require passing the entire `y` dataframe to each process instead of just a month's worth as is now being done.. The threading pool reduces inter-process data transfer and memory. – Booboo May 23 '22 at 10:37
  • @John According to the docs on `Pool`: ***Warning** `multiprocessing.pool objects` have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling `close()` and `terminate()` manually. Failure to do this can lead to the process hanging on finalization.* **I am using context managers.** – Booboo May 23 '22 at 11:04
  • @Booboo I have a question with regards to the Pool size. Hope you clarify that. `with Pool() as multiprocessing_pool, ThreadPool(len(periods)) as thread_pool`. In that line, you are using an empty `Pool()`. Is it a good idea to use `Pool(multiprocessing.cpu_count())`? One another thing is, you are using `len(periods)` for ThreadPool. I believe, it would be okay when the length is small. Would this solution would work when the length is huge? e.g. 10000. Is there a way to limit that? Thanks – John Jun 11 '22 at 17:50
  • 1
    Read the documentation on `multiprocessing.pool.Pool`, which states: *processes* is the number of worker processes to use. If *processes* is `None` then the number returned by `os.cpu_count()` is used. Python can handle *a lot* of threads depending on the tasks being submitted. A given platform may pose a limit on the number of threads, but that limit is usually in the thousands. To know what is optimal, you need to test with different pool sizes. To limit the pool size to some maximum value of your choosing, e.g. 100, use `min(100, len(periods))`. – Booboo Jun 11 '22 at 18:22
  • 1
    I could see that `len(periods)` would be a fairly small number in the code you posted and it seemed a reasonable initial pool size to try. I could have also specified: `with Pool(min(thread_pool_size, os.cpu_count()) ...` where `thread_pool_size` is the size of the thread pool being used since there is no point in creating more processes than the number of threads concurrently bein run or the number of cores you have. I will update the answer to make all of this explicit. – Booboo Jun 11 '22 at 18:33
  • @Booboo Would it be better if we don't go for `ThreadPool`?. We are using the `ThreadPool` to share `filtered_y`. Since my `y.csv` is a small file, I think we can do `pd.read_csv` inside `process_month` function and filter there. Instead of returning the `results`, we can just store it in a file like `results.to_csv(f"{filename}.csv", index=True, header=True)`. I can merge the csv file later using `cat` command. If that was the case, would this be enough? `with Pool() as multiprocessing_pool: \ multiprocessing_pool.map(process_month, periods)` – John Jun 12 '22 at 05:16
  • You said `y.csv` is about 10M. That's not what I would call *small* when you consider you would have 5 processes with copies of this csv file in memory concurrently. Do your have sufficient memory for that? If so, then you could try it. I am going to update the question with another answer along these lines that will not require doing a `cat` operation. – Booboo Jun 12 '22 at 11:02
  • Something to consider regarding all versions: Each task submitted to the pool results in reading in monthly files in parallel. Depending on the type of disk you have (solid state?), this could be inefficient if it results in a lot of head movement making each file input slower than it would be if the files were read in sequentially. And the CPU processing required for each monthly file has to be sufficiently large to overcome the additional overhead incurred by multiprocessing. In other words, does any of these versions outperform plain sequential processing? – Booboo Jun 12 '22 at 11:24