79

(This question is related to this one and this one, but those are pre-walking the generator, which is exactly what I want to avoid)

I would like to split a generator in chunks. The requirements are:

  • do not pad the chunks: if the number of remaining elements is less than the chunk size, the last chunk must be smaller.
  • do not walk the generator beforehand: computing the elements is expensive, and it must only be done by the consuming function, not by the chunker
  • which means, of course: do not accumulate in memory (no lists)

I have tried the following code:

def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        yield head(i, size)

# Sample generator: the real data is much more complex, and expensive to compute
els = xrange(7)

for n, chunk in enumerate(chunks(els, 3)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)

And this somehow works:

Chunk   0, value 0
Chunk   0, value 1
Chunk   0, value 2
Chunk   1, value 3
Chunk   1, value 4
Chunk   1, value 5
Chunk   2, value 6
^CTraceback (most recent call last):
  File "xxxx.py", line 15, in <module>
    for el in chunk:
  File "xxxx.py", line 2, in head
    for cnt, el in enumerate(iterable):
KeyboardInterrupt

Buuuut ... it never stops (I have to press ^C) because of the while True. I would like to stop that loop whenever the generator has been consumed, but I do not know how to detect that situation. I have tried raising an Exception:

class NoMoreData(Exception):
    pass

def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break
    if cnt == 0 : raise NoMoreData()

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        try:
            yield head(i, size)
        except NoMoreData:
            break

# Sample generator: the real data is much more complex, and expensive to compute    
els = xrange(7)

for n, chunk in enumerate(chunks(els, 2)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)

But then the exception is only raised in the context of the consumer, which is not what I want (I want to keep the consumer code clean)

Chunk   0, value 0
Chunk   0, value 1
Chunk   0, value 2
Chunk   1, value 3
Chunk   1, value 4
Chunk   1, value 5
Chunk   2, value 6
Traceback (most recent call last):
  File "xxxx.py", line 22, in <module>
    for el in chunk:
  File "xxxx.py", line 9, in head
    if cnt == 0 : raise NoMoreData
__main__.NoMoreData()

How can I detect that the generator is exhausted in the chunks function, without walking it?

blueFast
  • Don't know how to fix it, but that `except` will only catch the exception if it is raised when _creating_ `head`, not when iterating it. – tobias_k Jul 02 '14 at 09:08
  • @tobias_k: sure, I understand that. I am looking for a fix for that ... – blueFast Jul 02 '14 at 09:09
  • Would it be okay to peek at the first element? You could try to `next` the first element, then raise an exception or return the actual chunk iterator. – tobias_k Jul 02 '14 at 09:16
  • @tobias_k: that would be a good compromise, but not sure how to implement that without losing that element ... – blueFast Jul 02 '14 at 09:18
  • Can you clarify what do you mean by "pre-walking the generator"? – pylang Aug 29 '17 at 00:17

12 Answers

96

One way would be to peek at the first element, if any, and then create and return the actual generator.

def head(iterable, max=10):
    first = next(iterable)      # raise exception when depleted
    def head_inner():
        yield first             # yield the extracted first element
        for cnt, el in enumerate(iterable):
            yield el
            if cnt + 1 >= max:  # cnt + 1 to include first
                break
    return head_inner()

Just use this in your chunk generator and catch the StopIteration exception like you did with your custom exception.
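A minimal sketch of how the two pieces might fit together, assuming Python 3. It swaps the manual counter for itertools.islice and renames the parameter to size (both are choices made for this illustration, not part of the code above). On Python 3.7+ the outer generator has to catch StopIteration and return, because an exception of that type escaping a generator body is turned into a RuntimeError:

```python
from itertools import islice

def head(iterator, size=10):
    """Return a lazy iterator over the next `size` items.

    Raises StopIteration right away if the source is already depleted.
    """
    first = next(iterator)  # peek: fails immediately when nothing is left
    def head_inner():
        yield first  # hand back the peeked element first
        for el in islice(iterator, size - 1):
            yield el
    return head_inner()

def chunks(iterable, size=10):
    iterator = iter(iterable)
    while True:
        try:
            chunk = head(iterator, size)
        except StopIteration:
            return  # finish the outer generator cleanly
        yield chunk

# Each chunk is consumed before the next one is requested.
result = [list(chunk) for chunk in chunks(range(7), 3)]
print(result)  # [[0, 1, 2], [3, 4, 5], [6]]
```

The consumer stays free of exception handling, and only one element per chunk is computed ahead of time.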


Update: Here's another version, using itertools.islice to replace most of the head function, and a for loop. This simple for loop in fact does exactly the same thing as that unwieldy while-try-next-except-break construct in the original code, so the result is much more readable.

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:    # stops when iterator is depleted
        def chunk():          # construct generator for next chunk
            yield first       # yield element from for loop
            for more in islice(iterator, size - 1):
                yield more    # yield more elements from the iterator
        yield chunk()         # in outer generator, yield next chunk

And we can get even shorter than that, using itertools.chain to replace the inner generator:

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))
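
One caveat is worth illustrating: all chunks share the same underlying iterator, so each chunk has to be consumed before the next one is requested. The sketch below (Python 3, variable names chosen for this illustration) contrasts the two usage patterns:

```python
from itertools import chain, islice

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))

# Intended use: consume each chunk before asking for the next one.
consumed_in_order = [list(c) for c in chunks(range(7), 3)]
print(consumed_in_order)  # [[0, 1, 2], [3, 4, 5], [6]]

# Pitfall: collecting the chunk objects first advances the shared iterator
# by one element per chunk, so every chunk ends up holding a single item.
stored_first = [list(c) for c in list(chunks(range(7), 3))]
print(stored_first)  # [[0], [1], [2], [3], [4], [5], [6]]
```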
tobias_k
  • That works! Thanks! I have the full example: if you want I can edit your answer to include it and have it as reference. – blueFast Jul 02 '14 at 09:37
  • 1
    Using the initial example of `for n, chunk in enumerate(chunks(xrange(0,100), 10)):`, this does not appear to work for me. If I `print(n)` in that for loop, I end up with 100 chunks. Am I misunderstanding something here? – kadrach Mar 20 '17 at 02:59
  • 1
    @kadrach That is because you are not consuming the chunks before generating the next one. The code only works if each chunk is consumed before the next one is generated; otherwise, how should chunk n know what its first element is, without actually advancing the iterator until the end of chunk n-1? – tobias_k Mar 20 '17 at 09:07
  • 1
    I think it would be nice to behave like `itertools.groupby`, i.e., not change chunks/groups just because they're not consumed. [Possible implementation](https://tio.run/##nVAxbsMwDNz1Cm6RAhVokKUI4C/kA0EG16ETojZpUArQ9POuJKv2Xi4UeMfjnaZXfAgfPyad515lBIqoUWQIQOMkGqF7tMQeKAzUoTE37NPoyV/BZmr7OaCHQD/YHN7dyUCqMo@i0JTnynMF7RPQk4YIxCt1WcylmJCmnrN/@HIC3uDgVuaLcLgt9uylKF59Wd8ojN/RVqmMbDIezsLoajNmUuJod51weI7E99POmey0RM1Oa2Zt@Y42RfVwrHGX1X0hbEos6ev@rVbF5vkX). – no comment Sep 14 '21 at 11:21
17

Another way to create groups/chunks and not prewalk the generator is using itertools.groupby on a key function that uses an itertools.count object. Since the count object is independent of the iterable, the chunks can be easily generated without any knowledge of what the iterable holds.

Every iteration of groupby calls the next method of the count object and generates a group/chunk key (followed by items in the chunk) by doing an integer division of the current count value by the size of the chunk.

from itertools import groupby, count

def chunks(iterable, size=10):
    c = count()
    for _, g in groupby(iterable, lambda _: next(c)//size):
        yield g

Each group/chunk g yielded by the generator function is an iterator. However, since groupby uses a shared iterator for all groups, the group iterators cannot be stored in a list or any other container: each group iterator should be consumed before the next one is requested.
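
A short usage sketch (Python 3), consuming each group before moving on to the next, might look like this:

```python
from itertools import groupby, count

def chunks(iterable, size=10):
    c = count()
    # The key ignores the element itself and only counts calls:
    # elements 0..size-1 get key 0, the next `size` elements get key 1, ...
    for _, g in groupby(iterable, lambda _: next(c) // size):
        yield g

# Turn each group into a list right away, before groupby advances.
result = [list(g) for g in chunks(range(7), 3)]
print(result)  # [[0, 1, 2], [3, 4, 5], [6]]
```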

Moses Koledoye
  • This works 7 to 8 times slower than itertools.chain method published by tobias_k above – Muposat Jan 25 '18 at 20:12
  • 1
    This method works. Tobias_k's method does not produce the correct result. Create a generator for integers 0 to 21 with a batch size of ten. Tobias_k's method as written does not produce three groups. This method does. I don't care that it's slow, I care that it works. – VanBantam Sep 24 '21 at 17:53
  • Improvement: Replace `c = count()` and `next(c)//size` with `c = cycle((False,) * size + (True,) * size)` and `partial(next, c)` (changing import to get `cycle` instead of `count` and adding `from functools import partial`). Avoids per-element division and per-element creation of new `int` objects (at minor expense of a `tuple` of constants of size `2 * size` that `cycle` iterates over), saving a little over 35% on runtime. Using `map(operator.itemgetter(1), ...)` would remove the Python code per-element entirely, but testing on 3.9, the benefit there is less than 5%; not worth the trouble. – ShadowRanger Dec 02 '21 at 15:29
12

Best solution going forward (requires Python 3.12 or higher, released in October 2023), requiring no hand-rolled code at all, and being fastest in all scenarios:

There's a built-in for this as of Python 3.12, itertools.batched. The arguments are the reverse of the chunker recipes below, but it behaves the same way (batching into tuples of length n, with the final batch potentially being incomplete):

from itertools import batched

for batch in batched('ABCDEFG', 3):
    print(batch)

will output:

('A', 'B', 'C')
('D', 'E', 'F')
('G',)

I don't have timings for it yet, but given it's implemented at the C layer and the implementation takes advantage of performance optimizations not available at the Python layer, it should out-perform any solution implemented at the Python layer handily. In particular:

  • Unlike the tupled islices solution at the bottom of this answer, it always preallocates a tuple of size n up-front for each batch and populates it directly (where tupleing islices involves building the tuple up an element at a time, costing time on every batch; it uses amortized growth, but it can still involve a few reallocations per batch)
  • Unlike the zip_longest solution immediately below (which, thanks to zip_longest, does do presized tuples, and is likely the source of its better performance in most cases):
    1. (Every batch waste) It only has to look up the .__next__ of the object once per batch, not n times per batch (a small cost, given the C layer lookup, but it's paid for every batch), and
    2. (One-time waste tied to size of final incomplete batch) It doesn't need to create a sentinel for the fillvalue, nor locate it after the fact for the final batch (which requires at least O(log n) binary search, and pre-3.10, O(n) linear search, plus O(n) slicing work, for the final batch); batched is counting the elements as it pulls them as a side-effect of tracking the index to insert them into within the tuple, so when the iterator is exhausted, it stops immediately, and can directly realloc the tuple down to match the number of elements it was able to pull (avoiding a new tuple, and often avoiding any copying at all).
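
For code that has to run on both older and newer interpreters, one possible approach is to fall back to a pure-Python stand-in when the import fails. The fallback below is only a sketch written for this illustration: it mirrors the argument order and tuple output of batched, but has none of the C-level performance advantages described above:

```python
from itertools import islice

try:
    from itertools import batched  # available on Python 3.12+
except ImportError:
    def batched(iterable, n):
        """Hypothetical stand-in: yield tuples of up to n items."""
        if n < 1:
            raise ValueError("n must be at least one")
        iterator = iter(iterable)
        while True:
            batch = tuple(islice(iterator, n))
            if not batch:
                return  # source exhausted
            yield batch

batches = list(batched('ABCDEFG', 3))
print(batches)  # [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]
```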

Fastest available solution prior to 3.12 for small-medium n and/or large numbers of batches (and for 3.10-3.11, it's not meaningfully slower even in pathological cases):

When the chunk size is typically small, the fastest solution is this one, adapted from rhettg's answer:

from itertools import takewhile, zip_longest

def chunker(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'),  ('G',)'''
    fillvalue = object()  # Anonymous sentinel object that can't possibly appear in input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x
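
The key trick is that args holds n references to one and the same iterator, so zip_longest pulls n consecutive elements for every output tuple. A small sketch to make that visible (it uses None as the fill value purely for display; the chunker above uses an anonymous object() so that no real input value can collide with it):

```python
from itertools import zip_longest

it = iter('ABCDEFG')
args = (it,) * 3  # three references to the *same* iterator, not three copies
shared = args[0] is args[1] is args[2]

# Each output tuple takes one item per "column", i.e. three consecutive items.
rows = list(zip_longest(*args, fillvalue=None))
print(shared)  # True
print(rows)    # [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', None, None)]
```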

If performance is critical, especially when chunk sizes are large and you're doing this a lot, and you can rely on 3.10+ (which is when bisect added support for a key argument), you can improve the above a bit by replacing O(n) takewhile with O(log n) bisection, adding from bisect import bisect to the imports, and changing:

yield tuple(takewhile(lambda v: v is not fillvalue, x))

to:

yield x[:bisect(x, False, key=lambda v: v is fillvalue)]  # 3.10+ only!

Older answer (still very fast, thanks to pushing all the work to the C layer, but loses to the zip_longest-based solution above by a little in all but pathological cases [involving small numbers of huge batch sizes], losing by roughly a factor of 2 in common cases):

By using purely C-level builtins (in CPython), no Python byte code is needed to produce each chunk (unless the underlying generator is implemented in Python) which has a huge performance benefit. It does walk each chunk before returning it, but it doesn't do any pre-walking beyond the chunk it's about to return:

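# Only needed on Py2, to get iterator-based map; Py3's is already iterator-based
try:
    from future_builtins import map
except ImportError:
    pass  # Py3: future_builtins does not exist and the built-in map is already lazy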

from itertools import islice, repeat, starmap, takewhile

# operator.truth is *significantly* faster than bool for the case of
# exactly one positional argument prior to 3.10; in 3.10+, you can
# just use bool (which is trivially faster than truth)
from operator import truth

def chunker(n, iterable):  # n is size of each chunk; last chunk may be smaller
    return takewhile(truth, map(tuple, starmap(islice, repeat((iter(iterable), n)))))

Since that's a bit dense, the spread-out version for illustration:

def chunker(n, iterable):
    iterable = iter(iterable)
    while True:
        x = tuple(islice(iterable, n))
        if not x:
            return
        yield x

Wrapping a call to chunker in enumerate would let you number the chunks if it's needed.
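
As a quick sanity check (a Python 3 sketch; the name chunker_dense is made up here just to keep both versions side by side), the dense and spread-out versions can be compared, and enumerate can be used to number the chunks:

```python
from itertools import islice, repeat, starmap, takewhile
from operator import truth

def chunker_dense(n, iterable):
    # repeat(...) feeds the same (iterator, n) pair to islice forever;
    # takewhile stops at the first empty tuple.
    return takewhile(truth, map(tuple, starmap(islice, repeat((iter(iterable), n)))))

def chunker(n, iterable):
    iterator = iter(iterable)
    while True:
        x = tuple(islice(iterator, n))
        if not x:
            return
        yield x

numbered = list(enumerate(chunker(3, range(7))))
same = list(chunker_dense(3, range(7))) == list(chunker(3, range(7)))
print(numbered)  # [(0, (0, 1, 2)), (1, (3, 4, 5)), (2, (6,))]
print(same)      # True
```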

ShadowRanger
  • Another ending, maybe good for large n: `yield x[:bisect(x, False, key=lambda v: v is fillvalue)]` – Kelly Bundy May 16 '23 at 14:31
  • Or at the expense of a few extra lines for a generator *function* that breaks early (instead of the generator *expression* that doesn't): `yield tuple(unfill())` – Kelly Bundy May 16 '23 at 14:47
  • Btw how did you measure how fast they are? Do you have a benchmark script you could share? – Kelly Bundy May 16 '23 at 14:47
  • @KellyBundy: Yeah, `bisect` is an option, but in my experience it rarely helps much unless `n` is *huge*, and over-optimizing the terminal case doesn't seem worth it, especially when the test is so cheap, the likely batch sizes smaller, and the terminal case unlikely to dominate. Back when I first wrote this, `bisect` didn't support `key` arguments (that wasn't added until 3.10, which released only a couple months before the 20212 updated version of the answer). Frankly, I can't really justify anything beyond `yield tuple(v for v in x if v is not fillvalue)` unless you know perf depends on it. – ShadowRanger May 16 '23 at 17:43
  • @KellyBundy: I don't remember how I tested back then (I think it was `timeit` with the dataset from [rhettg's post](https://stackoverflow.com/a/10791887/364696)), but I just reconfirmed the timings on 3.11.1 with IPython `timeit` magic: `%%timeit from collections import deque; consume = deque(maxlen=0).extend; tochunk = (0,)*1000` as the setup line, with the test being `consume(chunker(3, tochunk))` (where the `3` is to test small chunk sizes, replacing with `31` for medium chunk sizes, or `334` to test pathological cases with huge chunks where the last chunk is mostly filled, but not quite). – ShadowRanger May 16 '23 at 17:50
  • On 3.11.1: For the pathologically long case, the one-liner at the C layer wins ( ~8 µs vs. the more verbose `zip_longest` at ~21 µs). Your proposed `bisect` optimization of the `zip_longest` still loses to the one-liner, in that case, but by less (takes ~10 µs). For every other scenario, the `zip_longest` solution wins handily (with `bisect` typically saving a titch under ~1 µs over `takewhile`). The `bisect` wins pretty reliably, so I'll edit it in (with a 3.10+-only caveat). – ShadowRanger May 16 '23 at 17:56
  • I posted a few somewhat [faster variations](https://stackoverflow.com/a/76282575/1672429) of the `zip_longest` solution. – Stefan Pochmann May 18 '23 at 16:10
6

more-itertools provides chunked and ichunked, which can achieve this goal; the package is mentioned on the Python 3 itertools documentation page.

chunked and ichunked example

duyue
3

How about using itertools.islice:

import itertools

els = iter(xrange(7))

print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))

Which gives:

[0, 1]
[2, 3]
[4, 5]
[6]

Chunker implementation with some tests:

import itertools
from typing import Iterable


def chunker(iterable: Iterable, size: int) -> Iterable[list]:
    iterable = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterable, size))
        if chunk:
            yield chunk
        else:
            break



assert list(chunker(range(10), 3)) == [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
assert list(chunker([], 3)) == []
warvariuc
3

Started to realize the usefulness of this scenario when crafting a solution to DB insertion of 500k+ rows at higher speed.

A generator processes the data from the source and yields it line by line; another generator then groups the output into chunks and yields it chunk by chunk. The second generator is only aware of the chunk size and nothing more.

Below is a sample to highlight the concept:

#!/usr/bin/python

def firstn_gen(n):
    num = 0
    while num < n:
        yield num
        num += 1

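def chunk_gen(some_gen, chunk_size=7):
    res_chunk = []
    for item in some_gen:
        res_chunk.append(item)
        if len(res_chunk) == chunk_size:
            yield res_chunk
            res_chunk = []  # start a fresh list so chunks already yielded stay intact
    if res_chunk:  # yield the final partial chunk, but never an empty one
        yield res_chunk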


if __name__ == '__main__':
    for a_chunk in chunk_gen(firstn_gen(33)):
        print(a_chunk)

Tested in Python 2.7.12:

[0, 1, 2, 3, 4, 5, 6]
[7, 8, 9, 10, 11, 12, 13]
[14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27]
[28, 29, 30, 31, 32]
Down the Stream
2

I had this same issue, but found a simpler solution than those mentioned here:

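from itertools import chain, islice

def chunker(iterable, chunk_size):
    els = iter(iterable)
    while True:
        try:
            next_el = next(els)
        except StopIteration:
            return  # an uncaught StopIteration in a generator is a RuntimeError on Python 3.7+
        yield chain([next_el], islice(els, chunk_size - 1))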

for i, chunk in enumerate(chunker(range(11), 2)):
    for el in chunk:
        print(i, el)

# Prints the following:
0 0
0 1
1 2
1 3
2 4
2 5
3 6
3 7
4 8
4 9
5 10
santon
2
from itertools import islice

def chunk(it, n):
    '''
    Return an iterator over tuples of up to n elements each.

    >>> list(chunk(range(10), 3))
    [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]

    >>> list(chunk(list(range(10)), 3))
    [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
    '''
    def _w(g):
        # Each call pulls the next n elements; an empty tuple means exhaustion
        return lambda: tuple(islice(g, n))
    # Two-argument iter() keeps calling the function until it returns the sentinel ()
    return iter(_w(iter(it)), ())
igiroux
1

Inspired by Moses Koledoye's answer, I tried to make a solution that uses itertools.groupby but doesn't require a divide at each step.

The following function can be used as a key to groupby, and it simply returns a boolean, which flips after a pre-defined number of calls.

def chunks(chunksize=3):

    def flag_gen():
        flag = False
        while True:
            for num in range(chunksize):
                yield flag
            flag = not flag

    flag_iter = flag_gen()

    def flag_func(*args, **kwargs):
        return next(flag_iter)

    return flag_func

Which can be used like this:

from itertools import groupby

my_long_generator = iter("abcdefghijklmnopqrstuvwxyz")

chunked_generator = groupby(my_long_generator, key=chunks(chunksize=5))

for flag, chunk in chunked_generator:
    print("Flag is {f}".format(f=flag), list(chunk))

Output:

Flag is False ['a', 'b', 'c', 'd', 'e']
Flag is True ['f', 'g', 'h', 'i', 'j']
Flag is False ['k', 'l', 'm', 'n', 'o']
Flag is True ['p', 'q', 'r', 's', 't']
Flag is False ['u', 'v', 'w', 'x', 'y']
Flag is True ['z']

I've made a fiddle demonstrating this code.
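
If the flag itself is of no interest to the consumer, the same idea can be folded into a single generator. The following is only a sketch (the name chunked is made up here), using itertools.cycle over a tuple of alternating flags as suggested in a comment on Moses Koledoye's answer:

```python
from itertools import cycle, groupby

def chunked(iterable, chunksize=3):
    # chunksize Falses followed by chunksize Trues, repeated forever:
    # the key flips exactly once every chunksize elements.
    flags = cycle((False,) * chunksize + (True,) * chunksize)
    for _, group in groupby(iterable, key=lambda _: next(flags)):
        yield group

result = ["".join(g) for g in chunked("abcdefghijklmnopqrstuvwxyz", 5)]
print(result)  # ['abcde', 'fghij', 'klmno', 'pqrst', 'uvwxy', 'z']
```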

vowel-house-might
0

You've said you don't wish to store things in memory, so does this mean that you can't build an intermediate list for the current chunk?

Why not traverse the generator and insert a sentinel value between chunks? The consumer (or a suitable wrapper) could ignore the sentinel:

class Sentinel(object):
    pass

def chunk(els, size):
    for i, el in enumerate(els, 1):  # count from 1 so every size-th element closes a chunk
        yield el
        if i % size == 0:
            yield Sentinel
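
A sketch of what such a wrapper could look like (the consume helper is hypothetical, and the chunk function is repeated here, counting from 1, so that the example runs on its own). It numbers the elements by counting the sentinels it has seen and never buffers a chunk:

```python
class Sentinel(object):
    pass

def chunk(els, size):
    for i, el in enumerate(els, 1):
        yield el
        if i % size == 0:
            yield Sentinel  # marks the end of a full chunk

def consume(stream):
    """Hypothetical wrapper: yield (chunk_number, element) pairs."""
    n = 0
    for el in stream:
        if el is Sentinel:
            n += 1  # a sentinel closes the current chunk
            continue
        yield n, el

pairs = list(consume(chunk(range(7), 3)))
print(pairs)  # [(0, 0), (0, 1), (0, 2), (1, 3), (1, 4), (1, 5), (2, 6)]
```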
  • I do not want to traverse the generator. I want to produce a list of generators which **will** walk the original generator *only when* the consumers decide. This way I can pass the list of chunks around for other parts of my code to consume it, without pre-walking the data (which is expensive to compute in my case) – blueFast Jul 02 '14 at 09:51
0

EDIT other solution with a generator of generators

You should not do a while True in your iterator, but simply iterate through it and update the chunk number at each iteration :

def chunk(it, maxv):
    n = 0
    for i in it:
        yield n // maxv, i
        n += 1

If you want a generator of generators, you can have :

def chunk(a, maxv):
    def inner(it, maxv, l):
        l[0] = False
        for i in range(maxv):
            try:
                yield next(it)
            except StopIteration:
                return  # source exhausted: l[0] stays False, which stops the outer loop
        l[0] = True
    it = iter(a)
    l = [True]
    while l[0]:
        yield inner(it, maxv, l)

with a being an iterable.

Tests : on python 2.7 and 3.4:

for i in chunk(range(7), 3):
    print('CHUNK')
    for a in i:
        print(a)

gives :

CHUNK
0
1
2
CHUNK
3
4
5
CHUNK
6

And on 2.7 :

for i in chunk(xrange(7), 3):
    print 'CHUNK'
    for a in i:
        print a

gives same result.

But BEWARE: list(chunk(range(7), 3)) blocks on 2.7 and 3.4, because the inner generators are never consumed and so the loop flag is never reset to False.

Serge Ballesta
  • This does not provide me with a series of chunks, but with a single iterator with an indication of the chunk it belongs too. It pre-walks the iterator, which is exactly what I want to avoid. – blueFast Jul 02 '14 at 09:49
0

Three variations of ShadowRanger's fastest pre-Python-3.12 solution using zip_longest to get chunk tuples and removing the fillvalue from the last chunk. They're the Stefan_* ones in this benchmark for an iterable with 10^6 elements and chunk size n=2 to n=1000:

benchmark plot

Tested on Python 3.10.8. No idea what's up with Stefan_stopped from n=7 to n=10, I ran the benchmark multiple times and it's always like that.

Stefan_hold

This one holds one chunk, and for each next chunk, yields the held chunk and then holds the next one instead. At the end, unfill and yield the final held chunk.

def chunker(n, iterable):
    fillvalue = object()
    args = repeat(iter(iterable), n)
    chunks = zip_longest(*args, fillvalue=fillvalue)
    for chunk in chunks:
        for next_chunk in chunks:
            yield chunk
            chunk = next_chunk
        yield unfill(chunk, fillvalue)
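
The nested loops over the same iterator can look puzzling at first. Stripped of the chunking details, the "hold" pattern is a way to treat the last item of an iterator differently without knowing the length in advance. A minimal sketch (the name mark_last is made up for this illustration):

```python
def mark_last(iterable):
    """Yield (item, is_last) pairs using the same hold-one-item pattern."""
    it = iter(iterable)
    for held in it:          # runs at most once: grabs the first item
        for nxt in it:       # drains the rest of the same iterator
            yield held, False
            held = nxt       # hold the newer item instead
        yield held, True     # the inner loop ended, so `held` is the last item

marked = list(mark_last('abc'))
print(marked)  # [('a', False), ('b', False), ('c', True)]
```

For an empty input the outer loop body never runs, so nothing is yielded.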

Stefan_stopped

ShadowRanger checks if x[-1] is fillvalue: for each chunk x. In this solution, I instead use the faster if stopped: check of a bool value. To make that work, I chain an "empty" iterator to the input whose sole job is to set stopped to True:

def chunker(n, iterable):
    stopped = False
    def stop():
        nonlocal stopped
        stopped = True
        return
        yield
    fillvalue = object()
    it = iter(iterable)
    most = repeat(it, n-1)
    last = chain(it, stop())
    for chunk in zip_longest(*most, last, fillvalue=fillvalue):
        if stopped:
            yield unfill(chunk, fillvalue)
        else:
            yield chunk
            del chunk
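
The return followed by an unreachable yield is what makes stop a generator function that produces no items but still runs its body when first advanced. A small sketch of that mechanism in isolation (the events list is only there to make the side effect observable):

```python
from itertools import chain

events = []

def stop():
    events.append("input exhausted")  # runs when chain first advances stop()
    return
    yield  # never reached; its presence makes stop() a generator function

items = []
for item in chain("ab", stop()):
    # Record what had been logged at the time each real item was seen.
    items.append((item, list(events)))

print(items)   # [('a', []), ('b', [])]
print(events)  # ['input exhausted']
```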

Stefan_compress

This one eliminates Python interpretation during all but the last chunk. It uses tee to get three copies of the chunks iterator from zip_longest. The main copy contributes all but the last chunk. The fast copy is one chunk ahead, its sole job is to avoid the last chunk from main. The slow copy is one step behind fast and provides the last chunk.

def chunker(n, iterable):
    fillvalue = object()
    args = (iter(iterable),) * n
    chunks = zip_longest(*args, fillvalue=fillvalue)
    main, fast, slow = tee(chunks, 3)
    next(fast, None)
    return chain(
        compress(main, zip(fast, slow)),
        (unfill(chunk, fillvalue) for chunk in slow)
    )
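
How the three tee copies cooperate is easier to see on a plain iterable. In the sketch below (the name split_last is made up here), zip(fast, slow) yields one always-truthy tuple per item except for the last one, so compress passes through everything but the last item of main, and slow is left holding exactly that last item:

```python
from itertools import compress, tee

def split_last(iterable):
    """Return (all items but the last, remaining items) as two lists."""
    main, fast, slow = tee(iterable, 3)
    next(fast, None)  # put `fast` one step ahead
    # zip stops as soon as `fast` runs dry, before advancing `slow` again,
    # which leaves the final item unconsumed in `slow`.
    head = list(compress(main, zip(fast, slow)))
    tail = list(slow)
    return head, tail

head, tail = split_last('abcd')
print(head, tail)  # ['a', 'b', 'c'] ['d']
```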

unfill

The helper my solutions use to remove the fillvalue from the last chunk:

def unfill(chunk, fillvalue):
    return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]

The del optimization

ShadowRanger_del is a minor modification of ShadowRanger's solution, deleting the chunk variable after yielding the chunk, so that zip_longest can use its optimization of re-using its result tuple. My Stefan_stopped also uses this optimization.

Though that only helps/works if the consumer of our chunks iterator also doesn't keep a reference to the tuple, for example if it uses map(sum, chunker(...)) to compute sums of chunks of numbers.

Here's the same benchmark but without that del optimization:

benchmark plot without del optimization

Full code

Solutions, correctness checking, benchmarking, plotting.

import sys
print(sys.version)
import matplotlib.pyplot as plt
from itertools import takewhile, zip_longest, chain, compress, tee, repeat
from timeit import timeit
from statistics import mean, stdev
from collections import deque
import gc
from random import sample
from bisect import bisect


#----------------------------------------------------------------------
# Solutions
#----------------------------------------------------------------------

def ShadowRanger(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'),  ('G',)'''
    fillvalue = object()  # Anonymous sentinel object that can't possibly appear in input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x


def ShadowRanger_del(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'),  ('G',)'''
    fillvalue = object()  # Anonymous sentinel object that can't possibly appear in input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x
            del x


def unfill(chunk, fillvalue):
    return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]


def Stefan_stopped(n, iterable):
    stopped = False
    def stop():
        nonlocal stopped
        stopped = True
        return
        yield
    fillvalue = object()
    it = iter(iterable)
    most = repeat(it, n-1)
    last = chain(it, stop())
    for chunk in zip_longest(*most, last, fillvalue=fillvalue):
        if stopped:
            yield unfill(chunk, fillvalue)
        else:
            yield chunk
            del chunk


def Stefan_compress(n, iterable):
    fillvalue = object()
    args = (iter(iterable),) * n
    chunks = zip_longest(*args, fillvalue=fillvalue)
    main, fast, slow = tee(chunks, 3)
    next(fast, None)
    return chain(
        compress(main, zip(fast, slow)),
        (unfill(chunk, fillvalue) for chunk in slow)
    )


def Stefan_hold(n, iterable):
    fillvalue = object()
    args = repeat(iter(iterable), n)
    chunks = zip_longest(*args, fillvalue=fillvalue)
    for chunk in chunks:
        for next_chunk in chunks:
            yield chunk
            chunk = next_chunk
        yield unfill(chunk, fillvalue)


funcs = ShadowRanger, ShadowRanger_del, Stefan_stopped, Stefan_compress, Stefan_hold


#----------------------------------------------------------------------
# Correctness checks
#----------------------------------------------------------------------

def run(f):
    return list(f(n, iter(range(N))))
for n in range(1, 10):
    for N in range(100):
        args = n, range(N)
        expect = run(ShadowRanger)
        for f in funcs:
            result = run(f)
            if result != expect:
                raise Exception(f'{f.__name__} failed for {n=}, {N=}')


#----------------------------------------------------------------------
# Benchmarking
#----------------------------------------------------------------------

benchmarks = []

# Speed
N = 10 ** 6
for n in *range(2, 11), 20, 50, 100, 200, 500, 1000:
  for _ in range(1):
    times = {f: [] for f in funcs}
    def stats(f):
        ts = [t * 1e3 for t in sorted(times[f])[:10]]
        return f'{mean(ts):6.2f} ± {stdev(ts):4.2f} ms '

    for _ in range(100):
        for f in sample(funcs, len(funcs)):
            gc.collect()
            t = timeit(lambda: deque(f(n, repeat(None, N)), 0), number=1)
            times[f].append(t)

    print(f'\n{n = }')
    for f in sorted(funcs, key=stats):
        print(stats(f), f.__name__)

    benchmarks.append((n, times))


#----------------------------------------------------------------------
# Plotting
#----------------------------------------------------------------------

names = [f.__name__ for f in funcs]
stats = [
    (n, [mean(sorted(times[f])[:10]) * 1e3 for f in funcs])
    for n, times in benchmarks
]

colors = {
    'Stefan_stopped': 'blue',
    'Stefan_compress': 'lime',
    'Stefan_hold': 'gold',
    'ShadowRanger': 'red',
    'ShadowRanger_del': 'pink',
}
ns = [n for n, _ in stats]
print('%28s' % 'chunk size', *('%5d' % n for n in ns))
print('-' * 95)
x = range(len(ns))
for i, name in enumerate(names):
    ts = [tss[i] for _, tss in stats]
    ts = [tss[i] / tss[0] * 100 for _, tss in stats]
    color = colors[name]
    if color:
        plt.plot(x, ts, '.-', color=color, label=name)
        print('%29s' % name, *('%5.1f' % t for t in ts))
plt.xticks(x, ns, size=9)
plt.ylim(0, 120)
plt.title('chunker(n, $10^6$ elements)', weight='bold')
plt.xlabel('Chunk size n', weight='bold')
plt.ylabel("Time (for complete iteration) in % relative to ShadowRanger's", weight='bold')
plt.legend(loc='lower center')
#plt.show()
plt.savefig('chunker_plot.png', dpi=200)
Stefan Pochmann