
I have a function that takes a dataframe as input and returns a dataframe, like:

def process(df):
    <all the code for processing>
    return df
# the input df has 250K rows and 30 columns
# save the output in a variable
result = process(df)
# process() transforms the input df into 10,000K rows and over 50 columns

It does a lot of processing and thus takes a long time to return the output. I am using a Jupyter notebook.

I have come up with a new approach that filters the original dataframe into 5 chunks of unequal size (between 30K and 100K rows each), based on a category filter on a column of the original df, passes them separately as process(df1), process(df2), etc., saves the outputs as result1, result2, etc., and then merges the results together into one single final dataframe.

But I want them to run simultaneously and have the results combined automatically: some code that runs the 5 process calls together and, once all of them are complete, joins their outputs to give me the same "result" as earlier but with a lot of run time saved.

Even better would be if I could split the original dataframe into equal parts and run each part through process(df) simultaneously: split the 250K rows randomly into 5 dataframes of 50K each, send them as inputs to process(df) five times, run them in parallel, and get the same final output I would be getting right now without any of this optimization.
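
Roughly, this is the shape of the split-and-merge part I have in mind, written out serially for now (just a sketch, assuming numpy and pandas are available; the chunk count of 5 is arbitrary):

import numpy as np
import pandas as pd

# split the 250K-row df into 5 roughly equal pieces
chunks = np.array_split(df, 5)

# run the existing function on each piece (still one after another here)
results = [process(chunk) for chunk in chunks]

# stitch the pieces back together into one final dataframe
result = pd.concat(results, ignore_index=True)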

I was reading a lot about multi-threading and found some useful answers on Stack Overflow, but I wasn't able to really get it to work. I am very new to this concept of multi-threading.

trojan horse

2 Answers


You can use the multiprocessing library for this, which allows you to run a function on different cores of the CPU.

The following is an example:

from multiprocessing import Pool

def f(df):
    # Process dataframe
    return df

if __name__ == '__main__':
    # df1, df2, df3 are the pre-split pieces of your original dataframe
    dataframes = [df1, df2, df3]

    # one worker process per piece; map() runs f on each piece in parallel
    with Pool(len(dataframes)) as p:
        processed_dfs = p.map(f, dataframes)

    print(processed_dfs)

    # You would join them here
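
For the final joining step, a pandas concat over the list returned by map() should be enough (a small sketch, assuming the processed pieces all share the same columns):

import pandas as pd

# combine the processed pieces back into a single dataframe
result = pd.concat(processed_dfs, ignore_index=True)
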
  • Hi Leon, thanks for this. I understand the code, but it takes forever to run. It seems to slow things down even further. I tried it for a handful of cases and it ran for way too long and is still running. – trojan horse Jul 15 '21 at 19:40
  • That's really odd. To troubleshoot this I have some questions: When you say "way too long" do you mean 2 seconds or 2 hours (it usually takes a sec to create all the processes)? Into how many parts did you break your dataframe? How many cores does your CPU have? Are you including the time it takes to split and merge the dataframe when timing? –  Jul 16 '21 at 04:28
  • So I tested it with a small chunk of my otherwise large dataframe, as I mentioned in the question. Just to test it out I used 100 records; the serial method does it quickly, in less than a minute, whereas the pool went on running for 20 mins and I had to force stop it. It doesn't throw an error. I am not counting the time to split it; I had the split ready. I divided the dataframe into the 5 categories, so each had close to 20 records. And I have 6 CPU cores. – trojan horse Jul 16 '21 at 04:51
  • I found this question on Stack Overflow - https://stackoverflow.com/questions/20727375/multiprocessing-pool-slower-than-just-using-ordinary-functions (check aelfinn's answer). I was wondering if that's the reason, as I am passing heavy dataframes which will further expand into more rows and columns. – trojan horse Jul 16 '21 at 04:52
  • It seems to be a problem with your processing function. When multiprocessing takes longer, it is only by a couple of seconds, not by 19 minutes. I think you are not exiting the processes correctly in your data processing function, which is causing it to hang. An easy way to test this is to place some print statements at different stages of the function to see where it hangs (e.g. one when the process starts, another right before it starts processing the data, another after it finishes processing the data, and the last before the function finishes). –  Jul 16 '21 at 17:13
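
As a rough illustration of that suggestion, the processing function could be instrumented like this (a sketch only; os.getpid() just labels which worker is printing, and flush=True makes the output appear immediately; note that in a notebook the workers' prints may show up in the terminal running the kernel rather than in the cell output):

import os

def process(df):
    pid = os.getpid()
    print(f"[{pid}] process started", flush=True)

    print(f"[{pid}] about to process {len(df)} rows", flush=True)
    # <all the code for processing>
    print(f"[{pid}] finished processing", flush=True)

    print(f"[{pid}] returning result", flush=True)
    return df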

You should check out dask (https://dask.org/), since it seems like you mostly have operations on dataframes. A big advantage is that you won't have to worry about all the details of manually splitting your dataframe.
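
For example, something along these lines (a minimal sketch, assuming process() works independently on any subset of rows; if it changes the set of columns you may also need to pass a meta argument to map_partitions):

import dask.dataframe as dd

# wrap the existing pandas dataframe in a dask dataframe with 5 partitions
ddf = dd.from_pandas(df, npartitions=5)

# apply the existing process() function to each partition,
# then collect the result back into a single pandas dataframe
result = ddf.map_partitions(process).compute()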

A.Gharbi
  • Hi, I have already tried dask and it is good and creates chunks, but the issue is that the data has to be ingested into an ETL pipeline and the system doesn't recognize dask dataframes for now. I want a more conventional software engineering approach here. – trojan horse Jul 15 '21 at 18:59
  • I see... I mean, you can still get output that is not a dask type when you use .compute() (like pandas dataframes), but you probably have other constraints in your system that I am not aware of. – A.Gharbi Jul 15 '21 at 19:45