
I have a dataframe with

  • 5 million rows.
  • a column group_id with 500,000 unique values.
  • thousands of other columns named var1, var2, etc. Each of var1, var2, ... contains only 0 and 1.

I would like to group by group_id and sum every other column within each group. To get better performance, I use dask. However, even this simple aggregation is still slow.

The time spent on a dataframe with 10 columns is 6.285385847091675 seconds
The time spent on a dataframe with 100 columns is 64.9060411453247 seconds
The time spent on a dataframe with 200 columns is 150.6109869480133 seconds
The time spent on a dataframe with 300 columns is 235.77087807655334 seconds

My real dataset contains up to 30,000 columns. I have read answers (1 and 2) by @Divakar about using numpy. However, the former thread is about counting and the latter is about summing columns.

Could you please elaborate on some ways to speed up this aggregation?

import numpy as np
import pandas as pd
import os, time
from multiprocessing import dummy
import dask.dataframe as dd

core = os.cpu_count()
P = dummy.Pool(processes = core)

n_docs = 500000
n_rows = n_docs * 10
data = {}

def create_col(i):
    name = 'var' + str(i)
    data[name] = np.random.randint(0, 2, n_rows)

n_cols = 300
P.map(create_col, range(1, n_cols + 1))
df = pd.DataFrame(data, dtype = 'int8')
df.insert(0, 'group_id', np.random.randint(1, n_docs + 1, n_rows))
df = dd.from_pandas(df, npartitions = 3 * core) 

start = time.time()
df.groupby('group_id').sum().compute()
end = time.time()
print('The time spent on a dataframe with {} columns is'.format(n_cols), end - start, 'seconds')
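To make the intended result concrete, here is a tiny sketch of the aggregation on made-up data (the frame `toy` and its values are purely illustrative, not part of the real dataset). Each output row is one group_id, and each cell is the number of ones in that column within the group:

```python
import pandas as pd

# Tiny, made-up frame: three groups, two 0/1 indicator columns.
toy = pd.DataFrame({
    "group_id": [1, 1, 2, 3, 3, 3],
    "var1":     [1, 0, 1, 0, 1, 1],
    "var2":     [0, 0, 1, 1, 1, 0],
})

# One output row per group_id; each cell counts the ones
# of that column among the rows of the group.
toy_sums = toy.groupby("group_id").sum()
print(toy_sums)
```

So rows are summed within a group, column by column; nothing is summed across columns.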
Akira

1 Answer


(I misunderstood the OP in my original answer, so I have rewritten it from scratch.)

I got an improvement by:

  • switching to numpy
  • using the same dtype for group and data (np.int32)
  • using numba in parallel mode
import numba as nb
@nb.njit('int32[:, :](int32[:, :], int_)', parallel=True)
def count_groups2(group_and_data, n_groups):
    n_cols = group_and_data.shape[1] - 1
    counts = np.zeros((n_groups, n_cols), dtype=np.int32)
    for idx in nb.prange(len(group_and_data)):
        row = group_and_data[idx]
        counts[row[0]] += row[1:]
    return counts

df = pd.DataFrame(data, dtype='int32')
group_id = np.random.randint(1, n_docs + 1, n_rows, dtype=np.int32)
df.insert(0, 'group_id', group_id)

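# converting to numpy (line below) is costly;
# it would be faster to work with numpy arrays from the start (no pandas)
group_and_data = df.values
# group ids run from 1 to n_docs, so allocate n_docs + 1 rows (row 0 stays empty)
count_groups2(group_and_data, n_groups=n_docs + 1)
op_method(df)  # the approach from the question, timed for comparison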

Line #      Hits         Time   Per Hit   % Time  Line Contents
    72         1    1439807.0 1439807.0      7.0      group_and_data = df.values
    73         1    1341527.0 1341527.0      6.5      count_groups2(group_and_data, n_groups=500_000)
    74         1   12043334.0 12043334.0     58.5      op_method(df)
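One caveat: the `counts[row[0]] += row[1:]` update inside `nb.prange` may race when two threads hit the same group at the same time (Numba's documentation warns about concurrent writes to the same array elements), so it is worth validating the kernel's output. A minimal NumPy-only sketch for such a cross-check, assuming the data already lives in arrays: sort the rows by group id and sum each contiguous run with `np.add.reduceat`. The helper name `group_sums_sorted` and the toy sizes are made up for illustration, and this is not benchmarked against the Numba version.

```python
import numpy as np

def group_sums_sorted(group_ids, data):
    """Sum the rows of `data` that share a group id (hypothetical helper)."""
    # Stable sort so that rows of the same group become contiguous.
    order = np.argsort(group_ids, kind="stable")
    sorted_ids = group_ids[order]
    sorted_data = data[order]  # note: this makes a full copy of the data
    # Position of the first row of each group in the sorted arrays.
    unique_ids, starts = np.unique(sorted_ids, return_index=True)
    # reduceat sums each slice [starts[i], starts[i + 1]) along axis 0;
    # accumulate in int32 so narrow input dtypes such as int8 cannot overflow.
    sums = np.add.reduceat(sorted_data, starts, axis=0, dtype=np.int32)
    return unique_ids, sums

# Small synthetic input, shaped like the question's data but much smaller.
rng = np.random.default_rng(0)
n_rows, n_groups, n_cols = 1_000, 50, 8
group_ids = rng.integers(1, n_groups + 1, n_rows).astype(np.int32)
data = rng.integers(0, 2, (n_rows, n_cols)).astype(np.int8)

ids, sums = group_sums_sorted(group_ids, data)

# Reference result via an unbuffered, single-threaded scatter-add.
expected = np.zeros((n_groups + 1, n_cols), dtype=np.int32)
np.add.at(expected, group_ids, data)
assert np.array_equal(sums, expected[ids])
```

`ids[i]` is the group id whose column sums are in `sums[i]`. Groups that never occur get no row, unlike the dense `counts` array returned by `count_groups2`.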
dankal444
  • This is awesome!!! In your code, there is a loop `for idx in range(len(group_ids))`. My computer has 28 cores and 56 threads. Is there any way to parallelize `for idx in range(len(group_ids))`? – Akira Dec 11 '21 at 13:15
  • From the line `sums = df_values[:, 1:].sum(axis=1)`, I think you misunderstood my goal :) I mean to sum rows that belong to the same group, not to sum columns. – Akira Dec 11 '21 at 13:21
  • @Akira I edited and added parallelism, but it gives a significant boost only when using numpy arrays instead of dataframes. – dankal444 Dec 11 '21 at 13:54
  • @Akira You mean you count them? Or maybe for each group you want to have sum for each column? – dankal444 Dec 11 '21 at 13:55
  • Exactly, I want to have sum of all rows for each column. – Akira Dec 11 '21 at 14:00
  • @Akira I fixed the answer, take a look – dankal444 Dec 11 '21 at 15:09
  • I'm a bit confused. So you have run 3 versions `op_method(df)`, `faster4(group_and_data)`, and `group_and_data = df.values` and mentioned that *switching to numpy (line below) is costly, it would be faster to work with numpy alone (no pandas)*. I understand that `op_method(df)` is my original code. Can you explain what is the difference between 2 versions `faster4(group_and_data)` and `group_and_data = df.values`? – Akira Dec 11 '21 at 15:34
  • I have run your updated code [here](https://colab.research.google.com/drive/1cdmjnGWgQcpwyzRsbMgT6Bb7h4F9Bt8K?usp=sharing), but it returns an error :(( Could you please include the full code of the fastest version `faster4(group_and_data)`? – Akira Dec 11 '21 at 15:59
  • @Akira sorry, faster4 was just an old name I forgot to change. I guess you already got it working, but I updated the answer anyway. – dankal444 Dec 11 '21 at 20:57
  • May I ask what the number `7.0` is in the line `72 1 1439807.0 1439807.0 7.0 group_and_data = df.values`? – Akira Dec 11 '21 at 21:01
  • @Akira It's the percentage of time this line took within the profiled function. I use `line_profiler`, which reports results in this form: `Line # Hits Time Per Hit % Time Line Contents` – dankal444 Dec 11 '21 at 21:38
  • Applying your code to my real dataset reduced the computation time from 1h 20min 54s to just 7min 22s. My CPU, with 28 cores and 56 threads, is [fully utilized](https://i.stack.imgur.com/hEe6X.png). This is very impressive. Thank you so much again. – Akira Dec 12 '21 at 09:03
  • Glad to hear that. Thank you for letting me know :) – dankal444 Dec 12 '21 at 12:05