45

I would like to parallelize the following code:

for row in df.iterrows():
    idx = row[0]
    k = row[1]['Chromosome']
    start,end = row[1]['Bin'].split('-')

    sequence = sequence_from_coordinates(k,1,start,end) # slow download from http

    df.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
    df.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
    df.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))

I have tried to use multiprocessing.Pool(), since each row can be processed independently, but I can't figure out how to share the DataFrame between processes. I am also not sure that this is the best approach to parallelization with pandas. Any help?

alec_djinn
  • Your row-wise iteration is slow by default. You can either find a way to vectorize your operations and do it without iteration, or you can split your dataframe into a few large chunks and iterate over each chunk in parallel. – Khris Nov 03 '16 at 12:06
  • Sure, that's a way to do it. But I am still looking for a better way, if it exists. – alec_djinn Nov 04 '16 at 10:20
  • Have you considered using dask? It would do most of the parallelization for you. – Zeugma Nov 07 '16 at 02:03
  • I don't know Dask, I will have a look into it. – alec_djinn Nov 07 '16 at 12:51

4 Answers

74

As @Khris said in his comment, you should split up your dataframe into a few large chunks and iterate over each chunk in parallel. You could arbitrarily split the dataframe into randomly sized chunks, but it makes more sense to divide the dataframe into equally sized chunks based on the number of processes you plan on using. Luckily someone else has already figured out how to do that part for us:

# don't forget to import
import pandas as pd
import multiprocessing

# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()

# calculate the chunk size as an integer
chunk_size = int(df.shape[0]/num_processes)

# build the list of chunks; this works even if the length of the dataframe
# is not evenly divisible by num_processes (the last chunk is simply smaller)
chunks = [df.iloc[i:i + chunk_size] for i in range(0, df.shape[0], chunk_size)]

This creates a list that contains our dataframe in chunks. Now we need to pass it into our pool along with a function that will manipulate the data.

def func(d):
    # let's create a function that squares every value in the dataframe
    return d * d

# create our pool with `num_processes` processes
pool = multiprocessing.Pool(processes=num_processes)

# apply our function to each chunk in the list
result = pool.map(func, chunks)

At this point, result will be a list holding each chunk after it has been manipulated. In this case, all values have been squared. The issue now is that the original dataframe has not been modified, so we have to replace all of its existing values with the results from our pool.

for i in range(len(result)):
    # since result[i] is just a dataframe, we can write it back into the
    # original dataframe using the index labels of each chunk (loc, not iloc)
    df.loc[result[i].index] = result[i]

Now, my function to manipulate my dataframe is vectorized and would likely have been faster if I had simply applied it to the entire dataframe instead of splitting it into chunks. However, in your case, your function would iterate over each row of each chunk and then return the chunk. This lets you process num_processes rows at a time (one chunk per process).

def func(d):
    # note: the method is iterrows(), not iterrow()
    for idx, row in d.iterrows():
        k = row['Chromosome']
        start, end = row['Bin'].split('-')

        sequence = sequence_from_coordinates(k, 1, start, end)  # slow download from http
        # .at replaces DataFrame.set_value, which was removed in newer pandas versions
        d.at[idx, 'GC%'] = gc_content(sequence, percent=False, verbose=False)
        d.at[idx, 'G4 repeats'] = sum(len(list(i)) for i in g4_scanner(sequence))
        d.at[idx, 'max flexibility'] = max(item[1] for item in dna_flex(sequence, verbose=False))
    # return the modified chunk!
    return d

Then you reassign the values in the original dataframe, and you have successfully parallelized this process.
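Putting the pieces together, here is a minimal sketch of the full round trip, assuming the `df`, `func`, `chunks`, and `num_processes` defined above:

if __name__ == '__main__':
    # the __main__ guard matters on platforms that spawn worker processes (Windows, macOS)
    with multiprocessing.Pool(processes=num_processes) as pool:
        result = pool.map(func, chunks)

    # write each processed chunk back into the original dataframe by index label
    for chunk in result:
        df.loc[chunk.index] = chunk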

How Many Processes Should I Use?

Your optimal performance is going to depend on the answer to this question. While "ALL OF THE PROCESSES!!!!" is one answer, a better answer is much more nuanced. Beyond a certain point, throwing more processes at a problem creates more overhead than it saves, and the achievable speedup is limited by the part of the work that cannot be parallelized (Amdahl's Law). Again, we are fortunate that others have already tackled this question for us:

  1. Python multiprocessing's Pool process limit
  2. How many processes should I run in parallel?

A good default is to use multiprocessing.cpu_count(), which is also the default behavior of multiprocessing.Pool. According to the documentation, "If processes is None then the number returned by cpu_count() is used." That's why I set num_processes to multiprocessing.cpu_count() at the beginning. This way, if you move to a beefier machine, you get the benefit of it without having to change the num_processes variable directly.
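In other words, simply omitting the argument gives you one worker per CPU; a tiny sketch reusing `func` and `chunks` from above:

# Pool() with no argument is equivalent to Pool(processes=multiprocessing.cpu_count())
with multiprocessing.Pool() as pool:
    result = pool.map(func, chunks)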

Jinhua Wang
TheF1rstPancake
  • Use `chunks = [df.iloc[i:i + chunk_size,:] for i in range(0, df.shape[0], chunk_size)]` if pandas shows warnings. – Dat Feb 10 '18 at 01:33
  • `np.array_split()` may be a better option than `chunks = [df.ix[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]`. The former will automatically handle the case where the number of rows is not evenly divisible, and the syntax is a little easier. – N4v Jul 13 '19 at 00:22
  • np.array_split() worked for me as well – MrKingsley Oct 25 '22 at 17:54
  • What should we do if we have a function with three parameters that needs three columns of the dataframe? – RF1991 Jun 22 '23 at 09:39
33

A faster way (about 10% in my case):

Main differences from the accepted answer: use np.array_split and pd.concat to split and rejoin the dataframe.

import multiprocessing

import numpy as np
import pandas as pd


def parallelize_dataframe(df, func):
    num_cores = multiprocessing.cpu_count() - 1  # leave one core free so the machine doesn't freeze
    num_partitions = num_cores                   # number of partitions to split the dataframe into
    df_split = np.array_split(df, num_partitions)
    pool = multiprocessing.Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

where func is the function you want to apply to df. Use functools.partial(func, arg=arg_val) for more than one argument.
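For example, a quick sketch of how it might be called; `my_func`, `value`, and `threshold` are made-up names for illustration only:

from functools import partial

def my_func(chunk, threshold=0.5):
    # hypothetical per-chunk work: takes a pandas DataFrame, returns a pandas DataFrame
    chunk['flag'] = chunk['value'] > threshold
    return chunk

df = parallelize_dataframe(df, my_func)                          # single-argument case
df = parallelize_dataframe(df, partial(my_func, threshold=0.9))  # extra arguments bound with partial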

ic_fl2
  • Just curious, does `pool.map` maintain the order of the dataframe? In other words, is the output from `pool.map` in the same order as the chunks that were passed in? If not, then `pd.concat` may not rebuild the dataframe in the original order. I didn't know about `np.array_split`, but I'm not surprised it's faster. `pd.concat` is also likely faster than reassigning with `df.ix`. – TheF1rstPancake Sep 08 '17 at 19:42
  • @Jalepeno112 Yes, as far as I can tell the dataframe gets put back together in the correct order. I don't know if there is a way of enforcing it, but I have timeseries data and it has not caused problems yet. And since my index values are timestamps, it shouldn't be a problem to sort them again if the order got jumbled. Another trick I found was to use itertuples(), which is another 30% faster. – ic_fl2 Sep 10 '17 at 12:10
  • This saved my day. Thanks a ton @ic_fl2 – SummerEla Aug 22 '18 at 00:03
  • can you please help me in answering this :- https://stackoverflow.com/questions/53561794/iteration-over-a-pandas-df-in-parallel – ak3191 Dec 17 '18 at 18:38
  • This is a really nice answer! – Jinhua Wang Sep 18 '21 at 05:10
  • What should we do if we have a function with three parameters that needs three columns of the dataframe? – RF1991 Jun 22 '23 at 09:27
  • @RF1991 just pass only the relevant slice of the df to the function. `parallelize_dataframe(df[[1,2,3]], func)` – ic_fl2 Jul 04 '23 at 07:54
13

Consider using dask.dataframe, as shown, for example, in this answer to a similar question: https://stackoverflow.com/a/53923034/4340584

import dask.dataframe as ddf

df_dask = ddf.from_pandas(df, npartitions=4)   # number of partitions ~ number of cores you want to use
df_dask['output'] = df_dask.apply(lambda x: your_function(x), axis=1, meta=('output', 'object'))  # row-wise apply needs axis=1
df = df_dask.compute(scheduler='multiprocessing')
Robert
  • The `dask` solution looks much less cumbersome than manual parallelization of the calculation in `pandas`! – sophros Feb 20 '19 at 14:52
1

To have Dask work over whole partitions of a dataframe (rather than row by row, as dask's apply does), you can use map_partitions:

import multiprocessing
import dask.dataframe as ddf

# use one partition per CPU core
num_partitions = multiprocessing.cpu_count()

# create the dask DataFrame
df_dask = ddf.from_pandas(your_dataframe, npartitions=num_partitions)

# apply func to every partition in parallel; meta should describe what func returns,
# e.g. ('output_col', 'float64') for a Series or a dict of column dtypes for a DataFrame
output = df_dask.map_partitions(func, meta=('output_col', 'object')).compute(scheduler='multiprocessing')
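For reference, each partition that map_partitions hands to `func` is an ordinary pandas DataFrame; a minimal sketch of such a function, where `value` is a hypothetical column and the return value should match the `meta` you declare:

import pandas as pd

def func(partition: pd.DataFrame) -> pd.Series:
    # plain pandas code (or the row-wise loop from the question) works here unchanged
    return (partition['value'] * 2).rename('output_col')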
DSH