
Let A be a 1D numpy array of size 5 to 20 million.

I'd like to determine, for each i, how many items among A[i-1000000], A[i-999999], ..., A[i-2], A[i-1] are smaller than A[i].

Said in another way: I'm looking for the proportion of items smaller than A[i] in a 1-million-item window preceding A[i].

I've tested various approaches, and a few answers were given in "Rolling comparison between a value and a past window, with percentile/quantile":

import numpy as np
A = np.random.random(5*1000*1000)
n = 1000*1000
# column j of the strided view is the window A[j:j+n], compared against A[n+j]
B = (np.lib.stride_tricks.as_strided(A, shape=(n,A.size-n), strides=(A.itemsize,A.itemsize)) <= A[n:]).sum(0)
#or similar version with "view_as_windows(A, n)"

Finally the fastest solution was some naive code + numba:

from numba import jit, prange

@jit(parallel=True)
def doit(A, n):
    Q = np.zeros(len(A))
    for i in prange(n, len(Q)):
        Q[i] = np.sum(A[i-n:i] <= A[i])
    return Q

C = doit(A, n)

But even with this code, it's too slow for me with A of length 5 million and n = 1 million: about 30 minutes for this computation!

Is there a cleverer approach that avoids re-comparing 1 million items for each element of the output?

Note: an approximate proportion with 10^(-3) precision, like "~34.3% of the previous 1 million items are smaller than A[i]", would be enough.

Basj
  • Are the numbers in `A` integers or reals? Is there a limited range (or expected range) of values they can take? – andersource Dec 07 '18 at 17:07
  • @andersource they are floats in [-1..1) but we could as well use 16-bits integers by pre-multiplying A by 32768 and rounding. The precision would be enough with int16 – Basj Dec 07 '18 at 17:34
  • @andersource Do you think there's a better solution if they are int16? – Basj Dec 07 '18 at 20:20
  • Not sure, wanted to know the range we're dealing with to see if sacrificing space for efficiency makes any sense at all. Still thinking about this :) – andersource Dec 07 '18 at 20:22
  • Thanks @andersource :) Maybe the biggest problem is that I currently don't use the fact that the distribution of numbers (histogram) in A[327:1000327] and A[328:1000328] should not be too far from each other. Especially if we only care about 10^(-3) precision of the "proportion" of numbers smaller than A[i]. I don't know how to use this information, that the distribution of A[i:1000000+i] varies slowly when i changes a little. – Basj Dec 07 '18 at 20:26
  • Yeah, that's the idea I'm aiming for, though I also don't exactly know how to use that effectively – andersource Dec 07 '18 at 20:28
  • @Basj @andersource maybe this helps? suppose for a window size `k`, you use the window `A[i-k:i]` not for the element `A[i]`, but one of its neighbors `A[i+1]` (or `A[i-1]`). At most, the lessers count will deviate from "the true count for `A[i+1]`" by 1. Doing the same for `A[i+2]` would give you a max deviation of 2, and so on. So if your target precision is 1e-3, meaning that your acceptable error is half of that, 5e-4, then you could instead approximate results for the whole set of values `A[i+j] for j in range(-k * 5e-4, k * 5e-4)`, by simply reusing the same window `A[i-k: i]`. – CrepeGoat Dec 09 '18 at 22:32
  • @CrepeGoat Thank you for the answer you posted, and for this comment (which seems another idea, which would be even easier for me to implement). Could you post your last comment as a 2nd answer (for future reference / completeness)? – Basj Dec 10 '18 at 10:05

6 Answers


Here is an "exact" approach. It solves the 5,000,000 / 1,000,000 sized problem (with floats) in under 20 seconds on rather pedestrian hardware.

I apologize for the rather technical code. I'm not sure it can be made much more readable.

The basic idea is to partition the array into a binary-ish tree-like thing (sorry, no formal scicomp training).

For example, if we have chunks of size half a million, then we can sort each of those at linlog cost and afterwards find the contribution of any block to each element of the next block at amortized constant cost.

The tricky bit is how to piece chunks of different sizes together in such a way that in the end we've counted everything, and exactly once.

My approach is to start with small blocks and then keep fusing pairs of those. In principle that should keep the cost of sorting linear at each iteration because in theory (but not in numpy) we could fully exploit the sortedness of the smaller chunks.

As mentioned above, the code is tricky mostly because we need to compare the right elements to any given block. It basically comes down to two rules: 1) The block must be fully contained in the element's lookback. 2) The block must not be contained in a larger block that is fully contained in the element's lookback.
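
To illustrate just the block-contribution step in isolation (a toy sketch of that one idea, using a binary search per element rather than the merge trick that gives the amortized constant cost):

import numpy as np

# toy sketch: once a block is sorted, its contribution to any later element
# whose lookback fully contains the block is just a binary search
rng = np.random.default_rng(0)
block = rng.random(500_000)     # one block of the lookback window
later = rng.random(500_000)     # elements that can "see" the whole block

sorted_block = np.sort(block)                                # sort once, O(m log m)
contrib = np.searchsorted(sorted_block, later, side='left')  # per-element "<" count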

Anyway, here is a sample run

size 5_000_000, lookback 1_000_000 -- took 14.593 seconds
seems correct -- 10_000 samples checked

and the code:

UPDATE: simplified the code a bit, also runs faster

UPDATE 2: added a version that does "<=" instead of "<"

"<":

import numpy as np
from numpy.lib.stride_tricks import as_strided

def add_along_axis(a, indices, values, axis):
    if axis<0:
        axis += a.ndim
    I = np.ogrid[(*map(slice, a.shape),)]
    I = *I[:axis], indices, *I[axis+1:]
    a[I] += values

aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index

def inv_perm(p):
    i = np.empty_like(p)
    paa(i, p, np.arange(p.shape[-1]), -1)
    return i

def rolling_count_smaller(data, n):
    N = len(data)
    b = n.bit_length()
    NN = (((N-1)>>b)+2)<<b
    d0 = np.empty(NN, data.dtype)
    d0[NN-N:] = data[::-1]
    d0[:NN-N] = data.max() + 1
    dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
    ch, ch2 = 1, 2
    for i in range(b-1):
        d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
        sh = dt.shape
        (il, ir), (jl, jr), (k, _) = f2m(m2f(np.add(sh, (-1, -2, -1)), sh) - (n, n-ch), sh)
        I = min(il, ir) + 1
        bab = np.empty((I, ch2), dt.dtype)
        bab[:, ch:] = dt[sh[0]-I:, 0]
        IL, IR = np.s_[il-I+1:il+1, ir-I+1:ir+1]
        bab[:, k:ch] = d0[IL, jl, k:]
        bab[:, :k] = d0[IR, jr, :k]
        o = bab.argsort(1, kind='stable')
        ns, io = (o>=ch).cumsum(1), inv_perm(o)
        r0[IL, jl, k:] += taa(ns, io[:, k:ch], 1)
        r0[IR, jr, :k] += taa(ns, io[:, :k], 1)

        it[:, 1, :] += ch
        dt.shape = it.shape = r0.shape = -1, ch2
        o = dt.argsort(1, kind='stable')
        ns, io = (o>=ch).cumsum(1), inv_perm(o)
        aaa(r0, it[:, :ch], taa(ns, io[:, :ch], 1), 1)
        dt, it = taa(dt, o, 1), taa(it, o, 1)
        ch, ch2 = ch2, ch2<<1
    si, sj = dt.shape
    o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
    ns, io = (o>=ch).cumsum(1), inv_perm(o)
    r0[:-1, ch2-n-1:] += taa(ns, taa(io, inv_perm(it)[:-1, ch2-n-1:], 1), 1)
    return r0.ravel()[:NN-N-1:-1]

l = 1000
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 1000
sample = np.random.randint(0, len(x), check)
y = np.array([np.count_nonzero(data[max(0, i-l):i]<data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')    

"<=":

import numpy as np
from numpy.lib.stride_tricks import as_strided

def add_along_axis(a, indices, values, axis):
    if axis<0:
        axis += a.ndim
    I = np.ogrid[(*map(slice, a.shape),)]
    I = *I[:axis], indices, *I[axis+1:]
    a[I] += values

aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index

def inv_perm(p):
    i = np.empty_like(p)
    paa(i, p, np.arange(p.shape[-1]), -1)
    return i

def rolling_count_smaller(data, n):
    N = len(data)
    b = n.bit_length()
    NN = (((N-1)>>b)+2)<<b
    d0 = np.empty(NN, data.dtype)
    d0[:N] = data
    d0[N:] = data.max() + 1
    dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
    ch, ch2 = 1, 2
    for i in range(b-1):
        d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
        sh = dt.shape
        (il, ir), (jl, jr), (k, _) = f2m(m2f((0, 1, 0), sh) + (n-ch+1, n+1), sh)
        I = sh[0] - max(il, ir)
        bab = np.empty((I, ch2), dt.dtype)
        bab[:, :ch] = dt[:I, 1]
        IL, IR = np.s_[il:il+I, ir:ir+I]
        bab[:, ch+k:] = d0[IL, jl, k:]
        bab[:, ch:ch+k] = d0[IR, jr, :k]
        o = bab.argsort(1, kind='stable')
        ns, io = (o<ch).cumsum(1), inv_perm(o)
        r0[IL, jl, k:] += taa(ns, io[:, ch+k:], 1)
        r0[IR, jr, :k] += taa(ns, io[:, ch:ch+k], 1)
        it[:, 1, :] += ch
        dt.shape = it.shape = r0.shape = -1, ch2
        o = dt.argsort(1, kind='stable')
        ns, io = (o<ch).cumsum(1), inv_perm(o)
        aaa(r0, it[:, ch:], taa(ns, io[:, ch:], 1), 1)
        dt, it = taa(dt, o, 1), taa(it, o, 1)
        ch, ch2 = ch2, ch2<<1
    si, sj = dt.shape
    o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
    ns, io = (o<ch).cumsum(1), inv_perm(o)
    r0[1:, :n+1-ch] += taa(ns, taa(io, ch+inv_perm(it)[1:, :n+1-ch], 1), 1)
    return r0.ravel()[:N]

l = 1000
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 1000
sample = np.random.randint(0, len(x), check)
y = np.array([np.count_nonzero(data[max(0, i-l):i]<=data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')    
Paul Panzer
  • As I also said to andersource: thank you very much @PaulPanzer for having spent so much time on this problem. I'm currently testing your answer (as well as the other answer)! I'll give feedback soon! – Basj Dec 10 '18 at 10:06
  • I'm trying to learn how it works, just a small question @PaulPanzer: which lines of code would you modify to count the number of A[i-k] ... that are less than *or equal* to A[i]? (i.e. my initial question with `<=` instead of `<`)? – Basj Dec 10 '18 at 12:49
  • @Basj I'm afraid it's not that simple. I've updated the post. Hope it helps. – Paul Panzer Dec 10 '18 at 15:35
  • @PaulPanzer I agree with your concern for readability; your code results sound great but I can't read it. My suggestions are: 1) use more verbose variable names (e.g., `b` is cryptic, but `log2_n` is descriptive and still pretty short. Same for `aaa, taa, paa`.); 2) break things into smaller steps (That can mean wrapping small chunks into functions. Or taking a statement made of several function calls, and spreading it over multiple lines, giving each intermediate result a descriptive variable name); 3) read some others' advice, e.g. [this](docs.python-guide.org/writing/style/) – CrepeGoat Dec 11 '18 at 21:50
  • @CrepeGoat I'm aware of most of the stuff you are quoting and I really don't think it is the issue here. Frankly, "longer variable names" while frequently recommended do not make things more readable. All they do is adding visual clutter that obscures essential structure. There is a reason mathematicians - who know a thing or two about approaching difficult problems - use mostly single letters or very short names. It's `tan`, not `ratio_between_sin_and_cos`. Don't get me wrong, I'm not denying my code is far from perfect as it stands. But passing it through autopep8 won't help a bit. – Paul Panzer Dec 11 '18 at 22:42
  • @PaulPanzer I get that, long names *can* be bad. But there's a happy medium, and I think compromising a little would do your code some good. – CrepeGoat Dec 11 '18 at 23:37
  • @PaulPanzer actually you know what, leave your code as-is. I'll play with it and if I can do better I'll let you know. Besides, yours runs magnitudes faster than anything I've written so far T-T – CrepeGoat Dec 11 '18 at 23:38
  • @CrepeGoat actually my gut feeling is that your splay tree approach is very similar to mine but lends itself to an easier to read/maintain/understand implementation. Complexity should be the same, assuming that merging two sorted arrays can be done in O(n). – Paul Panzer Dec 12 '18 at 00:12
  • the splay tree has low complexity, but in practice seems too slow even on pypy. I tried another method that seems similar to what you explained: pre-sort sections of the array, which you can combine to make a “compositely-sorted” window of arbitrary size/offset, then use successive binary searches on each part of the composite window to count the lessers for a given array value/window. I even used what I thought was some pretty slick numpy magic. And at most I get ~70-second run times for n=0.5m, k=0.1m. ...yours does sub-1-minute on my machine for the full problem :| I’m baffled – CrepeGoat Dec 13 '18 at 21:21
  • @CrepeGoat I've spent way too much time on this but in case you are interested I've posted in a separate answer a pythranized version that is roughly twice as fast. I think it is easier to understand and I've put quite a few comments. – Paul Panzer Dec 14 '18 at 21:17
  • Hah glad it's not just me XD – CrepeGoat Dec 14 '18 at 22:02

First attempt at an answer, based on the assumption (from the comments):

we could as well use 16-bits integers by pre-multiplying A by 32768 and rounding. The precision would be enough with int16

Assuming we're working with int16 numbers: I would try to maintain a relatively small array of size 2**16 counting how many times each number appeared in the last 1M window. Maintaining the array is O(1) per step: with each index increment you just decrement the count of the number that just "left" the window, and increment the count of the "new" number. Counting how many numbers in the window are smaller than the current number then reduces to summing the array over all indices up to (excluding) the current number.

Assuming A[i] is in the range [-32768, 32768]:

# A is assumed to already hold integer values in [-32768, 32768] (see the pre-scaling sketch below)
B = np.zeros(2 * 32768 + 1)
Q = np.zeros(len(A))
n = 1000 * 1000

def adjust_index(i):
    return int(i) + 32768

for i in range(len(Q)):
    if i >= n + 1:
        B[adjust_index(A[i - n - 1])] -= 1
    if i > 0:
        B[adjust_index(A[i - 1])] += 1
    Q[i] = B[:adjust_index(A[i])].sum() / float(n)

This ran on my machine in about one minute.

You can trade off space and some speed for accuracy by using a larger (or smaller) range of integers (e.g. multiplying by 2**17 instead of 2**16 for more accuracy at the cost of some speed, or by 2**15 to get results faster but less accurately).
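
If A is the float array from the question (values in [-1, 1)), a minimal sketch of the pre-scaling mentioned in the comments, so that it fits the integer range assumed above:

import numpy as np

A_float = np.random.random(5 * 1000 * 1000) * 2 - 1   # floats in [-1, 1), as in the question
# scale to 16-bit-sized integers; the clip guards against rounding up to 32768
A = np.clip(np.round(A_float * 32768), -32768, 32767).astype(np.int64)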

andersource
  • Wondeful, thanks! Just being curious, 1 minute with numba, or without numba? (I couldn't get 1 minute without numba) – Basj Dec 07 '18 at 23:46
  • For me without numba, for A of size 5m – andersource Dec 08 '18 at 10:52
  • Thank you very much for having spent so much time on this problem. I'm currently testing your answer (as well as the other answer)! I'll give feedback soon! – Basj Dec 10 '18 at 10:06

Sorry in advance for not implementing my idea for you; I don’t quite have the time right now. But I hope it helps!

Notation

I'll use n as the array size, and k as the window size.

The Concept

For each element A[i], build a splay tree ordering all elements a in A[max(0, i-k): i+1], and then use the splay tree to count the number of elements a < A[i]. The advantage here is that the splay trees for adjacent elements A[i] & A[i+1] will differ only by one node insertion and (for i > k) one node removal, which reduces the time needed to build the splay trees.

The required operations have the following complexities:

  • for each i: O(n * ?)
    • adding A[i] as a node to the splay tree: amortized O(log k)
    • counting a < A[i]: since adding A[i] puts it in the root position, you need only check the left branch’s size counter -> O(1)
    • removing A[i-k-1] node: amortized O(log k)

Overall complexity: amortized O(n log(k))
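
As a rough sketch of the same sliding-window idea, here is a version using sortedcontainers.SortedList as a stand-in for the splay tree (it also gives O(log k) insertion/removal and a count-of-smaller query via bisect):

import numpy as np
from sortedcontainers import SortedList  # stand-in for the splay tree described above

def rolling_count_smaller(A, k):
    window = SortedList()                 # the up-to-k items preceding A[i]
    out = np.zeros(len(A), dtype=np.int64)
    for i, a in enumerate(A):
        out[i] = window.bisect_left(a)    # number of preceding items < a (use bisect_right for <=)
        window.add(a)
        if len(window) > k:
            window.remove(A[i - k])       # A[i-k] has fallen out of the lookback
    return out

The complexity matches the estimate above, but in pure Python the constant factor is large, so this is an illustration of the structure rather than a fast solution.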

CrepeGoat

Reposting the contents of my comment at @Basj's request:

The Thought

Suppose for a window size k, you use the window A[i-k: i] not for the element A[i], but one of its neighbors A[i+1] (or A[i-1]).

The contents of this window A[i-k:i] are almost identical to that of the "true window for A[i+1]", A[i-k+1: i+1]; k-1 of their elements are the same, with only 1 (potentially) non-matching element. This would affect the lessers count for A[i+1] by at most 1; either the changed element is counted when the real one would not be, or vice-versa. Thus at the most, the lessers count for A[i+1] will deviate from "the true count for A[i+1]" by at most 1.

By the same logic, doing the same for A[i+2] (or A[i-2]) would give you a max deviation of 2, and more generally, doing the same for A[i+j] would give you a max deviation of abs(j).

So if your target precision is 1e-3, meaning that your acceptable error is half of that, 5e-4, then you could instead approximate results for the whole set of values A[i+j] for j in range(int(-k * 5e-4), int(k * 5e-4)), by simply reusing the same window A[i-k: i] for each A[i+j].

...Now what?

You can simply adjust your code to count the lessers in this adjusted window for each A[i+j], and increment i by k*1e-3 chunks.

...but this doesn't save you any time. You're still taking a chunk of k numbers, and counting the number of values less than some reference value a, and doing so for 5 million a's. That's exactly what you did before.
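
Made concrete, that adjusted (but not yet faster) version might look roughly like this:

import numpy as np

def approx_rolling_count(A, k, tol=5e-4):
    half = max(int(k * tol), 1)              # neighbors within +-half of a chunk's center share one window
    Q = np.zeros(len(A))
    for start in range(k, len(A), 2 * half):
        c = min(start + half, len(A))        # chunk "center"
        window = A[c - k:c]                  # one window reused for the whole chunk
        for j in range(start, min(start + 2 * half, len(A))):
            Q[j] = np.sum(window <= A[j])    # same naive count as before; count error at most `half`
    return Q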

So the question is: how can you abuse the repetition to save time?

@Basj I'll leave the rest of this thought to you. It is Finals season, after all ;]

CrepeGoat

Here is a pythranized version of my solution. It is roughly twice as fast and, I think, more readable even if it is longer. The obvious downside is the added pythran dependency.

The main workhorse is _mergesorted3; it scales well with increasing blocksize but is comparatively slow at small blocksizes.

I've written one specialist for blocksize 1 to demonstrate how much more speed one could potentially gain.

import numpy as np
from _mergesorted2 import _mergesorted_1
from _mergesorted3 import _mergesorted3
from time import perf_counter as pc

USE_SPEC_1 = True
def rolling_count_smaller(D, n, countequal=True):
    N = len(D)
    B = n.bit_length() - 1
    # now: 2^(B+1) >= n > 2^B
    # result and sorter
    R, S = np.zeros(N, int), np.empty(N, int) if USE_SPEC_1 else np.arange(N)
    FL, FH, SL, SH = (np.zeros(3, dt) for dt in 'llll')
    T = pc()
    if USE_SPEC_1:
        _mergesorted_1(D, R, S, n, countequal)
    for b in range(USE_SPEC_1, B):
        print(b, pc()-T)
        T = pc()
        # for each odd block first treat the elements that are so far to its
        # right that they can see that block in full but not the block
        # containing it
        # most of the time (whenever 2^b does not divide n+1) these will  span
        # two blocks, hence fall into two ordered subgroups
        # thus do a threeway merge, but only a "dry run":
        # update the counts R but not the sorter S
        L, BB = n+1, ((n>>b)+1)<<b
        if L == BB:
            Kref = int(countequal)
            SL[1-countequal] = BB
            SH[1-countequal] = BB+(1<<b)
            FL[1-countequal] = BB
            FH[1-countequal] = n+1+(1<<b)
            SL[2] = SH[2] = FL[2] = FH[2] = 0
        else:
            Kref = countequal<<1
            SL[1-countequal:3-countequal] = BB-(1<<b), BB
            SH[1-countequal:3-countequal] = BB, BB+(1<<b)
            FL[1-countequal:3-countequal] = L, BB
            FH[1-countequal:3-countequal] = BB, n+1+(1<<b)
        SL[Kref] = FL[Kref] = 1<<b
        SH[Kref] = FH[Kref] = 1<<(b+1)
        _mergesorted3(D, R, S, SL, SH, FL, FH, N, 1<<(b+1), Kref, False, True)
        # merge pairs of adjacent blocks        
        SL[...] = 0
        SL[1-countequal] = 1<<b
        SH[2] = 0
        SH[:2] = SL[:2] + (1<<b)
        _mergesorted3(D, R, S, SL, SH, FL, FH, N, 1<<(b+1), int(countequal), True, False) 
    # in this last step even and odd blocks are treated the same because
    # neither can be contained in larger valid block
    SL[...] = 0
    SL[1-countequal] = 1<<B
    SH[2] = 0
    SH[int(countequal)] = 1<<B
    SH[1-countequal] = 1<<(B+1)
    FL[...] = 0
    FL[1-countequal] = 1<<B
    FH[2] = 0
    FH[int(countequal)] = 1<<B
    FH[1-countequal] = n+1
    _mergesorted3(D, R, S, SL, SH, FL, FH, N, 1<<B, int(countequal), False, True)
    return R

countequal=True
l = 1_000_000
np.random.seed(0)
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l, countequal)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 10
sample = np.random.randint(0, len(x), check)

if countequal:
    y = np.array([np.count_nonzero(data[max(0, i-l):i]<=data[i]) for i in sample])
else:
    y = np.array([np.count_nonzero(data[max(0, i-l):i]<data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')    

The main worker _mergesorted3.py. Compile: pythran _mergesorted3.py

import numpy as np

#pythran export _mergesorted3(float[:], int[:], int[:], int[3], int[3], int[3], int[3], int, int, int, bool, bool)
#pythran export _mergesorted3(int[:], int[:], int[:], int[3], int[3], int[3], int[3], int, int, int, bool, bool)


# DB, RB, SB are the data, result and sorter arrays; here they are treated a
# bit like base pointers, hence the B in the names
# SL, SH are the low and high ends of the current rows of the three queues
# the next rows are assumed to be at offset N
# FL, FH are low and high ends of ranges in non sorted order used to filter
# each queue. they are ignored if 'filter' is False
# ST is the top index this can fall in the middle of a row which will then be
# processed partially
# Kref is the index of the reference queue (the one whose elements are counted)
def _mergesorted3(DB, RB, SB, SL, SH, FL, FH, ST, N, Kref, writeback, filter):
    if writeback: # set up row buffer for writing back of merged sort order
        SLbuf = min(SL[0], SL[1]) # low end of row
        SHbuf = max(SH[0], SH[1]) # high end of row
        Sbuf = np.empty(SHbuf-SLbuf, int) # buffer
        Ibuf = 0 # index
    D = np.empty(3, DB.dtype) # heads of the three queues: values
    S = np.empty(3, int) # heads of the three queues: sorters
    while True: # loop over rows
        C = 0 # count of elements in the reference block seen so far
        I = SL.copy() # heads of the three queues: indices
        S[:2] = SB[I[:2]] # the inner loop expects the heads of the two non
                          # active (i.e. not incremented just now) queues
                          # to be in descending order
        if filter: # skip elements that are not within a contiguous range.
                   # this requires filtering because everything is referenced
                   # in sorted order. so we cannot directly select ranges in
                   # the original order
                   # it is the caller's responsibility that for all except
                   # possibly the last row the filtered queues are not empty
            for KK in range(2):
                while S[KK] < FL[KK] or S[KK] >= FH[KK]:
                    I[KK] += 1
                    S[KK] = SB[I[KK]]
        D[:2] = DB[S[:2]] # fetch the first two queue head values
        # and set the inter queue sorter accordingly
        K = np.array([1, 0, 2], int) if D[1] > D[0] else np.array([0, 1, 2], int)
        while I[K[2]] < SH[K[2]]: # loop to merge three rows
            # get a valid new element from the active queue at sorter level
            S[K[2]] = SB[I[K[2]]]
            if filter and (S[K[2]] < FL[K[2]] or S[K[2]] >= FH[K[2]]):
                I[K[2]] += 1
                continue
            # fetch the corresponding value
            D[K[2]] = DB[S[K[2]]]
            # re-establish inter-queue sort order
            if D[K[2]] > D[K[1]] or (D[K[2]] == D[K[1]] and K[2] < K[1]):
                K[2], K[1] = K[1], K[2]
                if D[K[1]] > D[K[0]] or (D[K[1]] == D[K[0]] and K[1] < K[0]):
                    K[1], K[0] = K[0], K[1]
            # do the book keeping depending on which queue has become active
            if K[2] == Kref: # reference queue: adjust counter
                C += 1
            else: # other: add current ref element count to head of result queue
                RB[S[K[2]]] += C
            I[K[2]] += 1 # advance active queue
        # one queue has been exhausted, which one?
        if K[2] == Kref: # reference queue: no need to sort what's left just 
                         # add the current ref element count to all leftovers
                         # subject to filtering if applicable
            if filter:
                KK = SB[I[K[1]]:SH[K[1]]]
                RB[KK[(KK >= FL[K[1]]) & (KK < FH[K[1]])]] += C
                KK = SB[I[K[0]]:SH[K[0]]]
                RB[KK[(KK >= FL[K[0]]) & (KK < FH[K[0]])]] += C
            else:
                RB[SB[I[K[1]]:SH[K[1]]]] += C
                RB[SB[I[K[0]]:SH[K[0]]]] += C
        else: # one of the other queues: we are left with a two-way merge
              # this is in a separate loop because it also supports writing
              # back the new sort order which we do not need in the three way
              # situation
            while I[K[1]] < SH[K[1]]:
                S[K[1]] = SB[I[K[1]]]
                if filter and (S[K[1]] < FL[K[1]] or S[K[1]] >= FH[K[1]]):
                    I[K[1]] += 1
                    continue
                D[K[1]] = DB[S[K[1]]]
                if D[K[1]] > D[K[0]] or (D[K[1]] == D[K[0]] and K[1] < K[0]):
                    K[1], K[0] = K[0], K[1]
                if K[1] == Kref:
                    C += 1
                else:
                    RB[S[K[1]]] += C
                if writeback: # we cannot directly write back without messing
                              # things up. instead we buffer one row at a time
                    Sbuf[Ibuf] = S[K[1]]
                    Ibuf += 1
                I[K[1]] += 1
            # a second queue has been exhausted. which one?
            if K[1] == Kref: # the reference queue: must update results in
                             # the remainder of the other queue
                if filter:
                    KK = SB[I[K[0]]:SH[K[0]]]
                    RB[KK[(KK >= FL[K[0]]) & (KK < FH[K[0]])]] += C
                else:
                    RB[SB[I[K[0]]:SH[K[0]]]] += C
            if writeback: # write back updated order
                # the leftovers of the last remaining queue have not been
                # buffered but being contiguous can be written back directly
                # the way this is used by the main script actually gives a
                # fifty-fifty chance of copying something exactly onto itself
                SB[SLbuf+Ibuf:SHbuf] = SB[I[K[0]]:SH[K[0]]]
                # now copy the buffer
                SB[SLbuf:SLbuf+Ibuf] = Sbuf[:Ibuf]
                SLbuf += N; SHbuf += N
                Ibuf = 0
        SL += N; SH += N
        if filter:
            FL += N; FH += N
        # this is ugly:
        # going to the next row we must check whether one or more queues
        # have fully or partially hit the ceiling ST.
        # if two and fully we are done
        # if one fully we must alter the queue indices to make sure the
        # empty queue is at index 2, because of the requirement of having
        # at least one valid element in queues 0 and 1
        done = -1
        for II in range(3):
            if SH[II] == SL[II]:
                if done >= 0:
                    done = -2
                    break
                done = II
            elif SH[II] > ST:
                if SL[II] >= ST or (filter and FL[II] >= ST):
                    if done >= 0:
                        done = -2
                        break
                    done = II
                    if writeback:
                        SHbuf -= SH[II] - SL[II]
                    SH[II] = SL[II] = 0
                else:
                    if writeback:
                        SHbuf -= SH[II] - ST
                    SH[II] = ST
                if filter and FH[II] > ST:
                    FH[II] = ST
        if done == Kref or done == -2:
            break
        elif done == 0:
            SL[:2], SH[:2] = SL[1:], SH[1:]
            if filter:
                FL[:2], FH[:2] = FL[1:], FH[1:]
            SH[2] = SL[2]
            Kref -= 1
        elif done == 1:
            SL[1], SH[1] = SL[2], SH[2]
            if filter:
                FL[1], FH[1] = FL[2], FH[2]
            SH[2] = SL[2]
            Kref >>= 1

And the special case _mergesorted2.py. Compile: pythran _mergesorted2.py

import numpy as np

#pythran export _mergesorted_1(float[:], int[:], int[:], int, bool)
#pythran export _mergesorted_1(int[:], int[:], int[:], int, bool)

def _mergesorted_1(DB, RB, SB, n, countequal):
    N = len(DB)
    K = ((N-n-1)>>1)<<1
    for i in range(0, K, 2):
        if DB[i] < DB[i+1] or (countequal and DB[i] == DB[i+1]):
            SB[i] = i
            SB[i+1] = i+1
            RB[i+1] += 1
        else:
            SB[i] = i+1
            SB[i+1] = i
        if DB[i+1] < DB[i+1+n] or (countequal and DB[i+1] == DB[i+1+n]):
            RB[i+1+n] += 1
    for i in range(K, (N>>1)<<1, 2):
        if DB[i] < DB[i+1] or (countequal and DB[i] == DB[i+1]):
            SB[i] = i
            SB[i+1] = i+1
            RB[i+1] += 1
        else:
            SB[i] = i+1
            SB[i+1] = i
    if N & 1:
        SB[N-1] = N-1
Paul Panzer

Here is an approximate approach that is simple to implement and runs in O(n) time (21 seconds for 5M values on my laptop). It should work well for data sets with values that vary by more than 1/1000th of the largest difference.

import numpy as np
from collections import deque

def lessCount(A, window):
    precision = 1000                     # 1/1000th of the value range
    result    = deque()
    counts    = [0]*(precision+1)        # bucketed histogram of the current window
    minVal    = min(A)
    chunkSize = (max(A)-minVal)/precision
    keys      = deque()                  # bucket keys in arrival order, for eviction
    for i,a in enumerate(A):
        key = int((a-minVal)/chunkSize)  # bucket of the current value
        keys.append(key)
        counts[key] += 1
        lowerCount   = sum(counts[:key]) # values in strictly lower buckets
        result.append(lowerCount)
        if i < window: continue
        counts[keys.popleft()] -= 1      # evict the oldest value once the window is full
    return np.array(result)

It builds a rolling array of counts where the index is the value's bucket, i.e. its relative position in the value range divided into chunks. The chunk size is 1/1000th of the largest difference between values. For each element of A, there is only one addition and one subtraction to the array of counts. The number of values lower than the current one is the sum of counts up to the position of that value in the counts array. You can increase the precision as you need, but keep in mind that the run time will be proportional to O(n) * precision.
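
For example, to turn the counts into the proportions the question asks about (a rough usage sketch; the divisor is approximately the number of items currently in the rolling window):

import numpy as np

A = np.random.random(5 * 1000 * 1000)
window = 1000 * 1000
counts = lessCount(A, window)
denom = np.minimum(np.arange(1, len(A) + 1), window)  # approximate window length at each i
proportions = counts / denom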

Alain T.