
I am writing an algorithm that takes in messages and then determines whether they comply with the established messaging rate.

For example, no more than 5 messages are to be sent in ANY 50 second window. Therefore, this window MUST be a rolling window.

I have implemented this token bucket algorithm from this post. However, I can't get it working consistently. It passes some test cases but not others, which makes me think there is a logic issue hidden somewhere in here.

Here is what I have so far:

public class Messaging {
//times in millis
double time_before;
double time_now;
double now;
double time_passed;

double allowance;

//constants
static double per = 50000; // 50 seconds
static double rate = 5; //5 messages

public Messaging(){
    time_before = System.currentTimeMillis();
    allowance = rate;
}

public void onEvent(){
    time_now = System.currentTimeMillis();
    time_passed = time_now - time_before;
    time_before = time_now;

    allowance += time_passed * (rate / per);

    if (allowance > rate){
        allowance = rate;
        System.out.println("Reset Allowance");
    }

    if (allowance < 1.0){
        System.out.println("Discard");
    }else{
        System.out.println("Forward message");
        allowance -= 1.0;
    }


}

This doesn't work though!

public static void main(String[] args) {
    Messaging orders = new Messaging();
    for (int i = 0; i < 10; i++) {
        orders.onEvent();
        try {
            Thread.sleep(5000);
        } catch (Exception ex) {

        }        
    }
}

Running the code above gives this:

Forward message. Time: 1469830426910
Forward message. Time: 1469830431912
Forward message. Time: 1469830436913
Forward message. Time: 1469830441920
Forward message. Time: 1469830446929
Forward message. Time: 1469830451937
Forward message. Time: 1469830456939
Forward message. Time: 1469830461952
Forward message. Time: 1469830466962
Discard. Time: 1469830471970
Total time passed: 50067

Why is only the last message being discarded? Shouldn't allowance be decremented enough that it fails automatically after the 5th message?

I would like help with this particular implementation please. The actual implementation will be in a proprietary language that doesn't have queues, etc.

cheenbabes
  • We cannot tell anything as your output doesn't have timestamps. There's no way for us to tell what's going on. One suggestion: decouple the filtering code from the timestamping so you can feed it a repeatable set of test data regardless of debugging. That will let you step through the code and figure this out. – Jim Garrison Jul 29 '16 at 22:08
  • @JimGarrison I added time stamp outputs. I am testing different variations so just using Thread.sleep right now to simulate messages coming in every x seconds. Stepping through the code I can tell the line `allowance += time_passed * (rate / per);` is what's creating the issue but since I've seen this same line on numerous implementation I'm wondering if I'm specifically doing something wrong in mine or if this algorithm doesn't work for a sliding window. – cheenbabes Jul 29 '16 at 22:16
  • That's not quite what he means - I'm currently writing an answer which I hope will explain it. – Andrew Williamson Jul 29 '16 at 22:19

2 Answers


For a rate-limited sliding window, you need to queue up each message along with its timestamp. That way, when the queue is full, you just discard any new messages. When the messages at the head of the queue have been in there for longer than the allotted time, they leave the queue, and you have room for new messages.

import java.util.ArrayDeque;
import java.util.Queue;

class MessageBuffer {
    static class Message {
        final long timestamp; // arrival time in millis
        final String value;   // Can be any type you need it to be

        Message(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    static final int WINDOW_SIZE = 5;      // max messages per window
    static final long TIME_LIMIT = 50000;  // window length in millis

    Queue<Message> messages = new ArrayDeque<>(WINDOW_SIZE);

    public void onEvent(String message) {
        long now = System.currentTimeMillis();

        // If the head of the queue holds messages that are no longer in the
        // sliding window (older than TIME_LIMIT), remove them from the queue
        while (!messages.isEmpty()
            && messages.peek().timestamp + TIME_LIMIT <= now)
            messages.remove();

        // If there is room in the queue, process this message, otherwise discard it
        if (messages.size() < WINDOW_SIZE) {
            System.out.println("Forward message: " + message);
            messages.add(new Message(now, message));
        } else {
            System.out.println("Discard message: " + message);
        }
    }
}

Without this timestamp information, you can't tell when a message leaves the sliding window, so you can't know whether your window is full or not. FYI, the example you linked to is an approximation, and can actually limit you to fewer than 5 messages in 50 seconds.

Two minor nit-picks:

  • In an object-oriented world, classes are things, not actions. In most cases, your class name should be a noun that describes what it is (MessageBuffer), not a verb that describes what it does (Messaging).
  • The variable now should not be a member variable - when you first assign it, it does represent the current time, but once the method has finished, and another method is called, the value is stale - it doesn't actually represent the current time.
Andrew Williamson
  • This makes sense. Is there any way to do this without using a Queue object? For example if you only had access to a HashMap -esque data structure? I'm asking because the final implementation will not be in Java, but in a proprietary language that doesn't have a Queue object -- or really an array object for that matter (so I can't really build my own). The main thing it has is multi-indexed array lookups -- basically HashMaps that can have multiple keys. – cheenbabes Jul 29 '16 at 23:00
  • While it is odd that the language doesn't have an array type, you _can_ treat a hashmap as an array - just use the numeric index as the key. Then you could implement a queue on top of that. That gives you an option, albeit with a lot more work involved than the approximation method. I will see if I can get the method you originally use working instead. – Andrew Williamson Jul 30 '16 at 04:15
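
The idea in the last comment (numeric indices as keys in a map) can be sketched roughly as follows. This is only an illustration under assumptions, not code from either poster: the class name `MapWindowLimiter`, its fields, and the caller-supplied timestamp are all hypothetical, and timestamps are assumed never to go backwards. Because the window holds at most 5 messages, the map only ever needs keys 0 to 4, used as a ring buffer of the last 5 forwarded timestamps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a strict sliding-window limiter that uses only a map.
class MapWindowLimiter {
    static final int MAX_MESSAGES = 5;        // max messages per window
    static final long WINDOW_MILLIS = 50_000; // window length in millis

    // slot index (0..MAX_MESSAGES-1) -> timestamp of a forwarded message
    private final Map<Integer, Long> slots = new HashMap<>();
    // Slot that is either still unused or holds the oldest forwarded timestamp
    private int next = 0;

    // Returns true if the message may be forwarded, false if it must be discarded.
    boolean onEvent(long nowMillis) {
        Long oldest = slots.get(next);
        // Room is available if this slot was never used, or if the message
        // stored in it has already left the window.
        if (oldest != null && nowMillis - oldest < WINDOW_MILLIS) {
            return false;
        }
        slots.put(next, nowMillis);       // overwrite the oldest entry
        next = (next + 1) % MAX_MESSAGES; // advance to the new oldest entry
        return true;
    }

    public static void main(String[] args) {
        MapWindowLimiter limiter = new MapWindowLimiter();
        StringBuilder result = new StringBuilder();
        // Ten messages, 5 seconds apart, as in the question's test loop
        for (int i = 0; i < 10; i++) {
            result.append(limiter.onEvent(i * 5000L) ? 'F' : 'D');
        }
        System.out.println(result); // prints FFFFFDDDDD
    }
}
```

Only forwarded messages are recorded, so discarded messages never take up a slot. A message that is exactly 50 seconds old is treated as outside the window, matching the queue version above.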

The algorithm you're using is an approximation - it will give you the rate you're looking for, on average. In the original post, the writer claims:

'allowance' grows at speed 5/8 units per seconds at most, i.e. at most five units per eight seconds. Every message that is forwarded deducts one unit, so you can't send more than five messages per every eight seconds.

This is not quite true - if allowance starts off at its maximum value, say 5, it grows at 5/8 units per second, and 1 is deducted for every message sent. With one message arriving per second, allowance shrinks at a net rate of 3/8 units per second, so starting from 5 we can send around 10 messages before the throttling happens.

If you have a period where your messages are not coming in as fast as your throttle rate, then it will build up the allowance. Then, when the messages pick up pace, you have a brief period where you may end up processing 2 * rate messages before the throttling kicks in. If you change your loop to do 20 iterations, instead of 10, you'll see that eventually the throttling does behave as you'd expect it to. Alternatively, if you set the allowance to 0 to begin with, instead of rate, it will throttle straight away.
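
To make this concrete, here is a minimal sketch that replays the question's scenario with supplied timestamps instead of `Thread.sleep`, so the result is repeatable. The class name `TokenBucketDemo` and the helper `run` are hypothetical. The arithmetic follows the question's `onEvent()`, except that the refill is computed as `elapsed * RATE / PER_MILLIS` so that the intermediate values in this example are exact in floating point.

```java
// Hypothetical sketch: the question's token bucket, driven by explicit timestamps.
class TokenBucketDemo {
    static final double RATE = 5;             // messages per window
    static final double PER_MILLIS = 50_000;  // window length in millis

    private double allowance;
    private long lastMillis;

    TokenBucketDemo(double initialAllowance, long startMillis) {
        this.allowance = initialAllowance;
        this.lastMillis = startMillis;
    }

    // Returns true if the message is forwarded, false if it is discarded.
    boolean onEvent(long nowMillis) {
        long elapsed = nowMillis - lastMillis;
        lastMillis = nowMillis;
        // Refill for the elapsed time, capped at RATE
        allowance = Math.min(RATE, allowance + elapsed * RATE / PER_MILLIS);
        if (allowance < 1.0) {
            return false;
        }
        allowance -= 1.0;
        return true;
    }

    // Feeds `count` messages spaced `gapMillis` apart; returns one F or D per message.
    static String run(int count, long gapMillis, double initialAllowance) {
        TokenBucketDemo bucket = new TokenBucketDemo(initialAllowance, 0);
        StringBuilder result = new StringBuilder();
        for (int i = 0; i < count; i++) {
            result.append(bucket.onEvent(i * gapMillis) ? 'F' : 'D');
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(10, 5000, RATE)); // FFFFFFFFFD
        System.out.println(run(20, 5000, RATE)); // FFFFFFFFFDFDFDFDFDFD
        System.out.println(run(10, 5000, 0));    // DDFDFDFDFD
    }
}
```

With a full starting allowance, each message costs 1 unit while only 0.5 units are refilled in the 5 seconds between messages, so the initial 5 units last for 9 messages. After that, every second message gets through, which is the target rate of one message per 10 seconds.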

Andrew Williamson
  • So in other words, there isn't a way to do a strict rate limit with a rolling time window without a queue? – cheenbabes Aug 01 '16 at 15:36
  • You _can_ make it a strict upper limit - instead of capping `allowance` at `rate`, cap it to 1 instead. This will also bring the _average_ rate down much lower than your desired rate though. Best case scenario, you have 5 messages that come in, all 10 seconds apart. They all get through. Worst case scenario, you have a break of 49 seconds, then get 5 messages in 1 second, 4 of them will be dropped. Your question is a good demonstration of the downsides to this approximation. – Andrew Williamson Aug 01 '16 at 21:28
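
The trade-off described in the last comment can be checked with a small sketch. The names `BurstCapDemo` and `countForwarded` are hypothetical, and the arrival times are made up to match the best and worst cases in the comment; the only change from the question's algorithm is that the cap on `allowance` is a parameter.

```java
// Hypothetical sketch: token bucket with a configurable cap on the allowance.
class BurstCapDemo {
    static final double RATE = 5;             // messages per window
    static final double PER_MILLIS = 50_000;  // window length in millis

    // Counts how many of the given arrivals are forwarded when the allowance
    // starts at, and is capped at, `cap`.
    static int countForwarded(long[] arrivalsMillis, double cap) {
        double allowance = cap;
        long last = 0;
        int forwarded = 0;
        for (long now : arrivalsMillis) {
            allowance = Math.min(cap, allowance + (now - last) * RATE / PER_MILLIS);
            last = now;
            if (allowance >= 1.0) {
                allowance -= 1.0;
                forwarded++;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        long[] evenlySpaced = {0, 10_000, 20_000, 30_000, 40_000};
        long[] lateBurst = {49_000, 49_250, 49_500, 49_750, 50_000};
        long[] every5s = {0, 5_000, 10_000, 15_000, 20_000,
                          25_000, 30_000, 35_000, 40_000, 45_000};

        System.out.println(countForwarded(evenlySpaced, 1.0)); // 5: all get through
        System.out.println(countForwarded(lateBurst, 1.0));    // 1: four are dropped
        System.out.println(countForwarded(every5s, RATE));     // 9: exceeds 5 per 50 s
        System.out.println(countForwarded(every5s, 1.0));      // 5: strict limit holds
    }
}
```

With the cap at 1, a message can only be forwarded once a full 10 seconds of allowance has built up since the previous forwarded message, which keeps any 50 second window at 5 messages or fewer but drops bursts that a true sliding window would accept.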