
I have a data stream that produces numeric values in random time intervals. Now I continuously need to get the maximum value the stream produced over a certain time interval, e.g. 100 milliseconds.

My naive approach would be to keep a deque of pairs and a variable for the max. If an incoming value is greater than the max, I'd clear the deque. Otherwise I'd loop through the deque: entries where now - ts is greater than my lookback interval are ignored, and for the rest I check whether the value is greater than the ones before it (unless it is the first value) and, if so, save that iterator. After the loop, I erase the deque up to (but excluding) my max iterator and set the new max.

I am just wondering if there is a smarter, more elegant way to do this using a different container. Ideally I would stick to a container from the C++ standard library.

EDIT: someone suggested a priority queue (the answer got deleted). In that case, would I create a heap of pairs and tell the heap to order by value (or, if that is not possible, create a struct with timestamp and value fields and add a comparison operator)? Then every time I get the max, I check whether it has expired and, if so, pop it and take the new max. Is that a better approach than my initial one?

EDIT: values are not unique

chrise
  • Are the time intervals predefined or arbitrary? For example, can we assume that the intervals will always end in 00-99, or would you want to be able to ask for 013-112 sometimes and 027-126 at other times? – Brian Rodriguez Apr 04 '17 at 02:26
  • the lookback interval is fixed, so it will always be a constant like 100ms. the arrival of new values is random – chrise Apr 04 '17 at 02:45
  • What if 100 drifts out of the interval, but a near miss of 99 is still within the last 100ms? – user4581301 Apr 04 '17 at 03:23
  • I am not sure I understand your question. If I had a max value of 100 and that datapoint is more than 100ms in the past, then I want to drop it. If my next highest value is 99 and it is newer than 100ms, then I want to return 99 as my max – chrise Apr 04 '17 at 03:37

3 Answers


==new update==

Use a (max) heap of std::pairs of value and timestamp. In the C++ standard library, the heap I'm discussing is std::priority_queue. If you make the value type (and not the timestamp type) the first element of your pair, i.e.

std::pair<val_t,time_t>

and not

std::pair<time_t,val_t>

then you don't even need a custom comparator: std::pair comparison looks at pair.first first and only falls back to pair.second when the first elements are equal, and std::priority_queue uses std::less by default, so you get a max heap of pairs ordered by value.

Insert/push all the new values into the (max) heap. The largest value will always be on top. When polling, check the top sample's timestamp (sample.second) against now() minus the length of the recency window. If it's too old, pop it and check the new top, repeating until the top is fresh. Certain value sequences can cause the heap to hide stale values underneath the max, so when the heap exceeds a certain size threshold, empty it into a new one whilst filtering out the expired values. How often that is needed depends on the arrival rate of new samples relative to the window size; it should be infrequent.
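The heap-with-lazy-expiry idea above can be sketched roughly as follows. This is only an illustration under some assumptions: the class name HeapWindowMax, the integer millisecond timestamps, the compaction threshold, and the rule of doubling that threshold after a compaction are all hypothetical choices, not part of the original suggestion.

```cpp
#include <algorithm>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical helper: sliding-window maximum backed by a max-heap of
// (value, timestamp) pairs. Timestamps are assumed to be integer milliseconds.
class HeapWindowMax {
public:
    using Sample = std::pair<double, long long>;  // value first, so the heap orders by value

    HeapWindowMax(long long window_ms, std::size_t compact_threshold)
        : window_(window_ms), min_threshold_(compact_threshold),
          threshold_(compact_threshold) {}

    void push(double value, long long ts) {
        heap_.emplace(value, ts);
        // Stale samples can hide below a fresh maximum, so rebuild occasionally.
        if (heap_.size() > threshold_) compact(ts);
    }

    // Writes the maximum of all samples with ts >= now - window into out.
    // Returns false if no unexpired sample exists.
    bool max(long long now, double& out) {
        // Lazily pop expired maxima until a fresh sample is on top.
        while (!heap_.empty() && heap_.top().second < now - window_) heap_.pop();
        if (heap_.empty()) return false;
        out = heap_.top().first;
        return true;
    }

    std::size_t size() const { return heap_.size(); }

private:
    // Empty the heap into a new one, keeping only unexpired samples.
    void compact(long long now) {
        std::vector<Sample> fresh;
        while (!heap_.empty()) {
            if (heap_.top().second >= now - window_) fresh.push_back(heap_.top());
            heap_.pop();
        }
        heap_ = std::priority_queue<Sample>(fresh.begin(), fresh.end());
        // Assumption: raise the threshold so a full window of fresh samples
        // does not trigger a rebuild on every push.
        threshold_ = std::max(min_threshold_, 2 * heap_.size());
    }

    long long window_;
    std::size_t min_threshold_;
    std::size_t threshold_;
    std::priority_queue<Sample> heap_;
};
```

With a 100 ms window, pushing (5.0, t=0) and (3.0, t=50) and then calling max(120, m) would discard the first sample and report 3.0.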

thanks to @chrise for suggesting an amendment to a good idea that I prematurely threw out

==previous update==

My original response answered only part of the question. I suggested using a max heap (std::priority_queue, which uses std::less by default and is therefore a max heap) to maintain the max value. This does not account for the recency maintenance. You have two separate concerns: searching for the max and conditional membership. Changing the rules for membership (the expiry criterion) on an existing heap will invalidate the heap and give you runtime errors that are hard to root-cause.

Instead, you could use a list of pairs (a deque is probably better, but I did the examples with std::list) along with remove_if and a predicate that keeps track of the max value. This can be done in two ways: with a lambda or with a functor class. Using a lambda looks like this:

using sample = std::pair<unsigned,double>;
sample a{ 1,12.2 };
sample b{ 2,11.778 };
sample c{ 3,9.2 };
sample d{ 4,-2.6 };
sample e{ 5,10.1 };

std::list<sample> samples{ d,c,b,a };
cout << "list is: " << samples << endl << endl;
double maxval = -std::numeric_limits<double>::infinity();
unsigned cutoff = now() - timerange;
samples.remove_if([&maxval,cutoff](const sample& s)
{
    //if older than cutoff, remove
    if (s.first < cutoff) return true;
    //otherwise, keep and update max
    maxval = std::max(maxval, s.second);
    return false;
});
cout << "max is: " << maxval << ", list is: " << samples << endl << endl;

see http://ideone.com/O6UJPW for full example.

and using a functor class looks like this:

using sample = std::pair<unsigned,double>;
sample a{ 1,12.2 };
sample b{ 2,11.778 };
sample c{ 3,9.2 };
sample d{ 4,-2.6 };
sample e{ 5,10.1 };

std::list<sample> samples{ d,c,b,a };
cout << "original list is: " << samples << endl;

double maxval(-std::numeric_limits<double>::infinity());
//eliminate samples older than 2
using pred = PredRetainMax<sample>;
samples.remove_if(pred(now() - timerange, maxval));
cout << "max is: " << maxval << ", list is: " << samples << endl << endl;

where predicate looks like this

template<class T>
struct PredRetainMax;

template<class Time, class Val>
struct PredRetainMax<std::pair<Time, Val>>
{
    PredRetainMax(Time cutoff, Val& m) :_cutoff(cutoff), _max(m) {}
    bool operator()(const std::pair<Time, Val>& s)
    {
        //if older than cutoff, remove
        if (s.first < _cutoff) return true;
        //otherwise, keep and update max
        _max = std::max(_max, s.second);
        return false;
    }
    Val get() { return _max; }
private:
    Time _cutoff;
    Val& _max;
};

see http://ideone.com/qs153j for full example

The functor is constructed with a reference to the external maxval because the standard library takes the remove_if predicate by value (as a copy), so holding a reference is kind of a hack to get the max back out.

==original response below==

Use a heap. In the C++ standard library it is called std::priority_queue. Insert/push all the new values into the (max) heap. The largest value will always be on top.

See http://en.cppreference.com/w/cpp/container/priority_queue for some helpful usage examples.

chrisg
  • You can also keep track of a running median using two heaps, with one a max heap and the other a min heap. Keep the two the same size by popping excess values into the other heap. – chrisg Apr 04 '17 at 02:25
  • But how to remove items that are older than the time interval of interest (eg. 100ms)? – Michael Burr Apr 04 '17 at 02:25
  • @MichaelBurr See: http://stackoverflow.com/questions/11590624/how-to-make-stls-priority-queue-fixed-size – Brian Rodriguez Apr 04 '17 at 02:29
  • Can the same functionality be achieved by using a vector, and calling make_heap and pop_heap. Will there be any performance difference? – APan Apr 04 '17 at 02:45
  • So, I would have to check if the top element of the heap has expired, if so, pop it, get the next element, ... so, I would need to create a heap of structures with fields value and ts, and tell the heap to sort by value? – chrise Apr 04 '17 at 02:55
  • @chrise The priority queue, by definition, has either largest/smallest number in the root. When you deq the root, the next largest/smallest becomes the root, and so on. – Everyone Apr 04 '17 at 03:04
  • I think @chrise's suggestion of checking if the max is within the timeframe will work nicely. If the top of the heap isn't still in the time window, discard it and check the next. I think that the only additional management that might be necessary is if the number of elements in the heap grows too large (by whatever criteria might define too large). In that case a 'garbage collect' might need to occur where all (or maybe just some) of the elements are examined, discarding any that are too old or have a newer element that has a larger value. Nice and simple, unless I'm forgetting something. – Michael Burr Apr 04 '17 at 04:52
  • updated response with discussion on problems of using a heap of smart objects or heap with a smart comparator: changing the rules for membership (the expiry criterion) on an existing heap will invalidate the heap and give you runtime errors that are hard to root cause. or you could use a boost Fibonacci heap. but that's where I say good luck and are you sure you want to do that? – chrisg Apr 04 '17 at 06:31
  • I really like @chrise's idea of just using the heap (after all) and throwing out expired values as you get them. Reading the top is constant time and each pop is O(log n), so this should be ok if too much time hasn't passed. The downside is that the heap could grow with stale values that accumulate if you get a continuous stream of oscillating values with a growing upper bound. The fresh large values "hide" the old values in the heap so they never get cleaned out. The heap could grow unbounded this way. – chrisg Apr 04 '17 at 06:45
  • You have multiple suggestions here, which makes it difficult to comment on each one individually. You should split them into separate answers. – Stuart Berg Apr 04 '17 at 17:25
  • yes, stackoverflow keeps insisting I edit this answer instead of adding a new one. I am realizing I should've separated these answers – chrisg Apr 04 '17 at 18:21

If the data is small enough that it will easily fit in your CPU cache (say, 1 million float values), then we're all over-thinking this.

Just store a std::deque< std::pair<float, timestamp> >.

  • When a new value comes in, use push_back().
  • When you need to query the max element, call pop_front() until all expired values have been purged. Then just call std::max_element() on the whole deque.

If there are no cache misses, this should perform as well as (or better than) more elaborate solutions involving priority_queue, multiset, etc.
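A minimal sketch of the two steps above, assuming integer millisecond timestamps that arrive in non-decreasing order. The class name DequeWindowMax and the use of double instead of float are illustrative choices, not part of the answer.

```cpp
#include <algorithm>
#include <deque>
#include <utility>

// Hypothetical helper: brute-force sliding-window maximum over a deque.
class DequeWindowMax {
public:
    using Sample = std::pair<double, long long>;  // (value, timestamp in ms)

    explicit DequeWindowMax(long long window_ms) : window_(window_ms) {}

    // Assumes ts is never smaller than a previously pushed timestamp.
    void push(double value, long long ts) { samples_.emplace_back(value, ts); }

    // Purges expired samples from the front, then scans the rest linearly.
    // Returns false if nothing is left inside the window.
    bool max(long long now, double& out) {
        while (!samples_.empty() && samples_.front().second < now - window_)
            samples_.pop_front();
        if (samples_.empty()) return false;
        // std::pair compares the first member (the value) first.
        out = std::max_element(samples_.begin(), samples_.end())->first;
        return true;
    }

private:
    long long window_;
    std::deque<Sample> samples_;
};
```

Each query costs O(N) in the number of samples currently in the window, which is the trade-off this answer accepts in exchange for simple, contiguous-friendly storage.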

Stuart Berg

Just keep two copies of the data:

  • std::deque< std::pair<timestamp, double> >
  • std::multiset<double>

When a new value arrives:

  1. Add it to both containers

  2. Purge out-of-date values, by examining the front of the deque.

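    a. If the first value of the deque is too old (expired), remove it from both containers. Since values are not unique, erase only one matching element from the multiset (find() it and erase that iterator); erase(value) would remove every equal value.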

    b. Repeat until the first value in the deque is not an expired value.

  3. The max value is at the end (a.k.a. rbegin()) of the multiset.
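The steps above might look roughly like this. It is a sketch under assumptions: the class name MultisetWindowMax and the integer millisecond timestamps are hypothetical, and, unlike the description, the query also purges so that the result is not stale when no new value has arrived for a while.

```cpp
#include <cstddef>
#include <deque>
#include <set>
#include <utility>

// Hypothetical helper: the deque keeps arrival order for expiry,
// the multiset keeps the same values sorted so the max is at rbegin().
class MultisetWindowMax {
public:
    explicit MultisetWindowMax(long long window_ms) : window_(window_ms) {}

    // Step 1 and 2: add to both containers, then purge expired samples.
    void push(double value, long long ts) {
        arrivals_.emplace_back(ts, value);
        values_.insert(value);
        purge(ts);
    }

    // Step 3: the max is the last element of the multiset.
    bool max(long long now, double& out) {
        purge(now);  // assumption: also purge on query
        if (values_.empty()) return false;
        out = *values_.rbegin();
        return true;
    }

    std::size_t size() const { return values_.size(); }

private:
    void purge(long long now) {
        while (!arrivals_.empty() && arrivals_.front().first < now - window_) {
            // Erase exactly one instance; erase(value) would drop all duplicates.
            auto it = values_.find(arrivals_.front().second);
            if (it != values_.end()) values_.erase(it);
            arrivals_.pop_front();
        }
    }

    long long window_;
    std::deque<std::pair<long long, double>> arrivals_;  // (timestamp, value)
    std::multiset<double> values_;
};
```

The find-then-erase step matters because the question states that values are not unique.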

Performance notes:

Multiset insertions and deletions are O(log(N)) and rbegin() is O(1). If performance is really a concern and you don't mind using boost, consider boost::container::flat_multiset, which offers largely the same interface as std::multiset on top of contiguous storage. It is often much faster for small to moderate sizes, but its insertions and erasures are O(N), so it loses out once your data is huge.

Stuart Berg