0

I am looking for an efficient method to split overlapping ranges, I have read many similar questions but none of them solves my problem.

The problem is simple, given a list of triplets, the first two elements of each triplet are integers and the first number is always less than or equal to the second, apply the following transformations such that for all pairings of the output, the start of the later triplet is always greater than the end of the other, and if the start of one triplet is equal to the end of the other plus one, they have different data:

If they have different data, subtract one from the other:

(0, 100, 'A'), (0, 20, 'B') -> (0, 20, 'B'), (21, 100, 'A')
(0, 100, 'A'), (20, 50, 'B') -> (0, 19, 'A'), (20, 50, 'B'), (51, 100, 'A')
(0, 100, 'A'), (50, 100, 'B') -> (0, 49, 'A'), (50, 100, 'B')
(0, 100, 'A'), (200, 300, 'B') -> (0, 100, 'A'), (200, 300, 'B')
(0, 100, 'A'), (50, 300, 'B') -> (0, 49, 'A'), (50, 300, 'B')

Else if they overlap, merge them:

(0, 100, 'A'), (0, 20, 'A') -> (0, 100, 'A')
(0, 100, 'A'), (20, 50, 'A') -> (0, 100, 'A')
(0, 100, 'A'), (50, 100, 'A') -> (0, 100, 'A')
(0, 100, 'A'), (200, 300, 'A') -> (0, 100, 'A'), (200, 300, 'A')
(0, 100, 'A'), (101, 300, 'A') -> (0, 300, 'A')
(0, 100, 'A'), (50, 300, 'A') -> (0, 300, 'A')

Here is a test case:

[(0, 16, 'red'), (0, 4, 'green'), (2, 9, 'blue'), (2, 7, 'cyan'), (4, 9, 'purple'), (6, 8, 'magenta'), (9, 14, 'yellow'), (11, 13, 'orange'), (18, 21, 'green'), (22, 25, 'green')]

Expected output:

[(0, 1, 'green'), (2, 3, 'cyan'), (4, 5, 'purple'), (6, 8, 'magenta'), (9, 10, 'yellow'), (11, 13, 'orange'), (14, 14, 'yellow'), (15, 16, 'red'), (18, 25, 'green')]

Graphic representation:

enter image description here

These functions encode the rules in Python:

def subtract(A, B):
    (As, Ae, _), (Bs, Be, Bd) = A, B
    if As > Be or Bs > Ae:
        return [[Bs, Be, Bd]]
    result = []
    if As > Bs:
        result.append([Bs, As - 1, Bd])
    if Ae < Be:
        result.append([Ae + 1, Be, Bd])
    return result


def join(A, B):
    (As, Ae, Ad), (Bs, Be, Bd) = A, B
    if Bs > As:
        As, Ae, Bs, Be = Bs, Be, As, Ae
    if Bs <= As and Ae <= Be:
        return [[Bs, Be, Bd]]
    return [[Bs, Ae, Bd]] if As <= Be + 1 else [[Bs, Be, Bd], [As, Ae, Ad]]

I have previously implemented a somewhat efficient function that splits the ranges when not (s1 < s2 < e1 < e2) always holds. But it isn't efficient enough and fails to give the correct output if the aforementioned constraint isn't enforced.

The accepted answer does improve performance and can handle cases where ranges intersect, but it isn't as efficient as my data demands. I have literally millions of items to process.

(make_generic_case can be found in the linked question above)

In [411]: sample = make_generic_case(4096, 65536, 16)

In [412]: %timeit solve(sample); sample.pop(-1)
2.14 ms ± 124 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [413]: len(solve(sample))
Out[413]: 1883

It takes about 2140/4096 = 0.5224609375 microseconds to process each item, 0.25 to 0.4 microseconds per item is acceptable.

The following function is the only function I wrote to date that gives the correct output when ranges intersect, but it isn't efficient at all:

def brute_force_discretize(ranges):
    numbers = {}
    ranges.sort(key=lambda x: (x[0], -x[1]))
    for start, end, data in ranges:
        numbers |= {n: data for n in range(start, end + 1)}
    numbers = list(numbers.items())
    l = len(numbers)
    i = 0
    output = []
    while i < l:
        di = 0
        curn, curv = numbers[i]
        while i != l and curn + di == numbers[i][0] and curv == numbers[i][1]:
            i += 1
            di += 1
        output.append((curn, numbers[i - 1][0], curv))
    return output

