Three variations of ShadowRanger's fastest pre-Python-3.12 solution using zip_longest
to get chunk tuples and removing the fillvalue from the last chunk. They're the Stefan_*
ones in this benchmark for an iterable with 10^6 elements and chunk size n=2 to n=1000:

Tested on Python 3.10.8. No idea what's up with Stefan_stopped
from n=7 to n=10, I ran the benchmark multiple times and it's always like that.
Stefan_hold
This one holds one chunk, and for each next chunk, yields the held chunk and then holds the next one instead. At the end, unfill and yield the final held chunk.
def chunker(n, iterable):
fillvalue = object()
args = repeat(iter(iterable), n)
chunks = zip_longest(*args, fillvalue=fillvalue)
for chunk in chunks:
for next_chunk in chunks:
yield chunk
chunk = next_chunk
yield unfill(chunk, fillvalue)
Stefan_stopped
ShadowRanger checks if x[-1] is fillvalue:
for each chunk x
. In this solution, I instead use the faster if stopped:
check of a bool value. To make that work, I chain an "empty" iterator to the input whose sole job is to set stopped
to True
:
def chunker(n, iterable):
stopped = False
def stop():
nonlocal stopped
stopped = True
return
yield
fillvalue = object()
it = iter(iterable)
most = repeat(it, n-1)
last = chain(it, stop())
for chunk in zip_longest(*most, last, fillvalue=fillvalue):
if stopped:
yield unfill(chunk, fillvalue)
else:
yield chunk
del chunk
Stefan_compress
This one eliminates Python interpretation during all but the last chunk. It uses tee
to get three copies of the chunks iterator from zip_longest
. The main
copy contributes all but the last chunk. The fast
copy is one chunk ahead, its sole job is to avoid the last chunk from main
. The slow
copy is one step behind fast
and provides the last chunk.
def chunker(n, iterable):
fillvalue = object()
args = (iter(iterable),) * n
chunks = zip_longest(*args, fillvalue=fillvalue)
main, fast, slow = tee(chunks, 3)
next(fast, None)
return chain(
compress(main, zip(fast, slow)),
(unfill(chunk, fillvalue) for chunk in slow)
)
unfill
The helper my solutions use to remove the fillvalue from the last chunk:
def unfill(chunk, fillvalue):
return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]
The del
optimization
ShadowRanger_del
is a minor modification of ShadowRanger's solution, deleting the chunk variable after yielding the chunk, so that zip_longest
can use its optimization of re-using its result tuple. My Stefan_stopped
also uses this optimization.
Though that only helps/works if the consumer of our chunks iterator also doesn't keep a reference to the tuple, for example if it uses map(sum, chunker(...))
to compute sums of chunks of numbers.
Here's the same benchmark but without that del
optimization:

