118

I have a large dataframe (several million rows).

I want to be able to do a groupby operation on it, but just grouping by arbitrary consecutive (preferably equal-sized) subsets of rows, rather than using any particular property of the individual rows to decide which group they go to.

The use case: I want to apply a function to each row via a parallel map in IPython. It doesn't matter which rows go to which back-end engine, as the function calculates a result based on one row at a time. (Conceptually at least; in reality it's vectorized.)

I've come up with something like this:

import numpy as np

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)

# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]

# Process chunks in parallel
results = dview.map_sync(my_function, groups)

But this seems very long-winded, and doesn't guarantee equal-sized chunks, especially if the index is sparse, non-integer, or otherwise irregular.

Any suggestions for a better way?

Thanks!

Andrew Clegg

8 Answers

149

Use numpy's array_split():

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
    assert len(chunk) == len(data) / 5, "this assert may fail for the last chunk(s) if the data length isn't divisible by 5"
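Note that array_split takes the number of chunks, not a chunk size. If you want a (roughly) fixed chunk size instead, you can derive the chunk count first; a minimal sketch (the chunk_size of 3 is just illustrative):

import math

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(10, 3))

chunk_size = 3
n_chunks = max(1, math.ceil(len(data) / chunk_size))  # ceiling division
for chunk in np.array_split(data, n_chunks):
    print(len(chunk))  # chunk lengths differ by at most one row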
Ivelin
  • This is the most elegant method. Just a simple built-in function call, should be the accepted answer. – DaSarfyCode Aug 25 '18 at 10:45
  • That assertion won't be true when the dataframe's length isn't divisible by the number of chunks, but this will otherwise behave as expected -- the last few dataframes will all be one row shorter than the first ones. – BallpointBen Jan 02 '19 at 19:46
  • This is about 5 to 10x slower than alternatives, e.g. using groupby as suggested, but on a `np.arange` rather than the index. – Herbert Mar 16 '22 at 09:59
  • Note: the question is about an arbitrary number of chunks, while np.array_split is about a specific number of chunks irrespective of the batch/chunk size. Refer to the np.array_split documentation - https://numpy.org/doc/stable/reference/generated/numpy.array_split.html – Senthilkumar Gopal Aug 22 '22 at 21:44
  • @SenthilkumarGopal you can get one knowing the length of the dataframe and the other. `chunk_size = len(data) // num_chunks` – Pranav Hosangadi Apr 15 '23 at 05:50
  • @PranavHosangadi Agreed. But isn't the question about splitting the dataframe into a specific chunk_size and then generating an arbitrary number of chunks, whereas np.array_split takes in the number of chunks, not the chunk_size? – Senthilkumar Gopal Apr 18 '23 at 20:37
63

I'm not sure if this is exactly what you want, but I found these grouper functions on another SO thread fairly useful for feeding a multiprocessing pool.

Here's a short example from that thread, which might do something like what you want:

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(14, 4), columns=['a', 'b', 'c', 'd'])

def chunker(seq, size):
    # Yield successive slices of `seq`, each `size` rows long (the last may be shorter)
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

for i in chunker(df, 5):
    print(i)

Which gives you something like this:

          a         b         c         d
0  0.860574  0.059326  0.339192  0.786399
1  0.029196  0.395613  0.524240  0.380265
2  0.235759  0.164282  0.350042  0.877004
3  0.545394  0.881960  0.994079  0.721279
4  0.584504  0.648308  0.655147  0.511390
          a         b         c         d
5  0.276160  0.982803  0.451825  0.845363
6  0.728453  0.246870  0.515770  0.343479
7  0.971947  0.278430  0.006910  0.888512
8  0.044888  0.875791  0.842361  0.890675
9  0.200563  0.246080  0.333202  0.574488
           a         b         c         d
10  0.971125  0.106790  0.274001  0.960579
11  0.722224  0.575325  0.465267  0.258976
12  0.574039  0.258625  0.469209  0.886768
13  0.915423  0.713076  0.073338  0.622967

I hope that helps.

EDIT

In this case, I used this function with a pool of processes in (approximately) this manner:

from multiprocessing import Pool

nprocs = 4

pool = Pool(nprocs)

for chunk in chunker(df, nprocs):
    data = pool.map(myfunction, chunk)
    data.domorestuff()

I assume this should be very similar to using the IPython distributed machinery, but I haven't tried it.
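For reference, here is a self-contained sketch along the same lines using multiprocessing directly; the chunker is the one above, while process_chunk and the chunk size of 5 are purely illustrative:

import numpy as np
import pandas as pd
from multiprocessing import Pool

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

def process_chunk(chunk):
    # Illustrative per-chunk work: sum each column of the chunk
    return chunk.sum()

if __name__ == '__main__':
    df = pd.DataFrame(np.random.rand(14, 4), columns=['a', 'b', 'c', 'd'])
    with Pool(4) as pool:
        # One task per chunk of the DataFrame
        results = pool.map(process_chunk, list(chunker(df, 5)))
    print(pd.concat(results, axis=1))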

Ryan
  • That'd certainly do the trick. I'm still kinda holding out for some neat groupby one-liner, but if nothing like that materializes, you get the prize :-) – Andrew Clegg Sep 06 '14 at 17:21
59

In practice, you can't guarantee equal-sized chunks. The number of rows (N) might be prime, in which case you could only get equal-sized chunks at 1 or N. Because of this, real-world chunking typically uses a fixed size and allows for a smaller chunk at the end. I tend to pass an array to groupby. Starting from:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
    0         1         2         3         4
0   0  0.746300  0.346277  0.220362  0.172680
0   1  0.657324  0.687169  0.384196  0.214118
0   2  0.016062  0.858784  0.236364  0.963389
[...]
0  13  0.510273  0.051608  0.230402  0.756921
0  14  0.950544  0.576539  0.642602  0.907850

[15 rows x 5 columns]

where I've deliberately made the index uninformative by setting it to 0, we simply decide on our chunk size (here 10) and integer-divide an array of row positions by it:

>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
...     print(k,g)
...     
0    0         1         2         3         4
0  0  0.746300  0.346277  0.220362  0.172680
0  1  0.657324  0.687169  0.384196  0.214118
0  2  0.016062  0.858784  0.236364  0.963389
[...]
0  8  0.241049  0.246149  0.241935  0.563428
0  9  0.493819  0.918858  0.193236  0.266257

[10 rows x 5 columns]
1     0         1         2         3         4
0  10  0.037693  0.370789  0.369117  0.401041
0  11  0.721843  0.862295  0.671733  0.605006
[...]
0  14  0.950544  0.576539  0.642602  0.907850

[5 rows x 5 columns]

Methods based on slicing the DataFrame can fail when the index isn't compatible with that, although you can always use .iloc[a:b] to ignore the index values and access data by position.
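If you want a fixed number of roughly equal-sized groups (say, one per engine) instead of a fixed chunk size, a minimal variation on the same idea (the group count of 10 is illustrative, and it assumes len(df) >= n_groups):

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(15, 5), index=[0] * 15)

n_groups = 10
# Positional key in the range 0..n_groups-1; group sizes differ by at most one row
keys = np.arange(len(df)) * n_groups // len(df)
groups = [g for _, g in df.groupby(keys)]
assert len(groups) == n_groups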

DSM
  • This was what I had in mind! Well technically "df.groupby(np.arange(len(df)) // (len(df) / 10))" to get a fixed number of groups (1 per core) instead of fixed size. For some reason it hadn't occurred to me that the grouping key need not actually be related to the index at all... – Andrew Clegg Sep 06 '14 at 18:01
  • It is worth mentioning that for efficiency it is probably better to read the original file using an "iterator" (https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html) and a "chunksize" so that the read_csv function does the reading and each fragment can be passed to a separate process as described by @Ryan – Andrei Sura Nov 08 '17 at 00:01
14

A chunk generator function for iterating over pandas DataFrames and Series

A generator version of the chunk function is presented below. Moreover, this version works with a custom index on the pd.DataFrame or pd.Series (e.g. a float-type index):

    import numpy as np
    import pandas as pd

    df_sz = 14

    df = pd.DataFrame(np.random.rand(df_sz,4), 
                      index=np.linspace(0., 10., num=df_sz),
                      columns=['a', 'b', 'c', 'd']
                     )

    def chunker(seq, size):
        for pos in range(0, len(seq), size):
            yield seq.iloc[pos:pos + size] 

    chunk_size = 6
    for i in chunker(df, chunk_size):
        print(i)

    chnk = chunker(df, chunk_size)
    print('\n', chnk)
    print(next(chnk))
    print(next(chnk))
    print(next(chnk))

The output is

                 a         b         c         d
0.000000  0.560627  0.665897  0.683055  0.611884
0.769231  0.241871  0.357080  0.841945  0.340778
1.538462  0.065009  0.234621  0.250644  0.552410
2.307692  0.431394  0.235463  0.755084  0.114852
3.076923  0.173748  0.189739  0.148856  0.031171
3.846154  0.772352  0.697762  0.557806  0.254476
                 a         b         c         d
4.615385  0.901200  0.977844  0.250316  0.957408
5.384615  0.400939  0.520841  0.863015  0.177043
6.153846  0.356927  0.344220  0.863067  0.400573
6.923077  0.375417  0.156420  0.897889  0.810083
7.692308  0.666371  0.152800  0.482446  0.955556
8.461538  0.242711  0.421591  0.005223  0.200596
                  a         b         c         d
9.230769   0.735748  0.402639  0.527825  0.595952
10.000000  0.420209  0.365231  0.966829  0.514409

<generator object chunker at 0x7f503c9d0ba0>

First "next()":
                 a         b         c         d
0.000000  0.560627  0.665897  0.683055  0.611884
0.769231  0.241871  0.357080  0.841945  0.340778
1.538462  0.065009  0.234621  0.250644  0.552410
2.307692  0.431394  0.235463  0.755084  0.114852
3.076923  0.173748  0.189739  0.148856  0.031171
3.846154  0.772352  0.697762  0.557806  0.254476

Second "next()":
                 a         b         c         d
4.615385  0.901200  0.977844  0.250316  0.957408
5.384615  0.400939  0.520841  0.863015  0.177043
6.153846  0.356927  0.344220  0.863067  0.400573
6.923077  0.375417  0.156420  0.897889  0.810083
7.692308  0.666371  0.152800  0.482446  0.955556
8.461538  0.242711  0.421591  0.005223  0.200596

Third "next()":
                  a         b         c         d
9.230769   0.735748  0.402639  0.527825  0.595952
10.000000  0.420209  0.365231  0.966829  0.514409
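For the parallel-map use case from the question, the generator's chunks can then simply be handed to the mapper (assuming the `dview` and `my_function` from the question):

groups = list(chunker(df, chunk_size))
results = dview.map_sync(my_function, groups)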
Andrei Krivoshei
13

A sign of a good ecosystem is having many choices, so I'll add this option from Anaconda's Blaze, which really uses Odo under the hood:

import blaze as bz
import pandas as pd

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
    # Do stuff with the chunked dataframe (the loop needs a real body to be valid Python)
    print(chunk)
Miles
5
import pandas as pd

def batch(iterable, batch_number=10):
    """
    Split an iterable into mini-batches of length batch_number.
    Supports batching a pandas DataFrame as well as plain sequences.
    usage:
        for i in batch([1, 2, 3, 4, 5], batch_number=2):
            print(i)

        for idx, mini_data in enumerate(batch(df, batch_number=10)):
            print(idx)
            print(mini_data)
    """
    length = len(iterable)

    for idx in range(0, length, batch_number):
        if isinstance(iterable, pd.DataFrame):
            # A DataFrame may have duplicate or non-integer index labels,
            # so slice by position with .iloc rather than by label
            yield iterable.iloc[idx:min(idx + batch_number, length)]
        else:
            yield iterable[idx:min(idx + batch_number, length)]
wllbll
3

Your suggestion to use groupby is quite good, but you should use np.arange(len(dataframe)) // batch_size rather than dataframe.index, since the index can be non-integer and non-consecutive.

I've run some benchmarks on the answers given. The top-voted one is horribly slow. Please consider using the accepted, groupby-based solution:

dataframe.groupby(np.arange(len(dataframe)) // batch_size)

[benchmark results plot]

Benchmark code:

import numpy as np
import pandas as pd
import time
from tqdm.auto import tqdm

#@markdown # Create a properly funky `pd.DataFrame`
data = pd.DataFrame([
  {
      'x': np.random.randint(23515243),
      'y': 364274*np.random.rand()-134562,
      'z': ''.join(np.random.choice(list('`1234567890-=qwertyuiop[]\asdfghjkl;\'zxcvbnm,./~!@#$%^&*()_+QWERTYUIOP{}|ASDFGHJKL:"ZXCVBNM<>?'), np.random.randint(54,89), replace=True)),
  }
  for _ in tqdm(range(22378))
])
data.index = ['a'] * len(data)

data = pd.concat([data] * 100)

batch_size = 64

times = []

t0 = time.time()
for chunk in np.array_split(data, (len(data) + batch_size - 1) // batch_size):
  pass
times.append({'method': 'np.array_split', 'time': -t0 + time.time()})

t0 = time.time()
for _, chunk in data.groupby(np.arange(len(data)) // batch_size):
  pass
times.append({'method': 'groupby', 'time': -t0 + time.time()})

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
  
t0 = time.time()
for chunk in chunker(data, batch_size):
  pass
times.append({'method': '[]-syntax', 'time': -t0 + time.time()})

# t0 = time.time()
# for chunk in bz.odo(data, target=bz.chunks(pd.DataFrame), chunksize=batch_size):
#   pass
# times.append({'method': 'bz.odo', 'time': -t0 + time.time()})


def chunker(seq, size):
    for pos in range(0, len(seq), size):
        yield seq.iloc[pos:pos + size] 

t0 = time.time()
for i in chunker(data, batch_size):
    pass
times.append({'method': '.iloc[]-syntax', 'time': -t0 + time.time()})

pd.DataFrame(times)
Herbert
3

Another approach:

# .. load df ..

CHUNK_SIZE = 100000

# Ceiling division, so a length that divides evenly doesn't produce a trailing empty chunk
n_chunks = (len(df) + CHUNK_SIZE - 1) // CHUNK_SIZE

for chunk_num in range(n_chunks):
    start_index = chunk_num * CHUNK_SIZE
    end_index = min(start_index + CHUNK_SIZE, len(df))
    chunk = df.iloc[start_index:end_index]  # positional slicing, works for any index

    # .. do calculation on chunk here ..
Pithikos