I wrote a script to deal with overlapping intervals. I have made several attempts at this before; I won't go into the details here, you can see this to get more context. This version is the smartest I have come up with: it is the most efficient, and after rigorous testing I have determined that its output is completely correct.
The problem can be simplified as follows. I am given a huge number of triples (a, b, c). In each triple, a and b are integers and b >= a. Each triple represents a range range(a, b + 1), and c is the data associated with the range. (a, b, c) is equivalent to {range(a, b + 1): c}.
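As a minimal illustration of this equivalence, the sketch below expands one triple into a per-integer mapping. The helper name expand is hypothetical and is not part of the script further down.

```python
def expand(triple):
    """Map every integer in the inclusive range [a, b] to the datum c."""
    a, b, c = triple
    return {n: c for n in range(a, b + 1)}


print(expand((2, 5, 'C')))  # {2: 'C', 3: 'C', 4: 'C', 5: 'C'}
```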
There are a lot of overlaps in the ranges, and adjacent ranges that share data.
Suppose there are two such ranges, call them (s0, e0, d0) and (s1, e1, d1) respectively. One of the following 3 undesirable situations might happen:
1. s0 <= s1 <= e1 < e0, the second range is completely contained within the first, and d0 == d1
2. the second range is a subrange of the first, like above, but d0 != d1
3. e0 == s1 - 1 and d0 == d1
The following situations never happen: e0 == s1, and s0 <= s1 and e0 <= e1.
The ranges are either completely disjoint, or one is a complete subset of the other; there are no partial intersections.
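The three situations can be told apart with a few comparisons. This is only a sketch that follows the conditions exactly as written above; the function name classify and the returned labels are made up for illustration.

```python
def classify(first, second):
    """Classify how the later range `second` relates to the earlier range `first`."""
    s0, e0, d0 = first
    s1, e1, d1 = second
    if s0 <= s1 <= e1 < e0:
        # Situations 1 and 2: the second range lies inside the first.
        return "redundant subrange" if d0 == d1 else "overriding subrange"
    if e0 == s1 - 1 and d0 == d1:
        # Situation 3: the ranges touch and carry the same data.
        return "mergeable neighbour"
    return "no action"


print(classify((2, 5, 'C'), (3, 4, 'C')))   # redundant subrange
print(classify((0, 10, 'A'), (0, 1, 'B')))  # overriding subrange
print(classify((2, 5, 'C'), (6, 7, 'C')))   # mergeable neighbour
```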
Now I want to process the ranges so that there are no overlaps and adjacent ranges always have different data. This requires the following operations for each of the 3 situations above:
1. Ignore the subrange and do nothing, because they share the same data.
2. Split the parent range: delete the overlapping portion of the parent range, and overwrite the overlapping range with the data of the subrange.
3. Ignore the second range and set e0 to e1, so that the two ranges are combined.
The result should contain no overlapping ranges and no extra ranges. Each range has only one datum, and it is always inherited from the newer, smaller range.
The input is sorted in the order that can be obtained in Python by using lambda x: (x[0], -x[1]) as the key. If the input is not sorted, I will sort it this way before processing it.
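For example, sorting a small shuffled list with that key puts ranges with the same start in order of decreasing end, so a parent range always comes before its subranges. The variable name rows is just a placeholder.

```python
rows = [(2, 5, 'C'), (0, 1, 'B'), (0, 10, 'A'), (3, 4, 'C')]

# Ascending start, then descending end: parents come before their subranges.
rows.sort(key=lambda x: (x[0], -x[1]))

print(rows)  # [(0, 10, 'A'), (0, 1, 'B'), (2, 5, 'C'), (3, 4, 'C')]
```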
This is a small example:
Data:
[(0, 10, 'A'), (0, 1, 'B'), (2, 5, 'C'), (3, 4, 'C'), (6, 7, 'C'), (8, 8, 'D')]
Meaning of the data:
[
{0: 'A', 1: 'A', 2: 'A', 3: 'A', 4: 'A', 5: 'A', 6: 'A', 7: 'A', 8: 'A', 9: 'A', 10: 'A'},
{0: 'B', 1: 'B'},
{2: 'C', 3: 'C', 4: 'C', 5: 'C'},
{3: 'C', 4: 'C'},
{6: 'C', 7: 'C'},
{8: 'D'}
]
My intended process is equivalent to simply processing them in order and updating the dictionary with the current item, one by one.
Processed data:
{0: 'B', 1: 'B', 2: 'C', 3: 'C', 4: 'C', 5: 'C', 6: 'C', 7: 'C', 8: 'D', 9: 'A', 10: 'A'}
Output:
[(0, 1, 'B'), (2, 7, 'C'), (8, 8, 'D'), (9, 10, 'A')]
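The dictionary-based process described above can be written down directly as a slow reference implementation. It is far too inefficient for millions of entries, but it is handy for checking the output of a faster version on small inputs. The function name discretize_bruteforce is hypothetical.

```python
def discretize_bruteforce(rows):
    """Expand every range into a dict; later rows overwrite earlier ones."""
    mapping = {}
    for start, end, data in rows:
        for n in range(start, end + 1):
            mapping[n] = data

    result = []
    for n in sorted(mapping):
        data = mapping[n]
        # Extend the previous range when it is adjacent and carries the same data.
        if result and result[-1][1] == n - 1 and result[-1][2] == data:
            result[-1][1] = n
        else:
            result.append([n, n, data])
    return [tuple(r) for r in result]


rows = [(0, 10, 'A'), (0, 1, 'B'), (2, 5, 'C'), (3, 4, 'C'), (6, 7, 'C'), (8, 8, 'D')]
print(discretize_bruteforce(rows))
# [(0, 1, 'B'), (2, 7, 'C'), (8, 8, 'D'), (9, 10, 'A')]
```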
I need to do this as efficiently as possible, because I literally have millions of entries to process, and that is not an exaggeration. I tried to use intervaltree, but it is way too inefficient for my purposes:
In [3]: %timeit merge_rows([[0, 10, 'A'], [0, 1, 'B'], [2, 5, 'C'], [3, 4, 'C'], [6, 7, 'C'],[8, 8, 'D']])
499 µs ± 14.5 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
The following is my code:
from bisect import bisect_left


class Interval:
    instances = []

    def __init__(self, start, end, data):
        self.start = start
        self.end = end
        self.data = data
        self.ends = [end]
        self.children = []
        Interval.instances.append(self)

    def variate(self, start, data):
        return not self.start <= start <= self.end or data != self.data

    def distinct(self, start, end, data):
        if self.end == start - 1 and self.data == data:
            self.ends[-1] = self.end = end
            return False
        return True

    def fill(self, start):
        if self.start < start:
            before = start - 1
            self.ends.insert(-1, before)
            self.children.append(Interval(self.start, before, self.data))

    def merge_worker(self, start, end, data):
        self.fill(start)
        add = self.start < start
        if not add:
            add = not self.children or self.children[-1].distinct(start, end, data)
        if add:
            self.children.append(Interval(start, end, data))
            self.ends.insert(-1, end)
        self.start = end + 1

    def merge(self, start, end, data):
        if not (self.variate(start, data) and self.distinct(start, end, data)):
            return
        if self.start <= start:
            self.merge_worker(start, end, data)
        else:
            self.children[bisect_left(self.ends, start)].merge(start, end, data)


def discretize(seq):
    Interval.instances.clear()
    current = -1e309
    for start, end, data in seq:
        if start > current + 1:
            top = Interval(start, end, data)
            current = end
        else:
            top.merge(start, end, data)
    return sorted(((i.start, i.end, i.data) for i in Interval.instances))
It is much more efficient:
In [305]: %timeit discretize([[0, 10, 'A'], [0, 1, 'B'], [2, 5, 'C'], [3, 4, 'C'], [6, 7, 'C'],[8, 8, 'D']])
11 µs ± 85.4 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
I used the following code to test it:
English isn't my mother tongue and I am not really good with words, so I won't describe my method; the code isn't hard to understand.
My question is: how can I dynamically calculate the insertion index of each instance, so that I can insert the ranges in order directly and eliminate the need to sort the result? The result must be in order, and sorting is really expensive.
I can attach an index to each instance when it is created, but so far I only know how to use consecutive integers as the index, incrementing by one each time an instance is created. I don't know how to dynamically calculate the index required to keep the instances sorted.
How can I calculate the index required? Or is there some better way?
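One possible direction, given here only as a sketch and not as a tested replacement for the code above, is to avoid computing insertion indices at all and instead emit the pieces in ascending order during a single pass, using a stack of currently open ranges. It assumes the input is already sorted with the key (x[0], -x[1]) and that ranges are either disjoint or fully nested, as stated earlier. The name discretize_sweep and its helpers are hypothetical, and its speed on the real data has not been measured.

```python
def discretize_sweep(rows):
    out = []    # finished ranges, appended in ascending order
    stack = []  # open ranges as [next_uncovered_start, end, data]

    def emit(start, end, data):
        if start > end:
            return  # nothing is left of this piece
        if out and out[-1][1] == start - 1 and out[-1][2] == data:
            out[-1][1] = end  # extend the adjacent range with the same data
        else:
            out.append([start, end, data])

    def close_top():
        start, end, data = stack.pop()
        emit(start, end, data)
        if stack:
            stack[-1][0] = end + 1  # the parent resumes after the child

    for start, end, data in rows:
        # Close every open range that ends before the new one starts.
        while stack and stack[-1][1] < start:
            close_top()
        if stack:
            parent = stack[-1]
            # Emit the part of the parent that precedes the new subrange.
            emit(parent[0], start - 1, parent[2])
            parent[0] = start
        stack.append([start, end, data])

    while stack:
        close_top()
    return [tuple(r) for r in out]


rows = [(0, 10, 'A'), (0, 1, 'B'), (2, 5, 'C'), (3, 4, 'C'), (6, 7, 'C'), (8, 8, 'D')]
print(discretize_sweep(rows))
# [(0, 1, 'B'), (2, 7, 'C'), (8, 8, 'D'), (9, 10, 'A')]
```

Because every piece is appended only after everything to its left has been emitted, the output list is already in order and no final sort is needed.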