Full code
Solutions, correctness checking, benchmarking, plotting.
import sys
print(sys.version)
import matplotlib.pyplot as plt
from itertools import takewhile, zip_longest, chain, compress, tee, repeat
from timeit import timeit
from statistics import mean, stdev
from collections import deque
import gc
from random import sample
from bisect import bisect
#----------------------------------------------------------------------
# Solutions
#----------------------------------------------------------------------
def ShadowRanger(n, iterable):
'''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)'''
fillvalue = object() # Anonymous sentinel object that can't possibly appear in input
args = (iter(iterable),) * n
for x in zip_longest(*args, fillvalue=fillvalue):
if x[-1] is fillvalue:
# takewhile optimizes a bit for when n is large and the final
# group is small; at the cost of a little performance, you can
# avoid the takewhile import and simplify to:
# yield tuple(v for v in x if v is not fillvalue)
yield tuple(takewhile(lambda v: v is not fillvalue, x))
else:
yield x
def ShadowRanger_del(n, iterable):
'''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)'''
fillvalue = object() # Anonymous sentinel object that can't possibly appear in input
args = (iter(iterable),) * n
for x in zip_longest(*args, fillvalue=fillvalue):
if x[-1] is fillvalue:
# takewhile optimizes a bit for when n is large and the final
# group is small; at the cost of a little performance, you can
# avoid the takewhile import and simplify to:
# yield tuple(v for v in x if v is not fillvalue)
yield tuple(takewhile(lambda v: v is not fillvalue, x))
else:
yield x
del x
def unfill(chunk, fillvalue):
return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]
def Stefan_stopped(n, iterable):
stopped = False
def stop():
nonlocal stopped
stopped = True
return
yield
fillvalue = object()
it = iter(iterable)
most = repeat(it, n-1)
last = chain(it, stop())
for chunk in zip_longest(*most, last, fillvalue=fillvalue):
if stopped:
yield unfill(chunk, fillvalue)
else:
yield chunk
del chunk
def Stefan_compress(n, iterable):
fillvalue = object()
args = (iter(iterable),) * n
chunks = zip_longest(*args, fillvalue=fillvalue)
main, fast, slow = tee(chunks, 3)
next(fast, None)
return chain(
compress(main, zip(fast, slow)),
(unfill(chunk, fillvalue) for chunk in slow)
)
def Stefan_hold(n, iterable):
fillvalue = object()
args = repeat(iter(iterable), n)
chunks = zip_longest(*args, fillvalue=fillvalue)
for chunk in chunks:
for next_chunk in chunks:
yield chunk
chunk = next_chunk
yield unfill(chunk, fillvalue)
funcs = ShadowRanger, ShadowRanger_del, Stefan_stopped, Stefan_compress, Stefan_hold
#----------------------------------------------------------------------
# Correctness checks
#----------------------------------------------------------------------
def run(f):
return list(f(n, iter(range(N))))
for n in range(1, 10):
for N in range(100):
args = n, range(N)
expect = run(ShadowRanger)
for f in funcs:
result = run(f)
if result != expect:
raise Exception(f'{f.__name__} failed for {n=}, {N=}')
#----------------------------------------------------------------------
# Benchmarking
#----------------------------------------------------------------------
benchmarks = []
# Speed
N = 10 ** 6
for n in *range(2, 11), 20, 50, 100, 200, 500, 1000:
for _ in range(1):
times = {f: [] for f in funcs}
def stats(f):
ts = [t * 1e3 for t in sorted(times[f])[:10]]
return f'{mean(ts):6.2f} ± {stdev(ts):4.2f} ms '
for _ in range(100):
for f in sample(funcs, len(funcs)):
gc.collect()
t = timeit(lambda: deque(f(n, repeat(None, N)), 0), number=1)
times[f].append(t)
print(f'\n{n = }')
for f in sorted(funcs, key=stats):
print(stats(f), f.__name__)
benchmarks.append((n, times))
#----------------------------------------------------------------------
# Plotting
#----------------------------------------------------------------------
names = [f.__name__ for f in funcs]
stats = [
(n, [mean(sorted(times[f])[:10]) * 1e3 for f in funcs])
for n, times in benchmarks
]
colors = {
'Stefan_stopped': 'blue',
'Stefan_compress': 'lime',
'Stefan_hold': 'gold',
'ShadowRanger': 'red',
'ShadowRanger_del': 'pink',
}
ns = [n for n, _ in stats]
print('%28s' % 'chunk size', *('%5d' % n for n in ns))
print('-' * 95)
x = range(len(ns))
for i, name in enumerate(names):
ts = [tss[i] for _, tss in stats]
ts = [tss[i] / tss[0] * 100 for _, tss in stats]
color = colors[name]
if color:
plt.plot(x, ts, '.-', color=color, label=name)
print('%29s' % name, *('%5.1f' % t for t in ts))
plt.xticks(x, ns, size=9)
plt.ylim(0, 120)
plt.title('chunker(n, $10^6$ elements)', weight='bold')
plt.xlabel('Chunk size n', weight='bold')
plt.ylabel("Time (for complete iteration) in % relative to ShadowRanger's", weight='bold')
plt.legend(loc='lower center')
#plt.show()
plt.savefig('chunker_plot.png', dpi=200)