
I have a process that requires each row of a dataframe to be processed and a new value appended to each row. It's a large dataframe, and processing a single dataframe takes hours.

If I have an iterrows loop that sends each row to a function, can I parallelize the processing for a speedup? The results for each row are independent of one another.

Basically, my code looks something like this:

for index, row in df.iterrows():
    row['data'] = function(row)

Is there an easy way to speed up this processing?

Vikash Balasubramanian
Lostsoul
  • Probably take a look at Dask. – Trenton McKinney Oct 14 '20 at 22:37
  • In my experience, if you are looping through a DataFrame, you're probably not doing it the Pandas way. – Jarad Oct 14 '20 at 22:50
  • As @Jarad said you probably don't want to iterate. See if vectorized alternatives exist (not apply, depending on what `function` does there may be better options) -- see [this answer by me](https://stackoverflow.com/a/55557758/4909087) and also this one on [when (not) to use apply](https://stackoverflow.com/questions/54432583/when-should-i-ever-want-to-use-pandas-apply-in-my-code). – cs95 Oct 15 '20 at 00:04

2 Answers


Iterating over rows is not good practice, and there are often alternatives such as groupby/transform aggregations. If, in the worst case, you really do need to iterate, the approach below can help. You also might not need to reimplement everything yourself: libraries like Dask, which is built on top of pandas, can do this for you.

To give you the idea: you can use multiprocessing (Pool.map) in combination with chunking. Read the CSV in chunks (or split an existing dataframe into chunks, as mentioned at the end of this answer) and map the chunks to the pool. While processing each chunk, update its rows (or collect new rows in a list and build a new chunk from them) and return the result from the function.

Finally, combine the returned dataframes once all workers have finished.

import multiprocessing

import pandas as pd


def process_chunk(df_chunk):
    for index, row in df_chunk.reset_index(drop=True).iterrows():
        # your logic for updating this chunk, or for building a new one, goes here
        print(row)
        print("index is " + str(index))

    # If you updated df_chunk itself, return it. If you instead collected rows
    # in a list (list_of_rows), build a new dataframe from them and return
    # that: pd.DataFrame(list_of_rows)
    return df_chunk


if __name__ == '__main__':
    # Set the number of worker processes explicitly; for example, if you have
    # 12 cores, leave 1 or 2 free for other things. Omit the `processes`
    # argument to use all available cores.
    pool = multiprocessing.Pool(processes=10)

    results = pool.map(process_chunk, [c for c in pd.read_csv("your_csv.csv", chunksize=7150)])
    pool.close()
    pool.join()

    # build the final dataframe by concatenating the processed chunks
    concatdf = pd.concat(results, axis=0, ignore_index=True)

Note: Instead of reading a CSV, you can pass chunks of an existing dataframe using the same logic. To calculate the chunk size, you might use something like (length of df) / (number of available cores - 2), rounded up. For example, 100000 / 14 = 7142.85, which rounds up to 7150 rows per chunk:

results = pool.map(process_chunk,
                   [df[c:c + chunk_size] for c in range(0, len(df), chunk_size)])
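
Putting the pieces together for a dataframe that is already in memory, a minimal sketch might look like the following. The names row_function, split_into_chunks and parallel_process, and the columns a and b, are made up for illustration; row_function stands in for whatever per-row work you actually need, and the worker count is only a suggestion.

```python
import math
import multiprocessing

import pandas as pd


def row_function(row):
    # Hypothetical stand-in for the real per-row computation.
    return row["a"] + row["b"]


def process_chunk(df_chunk):
    # Copy first so the new column is not written to a slice of the original.
    df_chunk = df_chunk.copy()
    df_chunk["data"] = [row_function(row) for _, row in df_chunk.iterrows()]
    return df_chunk


def split_into_chunks(df, n_chunks):
    # Ceiling division so that every row lands in exactly one chunk.
    chunk_size = max(1, math.ceil(len(df) / n_chunks))
    return [df.iloc[start:start + chunk_size]
            for start in range(0, len(df), chunk_size)]


def parallel_process(df, n_workers):
    chunks = split_into_chunks(df, n_workers)
    with multiprocessing.Pool(processes=n_workers) as pool:
        # pool.map returns results in input order, so row order is preserved.
        results = pool.map(process_chunk, chunks)
    return pd.concat(results, axis=0, ignore_index=True)


if __name__ == "__main__":
    df = pd.DataFrame({"a": range(10), "b": range(10, 20)})
    # Leave one core free for everything else on the machine.
    n_workers = max(1, multiprocessing.cpu_count() - 1)
    print(parallel_process(df, n_workers))
```

Each chunk has to be pickled and sent to a worker process, so this tends to pay off only when the per-row work is expensive compared with copying the data around.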
A.B

Instead of using df.iterrows(), why not just use a built-in method like apply()?

df['data'] = df.apply(function, axis=1)

.apply() is the Pandas way to run a function over columns or rows. It is usually faster than iterrows(), but note that with axis=1 it still calls the function once per row, so it is not truly vectorized; if the function can be written as operations on whole columns, that will be faster still.

Check this Reference article to see how it differs.

Other options are Dask, Vaex, or just good old-fashioned multiprocessing.
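
As a rough illustration of the difference between a row-wise apply and a column-wise (vectorized) expression, here is a small sketch. The columns a and b and the arithmetic inside function are invented for the example; whether your own function can be rewritten this way depends entirely on what it does.

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})


def function(row):
    # Hypothetical per-row computation.
    return row["a"] * 2 + row["b"]


# Row-wise apply: the Python function is called once for every row.
df["data_apply"] = df.apply(function, axis=1)

# Vectorized: the same arithmetic expressed on whole columns at once.
df["data_vec"] = df["a"] * 2 + df["b"]

print(df)
```

Both columns hold the same values; the second form avoids the per-row Python call, which is where most of the time goes on a large dataframe.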

Akshay Sehgal