I have implemented several other algorithms found here and here, but all of them fail to pass make_generic_case(4096, 65536, 16) and comparing against brute_force_discretize test:

def discretize_gen(ranges):
    nodes = get_nodes(ranges)
    stack = []
    for (n1, e1, d1), (n2, e2, _) in zip(nodes, nodes[1:]):
        if e1:
            stack.remove(d1)
        else:
            stack.append(d1)
        start = n1 + e1
        end = n2 - (not e2)
        if start <= end and stack:
            yield start, end, stack[-1]

def merge(segments):
    start, end, data = next(segments)  # keep one segment buffered
    for start2, end2, data2 in segments:
        if end + 1 == start2 and data == data2:  # adjacent & same data
            end = end2  # merge
        else:
            yield start, end, data
            start, end, data = start2, end2, data2
    yield start, end, data  # flush the buffer

def subtractRanges(A, B):
    (As, Ae), (Bs, Be) = A, B
    if As > Be or Bs > Ae:
        return [[Bs, Be]]
    result = []
    if As > Bs:
        result.append([Bs, As - 1])
    if Ae < Be:
        result.append([Ae + 1, Be])
    return result


def merge_list(segments):
    start, end, data = segments.pop(0)
    for start2, end2, data2 in segments:
        if end + 1 == start2 and data == data2:
            end = end2
        else:
            yield start, end, data
            start, end, data = start2, end2, data2
    yield start, end, data


def discretize(ranges):
    i = 0
    ranges = [list(e) for e in ranges]
    while i < len(ranges):
        for superior in ranges[i + 1 :]:
            if result := subtractRanges(superior[:2], ranges[i][:2]):
                ranges[i][:2] = result[0]
                if len(result) > 1:
                    ranges[i + 1 : 0] = ([*result[1], ranges[i][2]],)
            else:
                ranges.pop(i)
                i -= 1
                break
        i += 1
    return list(merge_list(sorted(ranges)))

def flatten(ranges):
    stack = []
    result = []
    ranges = sorted(ranges, key=lambda x: (x[0], -x[1]))
    for ns, ne, nd in ranges:
        new = [ns, ne, nd]
        append = True
        for old in stack.copy():
            stack.remove(old)
            if nd != old[2]:
                for item in subtract(new, old):
                    if item[1] < ns:
                        result.append(item)
                    else:
                        stack.append(item)
            else:
                joined = join(new, old)
                if len(joined) == 1:
                    stack.extend(joined)
                    append = False
                else:
                    result.append(old)
        if append:
            stack.append(new)
    if stack:
        result.extend(stack)
    return sorted(result)

All of them fail the correctness check, and none of them is as efficient as my previous implementation to begin with.

What is an efficient method, that for any test case generated using make_generic_case(4096, 65536, 16), gives the same output as brute_force_discretize and takes about 1 millisecond to process such test case on average? (I have tested thoroughly that the function solve found in the accepted answer gives the correct output, so you can use that to verify the output).


I have to point out that I really have millions of triplets to process, no exaggeration. And the numbers are very large, they are under 2^32-1 (IPv4 address) and 2^128-1 (IPv6), so bruteforce methods are unacceptable. I in fact wrote one myself and I posted it above.

And I know how slow it is, it will take forever to process my data, or will burn my RAM.

This is the link to one of many datasets I have to process: https://git.ipfire.org/?p=location/location-database.git;a=tree, it is the database.txt file.

The file is very large (hundred mebibytes) and contains millions of entries, you have to scroll down (like really down) to see the IP ranges. I already wrote code that processes it, but it was very slow but much faster than the one posted in the existing answer below.

