For small M, we can assign each of the xs a unique label based on the combination of integers in it, since each length-L window encodes to a unique number in a base-M positional system. For example, with M = 5 the window [3, 1, 4] maps to 3 + 1*5 + 4*25 = 108, and no other window of values below 5 maps to that number. To compute these labels efficiently, we can leverage convolution with a scaling array of powers of M and hence reduce each window of y, as well as each of the xs, to a scalar. Finally, we use a matching method on those scalars to detect the matches and hence get the count.
The only overhead would be the conversion from the list of arrays to a 2D array. So, if the list creation itself is optimized beforehand to produce a 2D array instead, it would help a lot with the final performance numbers.
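For instance (a purely illustrative sketch; the actual generation step depends on how xs is produced in the caller's code), the windows could be built directly into a 2D array so that no conversion is needed later -
import numpy as np

num_x, L, M = 100, 4, 16                      # hypothetical sizes, for illustration only
x2d = np.random.randint(0, M, (num_x, L))     # 2D array from the start
# instead of : xs = [np.random.randint(0, M, L) for _ in range(num_x)]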
The implementation would look something like this -
x = np.asarray(xs) # convert to array, if not already done
s = M**np.arange(x.shape[1])            # scaling array : powers of M
yr = np.convolve(y,s[::-1],'valid')     # scalar label for each length-L window of y
xr = x.dot(s)                           # scalar label for each x
# Final step : Match and get count
N = np.maximum(xr.max(),yr.max())+1 # or use s[-1]*M if M is small enough
l = np.zeros(N, dtype=bool)   # lookup table over all possible labels
l[yr] = True                  # mark labels present in y
count = l[xr].sum()           # count xs whose labels were marked
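As a quick sanity check (toy values assumed purely for illustration; this is not the question's setup), the vectorized count should agree with a brute-force sliding-window comparison -
import numpy as np

rng = np.random.default_rng(0)
M, L = 6, 4                                        # hypothetical alphabet size and window length
y = rng.integers(0, M, 2000)
xs = [rng.integers(0, M, L) for _ in range(100)]

x = np.asarray(xs)
s = M**np.arange(L)
yr = np.convolve(y, s[::-1], 'valid')
xr = x.dot(s)
l = np.zeros(max(xr.max(), yr.max())+1, dtype=bool)
l[yr] = True
assert l[xr].sum() == sum(
    any((y[i:i+L] == xi).all() for i in range(len(y)-L+1)) for xi in xs
)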
Alternatives to perform the final step
Alternative #1 :
sidx = yr.argsort()                        # sort order of the y labels
idx = np.searchsorted(yr,xr,sorter=sidx)   # insertion positions of the x labels
idx[idx==len(yr)] = 0                      # clamp out-of-range positions
count = (yr[sidx[idx]] == xr).sum()        # keep exact matches only
Alternative #2 :
from scipy.sparse import csr_matrix
ly = len(yr)
y_extent = yr.max()+1 # or use s[-1]*M if M is small enough
r = np.zeros(ly, dtype=np.uint64)                        # all entries go into row 0
val = np.ones(ly, dtype=bool)                            # presence flags for the y labels
sp_mat = csr_matrix((val, (r,yr)), shape=(1,y_extent))   # sparse lookup of y labels
count = sp_mat[:,xr].sum()                               # index columns with x labels and count hits
Alternative #3 :
For bigger M numbers, the lookup array gets large and zero-initializing it becomes costly, so we can use empty arrays and initialize only the positions we will actually query -
l = np.empty(N, dtype=bool)   # uninitialized lookup table
l[xr] = False                 # clear only the positions that will be queried
l[yr] = True                  # then mark labels present in y
count = l[xr].sum()
Digging further (leveraging numba on convolution)
Profiling the main proposed solution reveals that the 1D convolution is the time-consuming part. Looking closer, the convolution kernel is geometric in nature (consecutive powers of M), so the convolution can be computed in O(n) by re-using the previous window's value and only adjusting the boundary elements at each iteration: conv_out[i] = conv_out[i-1]*M + y[i+L-1] - y[i-1]*M**L. Note that this works with an inverted kernel as compared to the one proposed earlier. So, putting all of those changes together, we end up with something like this -
from numba import njit

@njit
def numba1(y, conv_out, M, L, N):
    A = M**L
    for i in range(1,N):
        # slide the window : scale the previous value, add the new element, drop the old one
        conv_out[i] = conv_out[i-1]*M + y[i+L-1] - y[i-1]*A
    return conv_out

def numba_convolve(y, M, L):
    N = len(y)-L+1
    conv_out = np.empty(N, dtype=int)
    conv_out[0] = y[:L].dot(M**np.arange(L-1,-1,-1))  # first window computed directly
    return numba1(y, conv_out, M, L, N)
def intersection_count(xs, y):
    x = np.asarray(xs) # convert to array, if not already done
    L = x.shape[1]
    s = M**np.arange(L-1,-1,-1)   # M is picked up from the enclosing scope
    xr = x.dot(s)
    yr_numba = numba_convolve(y, M=M, L=L)

    # Final step : Match and get count
    N = s[0]*M                    # upper bound on any label
    l = np.zeros(N, dtype=bool)
    l[yr_numba] = True
    count = l[xr].sum()
    return count
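Note that intersection_count relies on M being defined in the enclosing scope. As a quick end-to-end check (toy values assumed purely for illustration; this is not the question's setup), the numba-based convolution should match the direct 'valid' convolution, and the final count should match a brute-force window comparison -
import numpy as np

rng = np.random.default_rng(0)
M = 6                                              # hypothetical alphabet size; must exist globally for intersection_count
L = 4                                              # hypothetical window length
y = rng.integers(0, M, 2000)
xs = [rng.integers(0, M, L) for _ in range(100)]

s = M**np.arange(L-1,-1,-1)
assert (numba_convolve(y, M=M, L=L) == np.convolve(y, s[::-1], 'valid')).all()
assert intersection_count(xs, y) == sum(
    any((y[i:i+L] == xi).all() for i in range(len(y)-L+1)) for xi in xs
)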
Benchmarking
We will re-use the setup from the question.
In [42]: %%timeit
...: y_bytes = y.tobytes()
...: p = sum(1 for x in xs if x.tobytes() in y_bytes)
927 ms ± 3.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [43]: %timeit intersection_count(xs, y)
7.55 ms ± 71.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
As noted earlier, the conversion to array could be the bottleneck. So, let's time that part too -
In [44]: %timeit np.asarray(xs)
3.41 ms ± 10.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
So, the array-conversion part is around 45%
of the total runtime, which is substantial. Hence, the suggestion to work with a 2D array as opposed to a list of 1D arrays becomes crucial at this point. The upside is that array data gives us vectorized capability and hence improved performance overall. Just to emphasize the impact of having the 2D array available, here are the speedups with and without it -
In [45]: 927/7.55
Out[45]: 122.78145695364239
In [46]: 927/(7.55-3.41)
Out[46]: 223.91304347826087