7

I want to apply a function to all pandas columns in parallel. For example, I want to do this in parallel:

def my_sum(x, a):
    return x + a


df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})
df.apply(lambda x: my_sum(x, 2), axis=0)

I know there is a swifter package, but it doesn't support axis=0 in apply:

NotImplementedError: Swifter cannot perform axis=0 applies on large datasets. Dask currently does not have an axis=0 apply implemented. More details at https://github.com/jmcarpenter2/swifter/issues/10

Dask also doesn't support this for axis=0 (according to the swifter documentation).

I have googled several sources but couldn't find an easy solution.

Can't believe this is so complicated in pandas.

Mislav
  • maybe take a look at [`Pandarallel`](https://github.com/nalepae/pandarallel/tree/v1.4.6) – anky Mar 19 '20 at 14:17
  • transpose the data and pass `axis=1`? (a sketch of this workaround follows these comments) – Quang Hoang Mar 19 '20 at 14:19
  • @QuangHoang, I thought about that, but I have lots of rows (> 4 million), not sure if this is the recommended way. – Mislav Mar 19 '20 at 14:24
  • @anky_91, I have just tried Pandarallel. It gets stuck forever and never ends. Maybe there is some problem because I am on Windows. I can't believe it. In R there are at least 3 simple solutions to this. – Mislav Mar 19 '20 at 14:48
  • https://github.com/jmcarpenter2/swifter/pull/98 - maybe this is what you're looking for – Skarlett Mar 21 '20 at 15:35
  • @Skarlett, I don't need the applymap function, but apply. That is, I have to apply the function to the whole column, not to every element of the column. – Mislav Mar 21 '20 at 16:31
  • When asking a question here, please describe the problem you are trying to solve, rather than asking about the method you think is the solution to that problem. We'll tell you if a parallel apply is really the best solution to your problem. Bashing a framework for a lack of support of non-idiomatic use cases won't help either. More details about the function you are trying to apply would help. – cs95 Mar 22 '20 at 18:29
  • My original problem is the same as in the post above. The only difference is that the function is more complicated. – Mislav Mar 23 '20 at 13:30
  • @cs95 why would you consider a column-wise apply to be non-idiomatic? Especially as `axis=0` is the default – Arco Bast Mar 23 '20 at 15:57
  • https://stackoverflow.com/questions/54432583/when-should-i-ever-want-to-use-pandas-apply-in-my-code/54432584#54432584 what function are you trying to apply is what I was getting at. – cs95 Mar 23 '20 at 16:47
  • Did you find a solution yet? – Arco Bast Mar 27 '20 at 21:19
  • Try [`mapply`](https://pypi.org/project/mapply/) with `chunk_size=1` (author here). For axis=0, each column will need to go to a worker without splitting the column into chunks. So if you have multiple columns and multiple physical CPUs, `mapply` will send each column to a separate worker, and combine the results afterwards to yield the same result as a regular apply would. – ddelange Nov 02 '20 at 10:14
  • this task is trivially solved using the parallel-pandas library – padu Nov 21 '22 at 17:02
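
For reference, here is a minimal sketch of the transpose workaround Quang Hoang suggests above, assuming the swifter package is installed (swifter parallelizes axis=1 applies, so the frame is transposed before and after):

import pandas as pd
import swifter  # importing swifter registers the .swifter accessor on pandas objects

def my_sum(x, a):
    return x + a

df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})

# transpose so columns become rows, run swifter's parallel row-wise apply,
# then transpose back; equivalent to the axis=0 apply in the question
result = df.T.swifter.apply(lambda x: my_sum(x, 2), axis=1).T

As Mislav points out, transposing a frame with millions of rows produces millions of columns, so this sketch only illustrates the idea and may not scale well.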

4 Answers

4

Koalas provides a way to perform computation on a dataframe in parallel. It accepts the same commands as pandas but performs them on an Apache Spark engine in the background.

Note that you do need the parallel infrastructure available in order to use it properly.
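
For the question's example, a minimal sketch could look like this (assuming a working Spark installation, and that Koalas' DataFrame.apply accepts axis=0 the way pandas does):

import databricks.koalas as ks

def my_sum(x, a):
    return x + a

# build the dataframe from the question as a Koalas dataframe and run the
# same column-wise apply; the work is executed by Spark
kdf = ks.DataFrame({'num_legs': [2, 4, 8, 0],
                    'num_wings': [2, 0, 0, 0]})
result = kdf.apply(lambda x: my_sum(x, 2), axis=0)
print(result.to_pandas())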

In their blog post they compare the following chunks of code:

pandas:

import pandas as pd
df = pd.DataFrame({'x': [1, 2], 'y': [3, 4], 'z': [5, 6]})
# Rename columns
df.columns = ['x', 'y', 'z1']
# Do some operations in place
df['x2'] = df.x * df.x

Koalas:

import databricks.koalas as ks
df = ks.DataFrame({'x': [1, 2], 'y': [3, 4], 'z': [5, 6]})
# Rename columns
df.columns = ['x', 'y', 'z1']
# Do some operations in place
df['x2'] = df.x * df.x
gosuto
  • [pandas, PySpark, pyarrow, matplotlib](https://koalas.readthedocs.io/en/latest/getting_started/install.html) – gosuto Mar 23 '20 at 11:52
  • I will install PySpark and pyarrow, try it, and give you feedback – Mislav Mar 23 '20 at 11:54
  • lots of dependencies for such a "simple" task. – Mislav Mar 23 '20 at 12:05
  • ks.from_pandas also freezes my session :(. I don't understand what's happening. – Mislav Mar 23 '20 at 12:11
  • The task is not as simple as it seems. Since Python is not a parallel language by itself, you need some other engine that makes that translation. For Koalas that is Apache Spark. – gosuto Mar 23 '20 at 12:52
1

The real answer is hidden in the comments, so I will add this answer: use mapply.

import pandas as pd
import mapply

mapply.init(n_workers=-1, chunk_size=1)

def my_sum(x, a):
    return x + a

df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})
df.mapply(lambda x: my_sum(x, 2), axis=0)

I tried swifter and pandarallel, but swifter simply didn't work with columns, and pandarallel seemed to duplicate the work on all workers. Only mapply worked.

AKSoo
  • not sure if it was intentional, but you have n_workers set to -1; this didn't work for me and threw very confusing errors until I set n_workers to a positive number (see the sketch after these comments) – krock May 18 '21 at 16:12
  • actually turns out that it works on Linux, but I wasn't able to get it to work on Windows – krock May 18 '21 at 17:16
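
Based on krock's comments, a minimal sketch of the Windows workaround would be to pass an explicit worker count instead of -1 (the value 4 below is just a placeholder; pick your physical CPU count):

import mapply

# explicit positive worker count instead of n_workers=-1, which reportedly
# fails on Windows; 4 is a placeholder value
mapply.init(n_workers=4, chunk_size=1)
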
0

You can use the dask delayed interface to set up a custom workflow:

import pandas as pd
import dask
import distributed

# start local cluster, by default one worker per core
client = distributed.Client() 

@dask.delayed
def my_sum(x, a):
    return x + a

df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})    

# Here, we mimic the apply command. However, we do not
# actually run any computation. Instead, this line of code
# results in a list of delayed objects, which record what
# computation should eventually be performed
delayeds = [my_sum(df[column], 2) for column in df.columns]

# send the list of delayed objects to the cluster, which will 
# start computing the result in parallel. 
# It returns future objects, pointing to the computation while
# it is still running
futures = client.compute(delayeds)

# get all the results, as soon as they are ready. This returns 
# a list of pandas Series objects, each is one column of the 
# output dataframe
computed_columns = client.gather(futures)

# create dataframe out of individual columns
computed_df = pd.concat(computed_columns, axis = 1)

Alternatively, you could use the multiprocessing backend of dask:

import pandas as pd
import dask

@dask.delayed
def my_sum(x, a):
    return x + a

df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})    

# same as above
delayeds = [my_sum(df[column], 2) for column in df.columns]

# run the computation using dask's multiprocessing backend; unpacking
# the list of delayed objects makes dask.compute return a tuple of Series
computed_columns = dask.compute(*delayeds, scheduler = 'processes')

# create dataframe out of individual columns
computed_df = pd.concat(computed_columns, axis = 1)
Arco Bast
  • distributed.Client() freezes my session. Maybe due to this issue: https://github.com/dask/dask/issues/5525 – Mislav Mar 23 '20 at 11:52
  • `distributed.Client()` is a convenience method, which unfortunately causes trouble sometimes. You could either go through the effort to set up a cluster ['the hard way'](https://distributed.dask.org/en/latest/quickstart.html#setup-dask-distributed-the-hard-way) (actually not that hard, you need to run two commands in the terminal; a sketch follows these comments) or use the multiprocessing backend instead (see my edit in two minutes) – Arco Bast Mar 23 '20 at 12:06
  • dask.compute never ends now. I didn't mention: I am on Windows 10, coding in VS Code, and have an AMD Ryzen processor. – Mislav Mar 23 '20 at 12:20
  • @Mislav maybe there is a problem whenever you try to spawn new processes from within your session. can you try setting up the cluster on the terminal and then connect to the scheduler from within your session? Have a look here about how to do it: https://distributed.dask.org/en/latest/quickstart.html#setup-dask-distributed-the-hard-way. – Arco Bast Mar 23 '20 at 13:40
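
For reference, a minimal sketch of the 'hard way' setup mentioned in the comments above, assuming the default scheduler address tcp://127.0.0.1:8786:

# terminal 1: start the scheduler (listens on port 8786 by default)
#   $ dask-scheduler
# terminal 2: start one or more workers, pointing them at the scheduler
#   $ dask-worker tcp://127.0.0.1:8786

import distributed

# connect to the externally started scheduler instead of letting
# distributed.Client() spawn worker processes inside the session
client = distributed.Client("tcp://127.0.0.1:8786")
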
0

In my opinion, this case should be tackled by focusing on how the data is split over the available resources. Dask offers map_partitions, which applies a Python function to each DataFrame partition. Since dask partitions a dataframe by rows, each partition only sees a slice of every column; that is fine here because my_sum is element-wise. Of course, the number of rows per partition that your workstation can deal with depends on the available hardware resources. Here is an example based on the information you provided in your question:

# imports
import dask
from dask import dataframe as dd
import multiprocessing as mp
import numpy as np
import pandas as pd

# range for values to be randomly generated
range_ = {
    "min": 0,
    "max": 100
}

# rows and columns for the fake dataframe
df_shape = (
                int(1e8), # rows
                2 # columns
            )

# some fake data
data_in = pd.DataFrame(np.random.randint(range_["min"], range_["max"], size = df_shape), columns = ["legs", "wings"])

# function to apply: adds some value a to each partition
def my_sum(x, a):
    return x + a
"""
applies my_sum on the partitions rowwise (axis = 0)

number of partitions = cpu_count

the scheduler can be:
"threads": Uses a ThreadPool in the local process
"processes": Uses a ProcessPool to spread work between processes
"single-threaded": Uses a for-loop in the current thread
"""
data_out = dd.from_pandas(data_in, npartitions = mp.cpu_count()).map_partitions(
        lambda df: df.apply(
            my_sum, axis = 0, a = 2
        )
).compute(scheduler = "threads")

# inspection
print(data_in.head(5))
print(data_out.head(5))

This implementation was tested on a randomly generated dataframe with 100,000,000 rows and 2 columns.

Workstation Specs
CPU: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Memory Total: 16698340 kB
OS: Ubuntu 18.04.4 LTS

Chicodelarose