TL;DR
This is a significant improvement over the original approach as long as the block size N (below referred to as m or size) is large enough and the input has a significant number of non-zero values:
import numpy as np
import numba as nb


@nb.njit
def find_blocks_while2_nb(arr, size, value):
    n = arr.size
    result = np.zeros(n, dtype=np.bool_)
    i = j = 0
    while i < n - size + 1:
        # last iteration found a block, only checking one value
        if j < i and arr[i + size - 1] == value:
            result[i] = True
            i += 1
        else:
            # search backward for index of last non-block value
            j = i + size - 1
            while j >= i and arr[j] == value:
                j -= 1
            if j < i:
                # block found, advance by 1
                result[i] = True
                i += 1
            else:
                # block not found, advance by last non-block value occurrence
                j += 1
                i = j
    return result
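A quick usage sketch on a made-up toy input (the only block of four zeros starts at index 1):

arr = np.array([1, 0, 0, 0, 0, 1, 0, 0])
find_blocks_while2_nb(arr, 4, 0)
# -> array([False,  True, False, False, False, False, False, False])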
Essentially it is a Numba-accelerated version of your original approach with a number of micro-optimizations:

- The number of times the main loop is executed is reduced by setting a proper upper limit on the main counter.
- The part that checks whether the block contains all the same values:
  - avoids a relatively expensive array slicing
  - returns the position where it failed
  - runs backward
  - avoids re-checking consecutive blocks

For small block sizes, a more micro-optimized approach (as introduced in other answers and briefly discussed below) may be faster.
Introduction
The problem of finding the indices where a block of identical values starts can be seen as a specialization of a classical problem in computer science: finding a sub-array within an array (discussed in some detail here).
In this case, the sub-string (i.e. the block) consists of a series of identical values, more precisely 0s.
Theory
The complexity of this problem depends, in general, on the size of the array (n) and the size of the block (m or size), as well as the actual values within them (obviously).
Of course, it is assumed that n > m, as it is clear that if n == m the solution is trivial, and when n < m the array cannot fit the block. For the same reason, no index above n - m can be found.
While a naïve implementation would require essentially two nested loops, one for the array and one for the sub-array (which would result in O(nm) computational complexity), it is possible to make use of the fact that the block consists of identical values to run the algorithm in guaranteed linear time O(n) and with constant memory requirement O(1).
However, under the assumption that the input array has a mixed distribution of both block and non-block values, it is possible to run in approx. O(n / (m - ε)) time and still with O(1) memory consumption, where ε < m - 1 is a parameter proportional to the probability of finding a non-block value.
The idea is that, when trying to determine whether a block starts at position i, if the array has a non-block value at position j with j - i < m, then the next position to evaluate is j + 1, since the block cannot fit between i and j.
Moreover, if the search starts from the back of the window (i.e. at position i + m - 1), then the number of positions that are not checked is maximal for sufficiently heterogeneous inputs, as none of the j - i values before the failure needs to be checked.
For similar reasons, if it is known that a block starts at position i, then to check whether a block also starts at position i + j with j < m, only the values between i + m and i + m + j need to be checked, and the values between i and i + m - 1 do not need to be checked again.
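To make the skipping rule concrete, here is a minimal, purely illustrative sketch (the next_candidate helper is hypothetical and not part of any implementation below): given a candidate start i, it scans the window backward and returns the next start position worth checking.

import numpy as np

def next_candidate(arr, i, size, value):
    # scan the window arr[i:i + size] backward for the last non-block value
    j = i + size - 1
    while j >= i and arr[j] == value:
        j -= 1
    # if the whole window matched, a block starts at i and the next candidate is i + 1;
    # otherwise no block can start at any of i..j, so jump directly to j + 1
    return i + 1 if j < i else j + 1

arr = np.array([0, 0, 1, 0, 0, 0, 0])
print(next_candidate(arr, 0, 3, 0))  # 3 -- arr[2] != 0, so starts 0, 1 and 2 are all skipped
print(next_candidate(arr, 3, 3, 0))  # 4 -- a block of three zeros does start at 3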
Implementations
This is a natural case where a generator is a perfect fit, because it is possible to leverage its lazy evaluation to perform only as much computation as needed.
Additionally, it is not known in advance how large the result will be (i.e. it is possible to find many blocks or no blocks at all).
However, if this is part of a larger computation, it may be faster to pre-allocate the result and just modify the values at its indices.
There should not be a need to allocate more than n - m + 1 values.
However, for compatibility with the requirements of the question, an array the size of the input is pre-allocated.
If needed, it is easy to adapt the following implementations to avoid wasting this extra memory.
More precisely, the solutions based on explicit looping can be converted to generators by replacing result[i] = True with yield i (or similar).
Likewise, solutions that compute the boolean array natively can be converted to generators with something like:

for i in np.nonzero(result)[0]:
    yield i
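For instance, a generator variant of one of the explicit loops shown below could look like this sketch (find_blocks_gen is a hypothetical name, not one of the benchmarked functions):

def find_blocks_gen(arr, size, value):
    # lazily yield the start index of each block instead of filling a boolean array
    block_size = 0
    for i in range(arr.size):
        block_size = block_size + 1 if arr[i] == value else 0
        if block_size >= size:
            yield i - size + 1

Calling list(find_blocks_gen(arr, size, value)) would then produce the indices directly.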
The remainder of the analysis will assume that the target result is the boolean array.
One of the challenges of implementing this in pure Python is that explicit looping is relatively slow.
For this reason, it is convenient to use some acceleration technique (e.g. Numba or Cython) to reduce computation time.
Alternatively, given that the algorithm consists essentially of two loops (one for the array and one for the sub-array) one may seek to vectorize one or both loops.
However, do note that these approaches would necessarily run in O(nm) worst-case time.
The basic naïve approach is the following:
def find_blocks_for2(arr, size, value):
    n = arr.size
    result = np.zeros(n, dtype=np.bool_)
    for i in range(n - size + 1):
        all_equal = True
        for j in range(i, i + size):
            if arr[j] != value:
                all_equal = False
                break  # vectorized approaches may not have this
        if all_equal:
            result[i] = True
    return result
This can be improved in a number of directions.
One way stems from the fact that the inner loop is actually unnecessary, because it is sufficient to keep track of the number of consecutive "good" values seen so far (this is essentially the same as MichaelSzczesny's answer):
import numpy as np


def find_blocks_for(arr, size, value):
    n = arr.size
    result = np.zeros(n, dtype=np.bool_)
    block_size = 0
    for i in range(n):
        if arr[i] == value:
            block_size += 1
        else:
            block_size = 0
        if block_size >= size:
            result[i - size + 1] = True
    return result
Note that this can be seen as a specialization of the Rabin-Karp algorithm, where the role of the hashing is played by the block_size counter.
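For concreteness, on a small toy input the blocks of three zeros start at indices 1, 5 and 6:

arr = np.array([1, 0, 0, 0, 1, 0, 0, 0, 0])
find_blocks_for(arr, 3, 0)
# -> array([False,  True, False, False, False,  True,  True, False, False])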
Another possible approach is to skip the second loop when the last iteration already identified the presence of a block, and to have it run backward so as to maximize the jump to the next possible block:
import numpy as np


def find_blocks_while2(arr, size, value):
    n = arr.size
    result = np.zeros(n, dtype=np.bool_)
    i = j = 0
    while i < n - size + 1:
        # last iteration found a block, only checking one value
        if j < i and arr[i + size - 1] == value:
            result[i] = True
            i += 1
        else:
            # search backward for index of last non-block value
            j = i + size - 1
            while j >= i and arr[j] == value:
                j -= 1
            if j < i:
                # block found, advance by 1
                result[i] = True
                i += 1
            else:
                # block not found, advance by last non-block value occurrence
                j += 1
                i = j
    return result
Note that this can be seen as a specialization of the Knuth-Morris-Pratt algorithm without the need to compute offsets for each value of the block (they are all the same).
A number of variations can be devised for the above approaches, all with the same asymptotic behavior but slightly different running times depending on the actual input.
For example, one could change the order of the conditions to speed up the case where many consecutive blocks are found, at the expense of running more instructions elsewhere, or the other way around.
All of these are just too slow without acceleration from Numba, Cython, or PyPy.
PyPy uses a different interpreter and I will not discuss/benchmark it further.
Numba provides one of the most straightforward and effective ways of accelerating the code (just apply a decorator), e.g.:
import numba as nb
find_blocks_while2_nb = nb.njit(find_blocks_while2)
find_blocks_while2_nb.__name__ = "find_blocks_while2_nb"
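Note that the first call compiles the function for the given argument types, so a warm-up call (on any small input with the right dtypes) is advisable before timing, e.g.:

arr = np.zeros(16, dtype=np.int64)  # hypothetical warm-up input
find_blocks_while2_nb(arr, 4, 0)  # triggers JIT compilation; later calls reuse the compiled code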
Cython can also be used, e.g. (via the %load_ext Cython Jupyter magic), but to take full advantage of the compilation it requires extra care to make sure that the bottleneck code runs without Python interaction:
%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True


import cython as cy
import numpy as np


cdef _find_blocks_lin_cy(long[::1] arr, char[::1] result, Py_ssize_t n, Py_ssize_t size, long value):
    cdef Py_ssize_t block_size = 0
    for i in range(n):
        if arr[i] == value:
            block_size += 1
        else:
            block_size = 0
        if block_size >= size:
            result[i - size + 1] = True


def find_blocks_lin_cy(arr, size, value):
    n = arr.size
    result = np.zeros(n, dtype=np.bool_)
    _find_blocks_lin_cy(arr, result, n, size, value)
    return result
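Note that the long[::1] typed memoryview only accepts a C-contiguous array whose dtype matches the platform's C long (typically np.int64 on 64-bit Linux/macOS, np.int32 on Windows), so the input may need an explicit conversion, e.g. something like:

arr = np.ascontiguousarray(arr, dtype=np.int64)  # assumes C long is 64-bit on the target platform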
Whether Numba or Cython is going to be faster depends on how much optimization can be triggered by the code, and as such it also depends on which combination of compiler and CPU is used.
Alternatively, either the inner or the outer loop (or both) can be vectorized.
Of course, this is most effective when applied to whichever loop runs the most iterations: the longer loop should be the one that is vectorized for maximum speed-up.
Vectorizing the inner loop would lead to something close to this (similar to the original post, but with smaller boundaries and sum() replaced with np.all(), which also sports short-circuiting):
def find_blocks_for_all(arr, size, value):
    n = arr.size
    result = np.zeros(n, dtype=np.bool_)
    for i in range(n - size + 1):
        if np.all(arr[i:i + size] == value):
            result[i] = True
    return result
Vectorizing the outer loop would lead to something close to this (similar to what is presented in DanielF's answer, but with overall cleaner code, especially avoiding unnecessary function calls):
def find_blocks_for_and(arr, size, value):
    result = (arr == value)
    for i in range(1, size):
        # each pass ANDs the mask with a shift-by-one of itself,
        # growing the matched window by one element per iteration
        result[:-1] &= result[1:]
    # the last size - 1 positions cannot hold a full block
    result[1 - size:] = False
    return result
Vectorizing both loops would lead to something close to this (similar to one approach from NaphatAmundsen's answer):
def find_blocks_strides(arr, size, value):
    n = arr.size
    strides = (arr.itemsize,) * 2
    block = np.full(size, value, dtype=arr.dtype)
    # note that `as_strided()` does not allocate extra memory
    # the downside is that it reads beyond the end of the buffer and
    # extra care must be taken not to use data outside the boundaries
    check = np.lib.stride_tricks.as_strided(arr, (n, size), strides)
    result = np.all(check == block, axis=-1)
    result[1 - size:] = False
    return result
The above code can be further specialized under the assumption that the input contains only positive values and that the block consists of zeros only.
This would mean replacing the np.all() calls with np.sum() or similar.
One approach where this is actually beneficial is with the fully vectorized approach, where as_strided() can be replaced by computing the correlation with a non-zero block (similar to one approach from NaphatAmundsen's answer):
def find_blocks_0_conv(arr, size):
    n = arr.size
    result = np.zeros_like(arr, dtype=np.bool_)
    block = np.ones(size, dtype=arr.dtype)
    result[:1 - size] = (np.correlate(arr, block, 'valid') == 0)
    return result
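As a quick sanity check (purely illustrative, on a made-up random input), the specialized version can be compared against the generic ones:

rng = np.random.default_rng(0)
arr = rng.integers(0, 2, 1000)  # non-negative values, blocks of zeros
expected = find_blocks_for(arr, 4, 0)
assert np.array_equal(find_blocks_for_and(arr, 4, 0), expected)
assert np.array_equal(find_blocks_0_conv(arr, 4), expected)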
Aside from the looping, the running time will also depend on how the code translates to hardware instructions and ultimately how fast those are.
For example, if NumPy is compiled with full support for SIMD instructions, it may very well be that a vectorized (but not theoretically optimal) approach outperforms, for many input combinations, theoretically optimal approaches that are naïvely accelerated.
The same can happen for approaches that are written in a way that allows acceleration techniques to make very good use of SIMD instructions.
However, the extra speed they provide will be limited to specific systems (i.e. specific combinations of compilers and processors).
One such example is the following (essentially a polished version of Jérôme Richard's answer, avoiding global variables and removing unnecessary looping):
def find_blocks_simd_nb_cached(arr, size, value=0, cache={}):
    assert size > 0
    if size not in cache:
        print('cached: ', list(cache.keys()))

        def find_blocks_simd_nb(arr, value):
            n = arr.size
            result = np.zeros(n, dtype=np.bool_)
            check = (arr == value)
            for i in range(n - size + 1):
                # using a `temp` variable avoids unnecessarily accessing the `check` array
                temp = check[i]
                for j in range(1, size):
                    temp &= check[i + j]
                result[i] = temp
            return result

        cache[size] = nb.njit(find_blocks_simd_nb)
    return cache[size](arr, value)
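A brief usage note: each distinct size compiles (and caches) its own specialized kernel, so, for example:

arr = np.random.randint(0, 2, 1000)
find_blocks_simd_nb_cached(arr, 4)  # first call for size=4: compiles and caches the kernel
find_blocks_simd_nb_cached(arr, 4)  # reuses the cached kernel, no recompilation
find_blocks_simd_nb_cached(arr, 8)  # new size: compiles a second kernel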
Benchmarking
Here, some benchmarks are reported.
As always, they should be taken with a grain of salt.
Only the fastest approaches are considered, for varying input sizes, block sizes and values.
The _nb and _cy versions (except for find_blocks_lin_cy()) are essentially non-optimized versions of the same functions without the _nb or _cy suffix from above.
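For reference, a minimal timing harness along these lines could look like the following sketch (the sizes and number of repetitions are arbitrary; the linked benchmarks are more thorough):

import timeit

arr = np.random.randint(0, 2, 600_000)
for func in (find_blocks_for_and, find_blocks_while2_nb):
    func(arr, 4, 0)  # warm-up, so that JIT compilation is not timed
    t = timeit.timeit(lambda f=func: f(arr, 4, 0), number=10)
    print(f"{func.__name__}: {t / 10 * 1e3:.3f} ms")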
Varying input size, block size = 4
Uniform distribution of 1s and 0s

All 0s

All 1s

Varying block size, input size = 600_000
Uniform distribution of 1s and 0s

All 0s

All 1s

Note that:

- find_blocks_for_all() is actually fast for sufficiently large values of size
- find_blocks_for_and() is very fast for smaller values of size and does not require Numba or Cython
- find_blocks_for_nb() is essentially independent of the block size and performs really well overall
- find_blocks_simd_nb() is very fast for small values of size but performs comparatively poorly for larger values
- find_blocks_while2_nb() really shines for larger values of size and is respectably fast for smaller values
(Full benchmarks available here).