189

As of August 2017, Pandas DataFrame.apply() is unfortunately still limited to working with a single core, meaning that a multi-core machine will waste the majority of its compute time when you run df.apply(myfunc, axis=1).

How can you use all your cores to run apply on a dataframe in parallel?

Seanny123
Roko Mijic

12 Answers

160

You may use the swifter package:

pip install swifter

(Note that you may want to use this in a virtualenv to avoid version conflicts with installed dependencies.)

Swifter works as a plugin for pandas, allowing you to reuse the apply function:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

It will automatically figure out the most efficient way to parallelize the function, regardless of whether it's vectorized (as in the example above) or not.

More examples and a performance comparison are available on GitHub. Note that the package is under active development, so the API may change.

Also note that this will not work automatically for string columns. When using strings, Swifter will fall back to a “simple” Pandas apply, which will not be parallel. In this case, even forcing it to use dask will not yield performance improvements, and you would be better off just splitting your dataset manually and parallelizing it with multiprocessing.
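For reference, here is a minimal sketch of that manual approach (the DataFrame, the 'text' column and the clean_chunk function are placeholders, not part of Swifter):

import multiprocessing as mp

import numpy as np
import pandas as pd

def clean_chunk(chunk):
    # hypothetical string operation applied to one chunk of rows
    return chunk['text'].str.lower().str.strip()

if __name__ == '__main__':
    df = pd.DataFrame({'text': [' Foo ', 'BAR', ' baz '] * 100000})
    chunks = np.array_split(df, mp.cpu_count())   # one chunk per core
    with mp.Pool(mp.cpu_count()) as pool:
        df['text_clean'] = pd.concat(pool.map(clean_chunk, chunks))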

slhck
  • Out of pure curiosity, is there a way to limit the number of cores it uses when doing parallel apply? I have a shared server so if I grab all 32 cores no one will be happy. – Maksim Khaitovich Sep 05 '18 at 18:12
    @MaximHaytovich I don't know. Swifter uses dask in the background, so maybe it respects these settings: https://stackoverflow.com/a/40633117/435093 — otherwise I'd recommend opening an issue on GitHub. The author is very responsive. – slhck Sep 05 '18 at 18:19
  • @slhck thanks! Will dig into it a bit more. It seems to not work on Windows Server anyway - just hangs, not doing anything, on a toy task – Maksim Khaitovich Sep 05 '18 at 18:48
  • can you please help me in answering this :- https://stackoverflow.com/questions/53561794/iteration-over-a-pandas-df-in-parallel – ak3191 Dec 17 '18 at 18:41
  • This looks like it could be more convenient, thanks. – Roko Mijic May 13 '19 at 20:33
    +1 for Swifter. Not only does it parallelize using the best available method, it also integrates progress bars via tqdm. – scribu Jul 22 '19 at 10:49
  • Changing the accepted answer to this one as I think Swifter is now the best point-and-click solution. – Roko Mijic Apr 20 '20 at 16:43
    For strings, just add `allow_dask_on_strings(enable=True)` like this: `df.swifter.allow_dask_on_strings(enable=True).apply(some_function)` Source: https://github.com/jmcarpenter2/swifter/issues/45 – learner Jul 17 '20 at 16:22
  • Can you please explain why `string` is differently handled? Also does `swifter` support `lambda` function? such as `df.col.apply(lambda x: x**2)` – notilas Aug 05 '20 at 17:17
  • @notilas I would encourage you to ask the author via a GitHub issue. – slhck Aug 06 '20 at 18:16
  • I gave this a try with a function that was geocoding addresses and it looked for all the world like it was sending some of my data away from my computer - in network connections there was a connection to what looked like amazon ec2. I also got prompted for 2 firewall rules (which I've since disabled). It seems interesting but I don't trust it as of yet, luckily it was on fake test data but I would watch what it does to ensure it doesn't transmit any information (even if it is for performance). – user227669 Jan 12 '21 at 23:30
  • @user227669 Geocoding usually requires sending the data to a remote geocoding service. The reason is that mappings between geolocations and names are usually part of large, proprietary databases, so you can't do it offline. In any case, this has **nothing to do with `swifter`**! It's simply a consequence of using a geocoding function. – slhck Jan 13 '21 at 16:05
  • @slhck That is possible, I would have to inspect the traffic differences between using swifter vs not. I did get the two firewall exception requests. I am using the geopy module which is kind of a facade for a number of geocoding services and its possible that they route some requests through another server rather than going directly to the specific implementation (in my case, arcgis, which doesn't have a full python SDK afaik). – user227669 Feb 05 '21 at 20:06
  • Installing swifter leads to pip downgrading several important packages to old versions, among them numpy and pandas. – azureai Apr 15 '21 at 15:31
  • @azureai Use a virtualenv (or poetry or similar) to install dependencies on a per-project basis. – slhck Apr 16 '21 at 10:38
  • @shlck Yeah I'm using a venv already for my project. I was just wondering whether this is intended behavior. I would like to use up-to-date packages and swifter downgrades some of the most important ones. – azureai Apr 16 '21 at 15:36
  • @azurei I think it's an issue with a dependent package, not sure though. – slhck Apr 17 '21 at 16:11
  • It is only using 3 out of 8 cores. – Soerendip Jan 16 '23 at 18:05
138

The simplest way is to use Dask's map_partitions. You need these imports (you will need to pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

and the syntax is

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(I believe that 30 is a suitable number of partitions if you have 16 cores). Just for completeness, I timed the difference on my machine (16 cores):

import numpy as np
import timeit

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.010668013244867325

Giving a factor of 10 speedup going from pandas apply to dask apply on partitions. Of course, if you have a function you can vectorize, you should - in this case the function (y*(x**2+1)) is trivially vectorized, but there are plenty of things that are impossible to vectorize.
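As the comments below point out, newer Dask versions removed the get= keyword, so on a current Dask the compute call would look like this instead (same idea, not re-timed here):

res = ddata.map_partitions(lambda df: df.apply(lambda row: myfunc(*row), axis=1)).compute(scheduler='processes')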

Roko Mijic
    Great to know, thanks for posting. Can you explain why you chose 30 partitions? Does performance change when changing this value? – Andrew L Aug 07 '17 at 11:53
    @AndrewL I assume that each partition is serviced by a separate process, and with 16 cores I assume that either 16 or 32 processes can run simultaneously. I tried it out, and performance seems to improve up to 32 partitions, but further increases have no beneficial effect. I assume that with a quad-core machine you would want 8 partitions, etc. Note that I did notice some improvement between 16 and 32, so I think you really do want 2x$NUM_PROCESSORS – Roko Mijic Aug 07 '17 at 12:26
    Only thing is `The get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'` – wordsforthewise Aug 10 '18 at 01:56
  • can you please help me in answering this :- https://stackoverflow.com/questions/53561794/iteration-over-a-pandas-df-in-parallel – ak3191 Dec 17 '18 at 18:41
  • For python 2.7 you'd probably need to install with: pip install "dask[complete]" – mork Jan 20 '19 at 19:12
    For dask v0.20.0 and on, use ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(scheduler='processes'), or one of the other scheduler options. The current code throws "TypeError: The get= keyword has been removed. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'" – mork Jan 20 '19 at 19:15
    Make sure that before you do this, the dataframe has no duplicate indexes as it throws `ValueError: cannot reindex from a duplicate axis`. To go around that, either you should remove duplicated indexes by `df = df[~df.index.duplicated()]` or reset your indexes by `df.reset_index(inplace=True)`. – Habib Karbasian May 13 '19 at 03:53
    I am getting `TypeError: The get= keyword has been removed.` error. – quest Jun 18 '19 at 20:03
  • I wouldn't call this the simplest given it involves installing some third party library :) I find my proposal below with native python multiprocessing simpler – Olivier Cruchant Sep 28 '19 at 22:40
  • @Olivier: I have some experience using multiprocessing. I found it to be unstable with rogue processes left running even when you try to do things properly. Maybe I'm just not good enough with it? But the definition of simple is something that works without the user needing to know a lot about it. So my take is to avoid multiprocessing if at all possible. – Roko Mijic Sep 30 '19 at 10:30
    Tried using dask but it is running slower than pandas_apply() by at least 5x. any idea how? – griffinleow Dec 13 '19 at 05:35
  • I don't know, but it may be the case that things have changed since I wrote the original answer. – Roko Mijic Dec 19 '19 at 23:37
  • pandarallel is way easier and fast. – Dariusz Krynicki Jan 20 '20 at 13:11
  • https://github.com/nalepae/pandarallel Never tried it, but it has 600 stars on github so it might be good – Roko Mijic Jan 20 '20 at 16:57
  • If you get the ```TypeError: The get= keyword has been removed.``` error, replace ```def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)``` with ```def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(scheduler='processes')``` – Rafa Nogales Dec 17 '20 at 11:04
48

You can try pandarallel instead: a simple and efficient tool to parallelize your pandas operations on all your CPUs (on Linux & macOS).

  • Parallelization has a cost (instantiating new processes, sending data via shared memory, etc.), so parallelization is efficient only if the amount of computation to parallelize is high enough. For very small amounts of data, parallelization is not always worth it.
  • Functions applied should NOT be lambda functions.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

see https://github.com/nalepae/pandarallel
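If you want a progress bar or a specific number of workers, initialize() accepts options for that; a short sketch based on the pandarallel README (check the options of the version you have installed):

from pandarallel import pandarallel

# e.g. 4 worker processes and a per-worker progress bar
pandarallel.initialize(nb_workers=4, progress_bar=True)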

kkkobelief24
    hello, I cant resolve one issue, using pandarallel there is an Error: AttributeError: Can't pickle local object 'prepare_worker..closure..wrapper' . Can u help me with this? – Alex Cam Apr 27 '20 at 10:36
  • @Alex Sorry, I'm not the developer of that module. What does your code look like? You could try declaring your "inside functions" as global? (just a guess) – kkkobelief24 May 08 '20 at 10:01
  • @AlexCam Your function should be defined outside other function so python can pickle it for multiprocessing – Kenan Jun 15 '20 at 04:32
    @G_KOBELIEF With Python >3.6 we can use lambda function with pandaparallel – learner Jun 18 '20 at 05:13
  • Can we specify the number of cores? – Nathan B Aug 22 '23 at 19:37
46

If you want to stay in native Python:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

This will apply the function f in parallel to column col of dataframe df.
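If f needs extra arguments, one option is functools.partial (a sketch; f, col and the scale parameter are placeholders), since a partial of a module-level function still pickles fine:

from functools import partial
import multiprocessing as mp

def f(x, scale):
    return x * scale

# df is your existing DataFrame with a numeric column 'col'
with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(partial(f, scale=10), df['col'])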

Olivier Cruchant
  • Following an approach like this I got a `ValueError: Length of values does not match length of index` from `__setitem__` in `pandas/core/frame.py`. Not sure if I've done something wrong, or if assigning to ``df['newcol']`` is not threadsafe. – Rattle Sep 18 '19 at 08:35
    You can write the pool.map to an intermediary temp_result list to allow checking if length matches with the df, and then doing a df['newcol'] = temp_result? – Olivier Cruchant Sep 18 '19 at 11:10
  • you mean creating the new column? what would you use? – Olivier Cruchant Apr 17 '20 at 21:18
  • yes, assigning the result of the map to the new column of the dataframe. Doesn't map return a list of the result of each chunk sent to the function f? So what happens when you assign that to the column 'newcol? Using Pandas and Python 3 – Mina Apr 20 '20 at 15:04
  • It actually works really smooth! Did you try it? It creates a list of the same length of the df, same order as what was sent. It literally does c2 = f(c1) in a parallel fashion. There is no simpler way to multi-process in python. Performance-wise it seems that Ray can do good things as well (https://towardsdatascience.com/10x-faster-parallel-python-without-python-multiprocessing-e5017c93cce1) but it's not as mature and installation doesn't always go smoothly in my experience – Olivier Cruchant Apr 20 '20 at 20:13
  • That worked but how would you send additional params to `f()`? The only thing I could think of is `pool.map(f, zip(np.array_split(df['col'], mp.cpu_count()), [param] * mp.cpu_count()))` and `f` will be `def f(params): col_values, param = params` – Alaa M. Oct 26 '22 at 14:08
  • `pool.starmap` was created for this :) https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap – Olivier Cruchant Oct 27 '22 at 10:08
13

Just want to give an updated answer for Dask:

import dask.dataframe as dd

def your_func(row):
  #do something
  return row

ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()

On my 100,000 records, without Dask:

CPU times: user 6min 32s, sys: 100 ms, total: 6min 32s Wall time: 6min 32s

With Dask:

CPU times: user 5.19 s, sys: 784 ms, total: 5.98 s Wall time: 1min 3s
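Depending on your Dask version, you may also want to pass meta (the expected output schema) to avoid the dtype-inference warning and choose the scheduler explicitly; a hedged sketch, assuming your_func returns the row with unchanged columns:

# meta: an empty frame with the expected output columns/dtypes
ddf_update = ddf.apply(your_func, axis=1, meta=df.head(0)).compute(scheduler='processes')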

LYu
12

To use all (physical or logical) cores, you could try mapply as an alternative to swifter and pandarallel.

You can set the number of cores (and the chunking behaviour) upon init:

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

By default (n_workers=-1), the package uses all physical CPUs available on the system. If your system uses hyper-threading (usually twice the number of physical CPUs shows up as logical cores), mapply will spawn one extra worker to prioritise the multiprocessing pool over other processes on the system.

Depending on your definition of all your cores, you could also use all logical cores instead (beware that in this case the CPU-bound processes will be fighting over the physical CPUs, which might slow down your operation):

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)
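Either value can then be passed to init (a usage sketch):

mapply.init(n_workers=n_workers)
df.mapply(myfunc, axis=1)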
ddelange
4

Here is an example of a scikit-learn base transformer in which the pandas apply is parallelized:

import multiprocessing as mp

import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin, BaseEstimator

class ParallelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(partitions)  # spawn only as many workers as partitions

        # reduce step: concatenate the transformed batches
        data = pd.concat(
            pool.map(self._transform_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

for more info see https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8
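A minimal usage sketch with a hypothetical subclass that overrides _transform_one (each call receives one column as a Series; the example data is made up):

import pandas as pd

class UpperCaseTransformer(ParallelTransformer):
    def _transform_one(self, line):
        # example per-column transformation: upper-case string columns
        return line.str.upper() if line.dtype == object else line

if __name__ == '__main__':  # needed on spawn-based platforms (Windows/macOS)
    df = pd.DataFrame({'name': ['alice', 'bob'] * 100000})
    result = UpperCaseTransformer(n_jobs=-1).fit_transform(df)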

3

A native Python solution (with numpy) that can be applied to the whole DataFrame, as the original question asks (not only to a single column):

import numpy as np
import pandas as pd
import multiprocessing as mp

dfs = np.array_split(df, 8000) # divide the dataframe as desired

def f_app(df):
    return df.apply(myfunc, axis=1)

with mp.Pool(mp.cpu_count()) as pool:
    res = pd.concat(pool.map(f_app, dfs))
Pavel Prochazka
1

Here is another one using Joblib and some helper code from scikit-learn. Lightweight (if you already have scikit-learn), and good if you prefer more control over what it is doing, since joblib is easily hackable.

import pandas as pd
from joblib import parallel_backend, Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices
from sklearn.utils.validation import _num_samples


def parallel_apply(df, func, n_jobs= -1, **kwargs):
    """ Pandas apply in parallel using joblib. 
    Uses sklearn.utils to partition input evenly.
    
    Args:
        df: Pandas DataFrame, Series, or any other object that supports slicing and apply.
        func: Callable to apply
        n_jobs: Desired number of workers. Default value -1 means use all available cores.
        **kwargs: Any additional parameters will be supplied to the apply function
        
    Returns:
        Same as for normal Pandas DataFrame.apply()
        
    """
    
    if effective_n_jobs(n_jobs) == 1:
        return df.apply(func, **kwargs)
    else:
        ret = Parallel(n_jobs=n_jobs)(
            delayed(type(df).apply)(df[s], func, **kwargs)
            for s in gen_even_slices(_num_samples(df), effective_n_jobs(n_jobs)))
        return pd.concat(ret)

Usage: result = parallel_apply(my_dataframe, my_func)

Yaroslav
1

Instead of

df["new"] = df["old"].map(fun)

do

from joblib import Parallel, delayed
df["new"] = Parallel(n_jobs=-1, verbose=10)(delayed(fun)(i) for i in df["old"])

To me this is a slight improvement over

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
    df["new"] = pool.map(fun, df["old"])

as you get a progress indication and automatic batching if the jobs are very small.

0-_-0
0

Since the question was "How can you use all your cores to run apply on a dataframe in parallel?", the answer can also be Modin. You can run all cores in parallel, though in my test the wall-clock time was actually worse.

See https://github.com/modin-project/modin. It runs on top of Dask or Ray. They say "Modin is a DataFrame designed for datasets from 1MB to 1TB+." I tried pip3 install "modin[ray]". Modin vs. pandas was 12 sec on six cores vs. 6 sec.
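A minimal sketch of the drop-in usage (only the import changes; the column and function are just examples):

import modin.pandas as pd  # instead of: import pandas as pd

df = pd.DataFrame({'col1': range(1_000_000)})
df['out'] = df['col1'].apply(lambda x: x ** 2)  # Modin distributes this across cores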

jaromrax
0

If you need to do something based on the column name inside the function, be aware that the .apply function may give you some trouble. In my case I needed to change the column type using the astype() function based on the column name. This is probably not the most efficient way of doing it, but it serves the purpose and keeps the column names the same as in the original.

import multiprocessing as mp
import pandas as pd

def f(df):
    """ the function that you want to apply to each column """
    column_name = df.columns[0] # this is the same as the original column name
    # do something what you need to do to that column
    return df

# Here I just make a list of all the columns. If you don't use .to_frame() 
# it will pass series type instead of a dataframe

dfs = [df[column].to_frame() for column in df.columns]
with mp.Pool(mp.cpu_count()) as pool:
    processed_df = pd.concat(pool.map(f, dfs), axis=1)
Ehsan Fathi