Ξένη Γήινος
  • 2,181
  • 1
  • 9
  • 35
  • So in short you are asking for code that gives the same solutions as [solve](https://stackoverflow.com/a/76696055/5459839), but faster? – trincot Jul 18 '23 at 15:03
  • 1
    @trincot That is correct, I want to cut the execution time to about 50%. – Ξένη Γήινος Jul 18 '23 at 15:13
  • 1
    Switching to another programming language would fulfill that requirement immediately. Does it have to be Python? – trincot Jul 18 '23 at 15:13
  • I currently only understand Python (and some JavaScript), unfortunately, but it doesn't have to be Python, it is just I write code in Python, I will write code that process the output further in Python, I think just anything that outputs Python objects is fine. I need to be able to feed the output to Python code. – Ξένη Γήινος Jul 18 '23 at 15:17
  • To repeat: options include PyPy, Cython and Numba. – greybeard Jul 19 '23 at 04:30

1 Answers1

0

If your ranges are integers and the values arent too large, you can use a "paint" technique where you override color positions in a list with the color specified by each range. Find the color breaks in the mask to identify the new range breaks and combine them using zip to reform the final list of merged/overlayed ranges

R = [(0, 16, 'red'), (0, 4, 'green'), (2, 9, 'blue'), (2, 7, 'cyan'), 
     (4, 9, 'purple'), (6, 8, 'magenta'), (9, 14, 'yellow'), 
     (11, 13, 'orange'), (18, 21, 'green'), (22, 25, 'green')]


M = [None]*(max(e for _,e,_ in R)+1) # paint mask (0 ... max end)
for s,e,c in R:
    M[s:e+1] = [c]*(e-s+1)           # paint range with color

pos = [i for i,(a,b) in enumerate(zip(M,M[1:]),1) if a!=b]
R2  = [(s,e-1,M[s]) for s,e in zip([0]+pos,pos+[len(M)]) if M[s]]

ouput:

print(R2)

[(0, 1, 'green'), (2, 3, 'cyan'), (4, 5, 'purple'), (6, 8, 'magenta'), 
 (9, 10, 'yellow'), (11, 13, 'orange'), (14, 14, 'yellow'), (15, 16, 'red'), 
 (17, 17, None), (18, 25, 'green')]

You could also use groupby from itertools to reform the groups (could be slightly faster):

from itertools import groupby
R2 = [(g[0],g[-1],c) 
      for c,(*g,) in groupby(range(len(M)),key=lambda i:M[i]) if c]

[EDIT]

For large value ranges, this simple "paint" logic isn't ideal.

Here is a different approach that creates a bit mask of range indexes for each starting and stopping positions. The ranges that stack on top of each other will add up as we go through position and subtract when reaching the position following their last. Since each range only starts once and ends once, instead of counting, we can use a bit mask and determine the layers at each position using an XOR bitwise operation on the masks (also works using sets but bit masks are faster):

from collections import defaultdict
def bitMasks(R):
    R = sorted(R,key=lambda t:(t[0],t[0]-t[1])) # see note
    posMask = defaultdict(int)
    for i,(s,e,_) in enumerate(R):
        posMask[s]   |= 1<<i
        posMask[e+1] |= 1<<i        
    result       = []
    start,color  = 0, None
    layers       = 0
    for pos in sorted(posMask):
        layers  ^= posMask[pos]
        newColor = R[layers.bit_length()-1][-1] if layers else None
        if color == newColor: continue
        if color is not None:
            result.append((start,pos-1,color))
        start,color = pos,newColor
    return result

Note: the ranges that start at the same position are sorted in decreasing size so the smaller overlays always have precedence over the larger ones. This allows use of the high bit position (i.e. last occurrence) to be used as the top layer index.

output:

bitMasks(R)

[(0, 1, 'green'), (2, 3, 'cyan'), (4, 5, 'purple'), (6, 8, 'magenta'), 
 (9, 10, 'yellow'), (11, 13, 'orange'), (14, 14, 'yellow'), 
 (15, 16, 'red'), (18, 25, 'green')]

I benchmarked it against the solve() solution and found that bitMasks is 75% slower so not as good but it may provide some inspiration.

Alain T.
  • 40,517
  • 4
  • 31
  • 51
  • I had the same idea, but trincot's `solve()` for `make_generic_case(4096, 65536, 16)` is much, much faster (0.075 vs 0.00098 s in my benchmark) – Andrej Kesely Jul 18 '23 at 19:57