167

I need a component/class that throttles execution of some method to a maximum of M calls in N seconds (or ms or nanos, it does not matter).

In other words I need to make sure that my method is executed no more than M times in a sliding window of N seconds.

If you don't know of an existing class, feel free to post your solutions/ideas on how you would implement this.

vtrubnikov
  • 2,045
  • 3
  • 14
  • 10
  • 4
    There are some great answers to this problem at http://stackoverflow.com/questions/667508/whats-a-good-rate-limiting-algorithm – skaffman Sep 10 '09 at 19:29
  • http://stackoverflow.com/questions/6271409/limiting-upload-speed-on-java – kervin Aug 09 '15 at 16:21
  • "I need to make sure that my method is executed no more than M times in a sliding window of N seconds." I recently wrote a blog post about how to do this in .NET. You might be able to create something similar in Java. [Better Rate Limiting in .NET](http://www.jackleitch.net/2010/10/better-rate-limiting-with-dot-net/) – Jack Leitch Oct 19 '10 at 15:42
  • The original question sounds a lot like the problem solved in this blog post: [Java Multi-Channel Asynchronous Throttler](http://www.cordinc.com/blog/2010/04/java-multichannel-asynchronous.html ). For a rate of M calls in N seconds, the throttler discussed in this blog guarantees that *any* interval of length N on the timeline will not contain more than M calls. – Hbf May 12 '11 at 08:14
  • Additionally, the same person has improved his implementation to guarantee call order preservation: http://www.cordinc.com/blog/2010/06/ordered-java-multichannel-asyn.html – ikaerom Feb 17 '16 at 11:27
  • You can use Redis for this when locking is needed in a distributed system. See the second algorithm in https://redis.io/commands/incr – Kanagavelu Sugumar Mar 27 '17 at 17:46
  • Check out the [TimerTask](https://docs.oracle.com/javase/7/docs/api/java/util/TimerTask.html) class. Or the [ScheduledExecutor](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html). – Gandalf Sep 10 '09 at 19:06
  • Scala players may consider [RateLimiter](https://failsafe.dev/rate-limiter) – Ricardo Apr 05 '23 at 00:19

15 Answers

97

I'd use a ring buffer of timestamps with a fixed size of M. Each time the method is called, you check the oldest entry, and if it's more than N seconds in the past, you execute and overwrite it with a fresh timestamp; otherwise you sleep for the time difference.
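
A minimal sketch of this idea (the class and field names, and the use of System.nanoTime, are my own choices, not from the answer):

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class RingBufferThrottle {
    private final long[] stamps;     // circular buffer of the last M call times (nanos)
    private final long windowNanos;  // N, converted to nanoseconds
    private int oldest = 0;          // index of the oldest timestamp

    public RingBufferThrottle(int maxCalls, long window, TimeUnit unit) {
        this.stamps = new long[maxCalls];
        this.windowNanos = unit.toNanos(window);
        // pre-age all slots so the first M calls pass immediately
        Arrays.fill(stamps, System.nanoTime() - windowNanos);
    }

    // Note: sleeping while holding the monitor serializes callers,
    // which is fine for a sketch but worth revisiting for real use.
    public synchronized void acquire() throws InterruptedException {
        long waitNanos = stamps[oldest] + windowNanos - System.nanoTime();
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
        stamps[oldest] = System.nanoTime(); // overwrite the oldest slot
        oldest = (oldest + 1) % stamps.length;
    }
}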

Michael Borgwardt
  • 342,105
  • 78
  • 482
  • 720
  • 5
    Lovely. Just what I need. A quick attempt shows ~10 lines to implement this and a minimal memory footprint. Just need to think about thread safety and queuing of incoming requests. – vtrubnikov Sep 10 '09 at 20:17
  • 7
    That's why you use the DelayQueue from java.util.concurrent. It prevents the problem of multiple threads acting on the same entry. – erickson Sep 10 '09 at 20:22
  • 6
    For a multi threaded case, the token bucket approach may be a better choice, I think. – Michael Borgwardt Sep 10 '09 at 20:44
  • 1
    Do you know what this algorithm is called, if it has any name at all? – Vlado Pandžić Mar 06 '18 at 16:34
  • 2
    @MichaelBorgwardt "and if it's less than N seconds in the past". Do you mean _more_? Don't you intend to call only if a given time has passed? – oligofren Dec 08 '21 at 10:15
95

What worked out of the box for me was Google Guava RateLimiter.

// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);

private void someMethod() {
    throttle.acquire();
    // Do something
}
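
As the comments below point out, acquire() blocks the calling thread. Guava's RateLimiter also has a non-blocking tryAcquire(); a sketch reusing the throttle field from above:

private void someNonBlockingMethod() {
    if (throttle.tryAcquire()) {
        // Do something
    } else {
        // Over the rate limit: reject, queue, or retry later instead of blocking
    }
}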
gtonic
  • 2,295
  • 1
  • 24
  • 32
schnatterer
  • 7,525
  • 7
  • 61
  • 80
  • This is a great solution! The only issue is if you don't want to use Guava. – Joqus Sep 12 '14 at 17:36
  • 24
    I wouldn't recommend this solution as the Guava RateLimiter will block the thread and that will exhaust the thread pool easily. – kaviddiss Oct 22 '14 at 19:35
  • 24
    @kaviddiss if you don't want to block then use `tryAcquire()` – slf Mar 10 '15 at 19:33
  • 9
    The issue with the current implementation of RateLimiter (at least for me) is that it does not allow for time periods greater than 1 second, and therefore cannot express rates of, for example, 1 per minute. – John B May 21 '15 at 19:17
  • 5
    @John B As far as I understand, you can achieve 1 request per minute with RateLimiter by using RateLimiter.create(60.0)+rateLimiter.acquire(60) – divideByZero Sep 18 '15 at 12:41
  • 1
    @divideByZero - Interesting thought. More accurate would be `RateLimiter.create(1.0)` to get one permit per second. Then each acquire would use `acquire(60)` to only allow one per minute. – John B Sep 21 '15 at 12:26
  • 1
    Any one tried Ratelimiter.create(1.0/60) to achieve 1 per minute? – radiantRazor Feb 16 '16 at 09:35
  • 3
    @radiantRazor Ratelimiter.create(1.0/60) and acquire() achieves 1 call per minute. – bizentass Aug 17 '16 at 19:22
  • 2
    If you use this, be careful to consider how it behaves after past underutilization. You can have bursts that allow more calls than expected. – narduk Dec 09 '16 at 13:05
  • 2
    As an alternative you can use the TimedSemaphore from apache.commons : https://commons.apache.org/proper/commons-lang/javadocs/api-release/org/apache/commons/lang3/concurrent/TimedSemaphore.html – asmaier Jan 12 '18 at 13:52
  • @slf rateLimiter.tryAcquire() – BuffK Aug 04 '20 at 06:58
30

In concrete terms, you should be able to implement this with a DelayQueue. Initialize the queue with M Delayed instances with their delay initially set to zero. As requests to the method come in, take a token, which causes the method to block until the throttling requirement has been met. When a token has been taken, add a new token to the queue with a delay of N.
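
A minimal sketch of that scheme (the Token class and all names are mine, not from the answer):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueThrottle {
    private static class Token implements Delayed {
        private final long expiresAtNanos;

        Token(long delayNanos) {
            this.expiresAtNanos = System.nanoTime() + delayNanos;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<Token> tokens = new DelayQueue<>();
    private final long windowNanos;

    public DelayQueueThrottle(int maxCalls, long window, TimeUnit unit) {
        this.windowNanos = unit.toNanos(window);
        for (int i = 0; i < maxCalls; i++) {
            tokens.add(new Token(0)); // M tokens, initially available
        }
    }

    public void acquire() throws InterruptedException {
        tokens.take();                       // blocks until a token's delay expires
        tokens.add(new Token(windowNanos));  // replace it, delayed by N
    }
}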

erickson
  • 265,237
  • 58
  • 395
  • 493
  • 1
    Yes, this would do the trick. But I don't particularly like DelayQueue because it is using (via PriorityQueue) a balanced binary heap (which means lots of comparisons on `offer` and possible array growth), and it's all kinda heavy for me. I guess for others this might be perfectly okay. – vtrubnikov Sep 10 '09 at 20:25
  • 5
    Actually, in this application, since the new element added to the heap will almost always be the maximum element in the heap (i.e., have the longest delay), usually one comparison per add is required. Also, the array will never grow if the algorithm is implemented correctly, since one element is added only after taking one element. – erickson Sep 10 '09 at 20:59
  • 3
    I found this helpful also in cases where you don't want requests to happen in big bursts, by keeping the size M and delay N relatively small, on the order of a handful of millis. E.g. M = 5, N = 20ms would provide a throughput of 250/sec while keeping bursts to a size of 5. – FUD Sep 22 '12 at 10:51
  • Will this scale for a million rpm and when concurrent requests are allowed? I would need to add a million delayedElements. Also corner cases will be high on latency - case where multiple threads are calling poll() and it would be locking each time. – Aditya Joshee May 02 '20 at 13:02
  • @AdityaJoshee I haven’t benchmarked it, but if I get some time I will try to get a sense of the overhead. One thing to note though is that you don’t need 1 million tokens that expire in 1 second. You could have 100 tokens that expire in 10 milliseconds, 10 tokens that expire in 1 millisecond, etc. This actually forces the instantaneous rate to be closer to the average rate, smoothing spikes, which may cause backups at the client, but that’s a natural consequence of rate limiting. 1 million RPM hardly sounds like throttling though. If you can explain your use case I might have better ideas. – erickson May 02 '20 at 15:32
22

Read up on the Token bucket algorithm. Basically, you have a bucket with tokens in it. Every time you execute the method, you take a token. If there are no more tokens, you block until you get one. Meanwhile, there is some external actor that replenishes the tokens at a fixed interval.

I'm not aware of a library to do this (or anything similar). You could write this logic into your code or use AspectJ to add the behavior.
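
A minimal sketch of the idea, folding the "external actor" into a scheduled refill task (all names here are my own, and this is a fixed-interval refill, not a sliding window):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TokenBucket {
    private final Semaphore tokens;
    private final int capacity;

    public TokenBucket(int capacity, long refillPeriod, TimeUnit unit) {
        this.capacity = capacity;
        this.tokens = new Semaphore(capacity);
        // the "external actor": periodically tops the bucket back up to capacity
        // (in real code you would keep a handle to this executor and shut it down)
        ScheduledExecutorService refiller = Executors.newSingleThreadScheduledExecutor();
        refiller.scheduleAtFixedRate(
                () -> tokens.release(capacity - tokens.availablePermits()),
                refillPeriod, refillPeriod, unit);
    }

    public void take() throws InterruptedException {
        tokens.acquire(); // blocks until a token is available
    }
}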

Kevin
  • 30,111
  • 9
  • 76
  • 83
  • 4
    Thanks for the suggestion, interesting algo. But it's not exactly what I need. For example, I need to limit execution to 5 calls per second. If I use a token bucket and 10 requests come in at the same time, the first 5 calls would take all available tokens and execute momentarily, while the remaining 5 calls will be executed at a fixed interval of 1/5 s. In such a situation I need the remaining 5 calls to be executed in a single burst only after 1 second passes. – vtrubnikov Sep 10 '09 at 20:37
  • 5
    What if you added 5 tokens to the bucket every second (or 5 - (5 - remaining)) instead of 1 every 1/5 second? – Kevin Sep 10 '09 at 20:43
  • @Kevin no, this still would not give me the 'sliding window' effect – vtrubnikov Sep 10 '09 at 20:53
  • 2
    @valery yes it would. (Remember to cap the tokens at M though) – nos Oct 09 '13 at 23:43
  • no need for an "external actor". Everything can be done single threaded if you keep metadata around about request times. – Marsellus Wallace Jun 02 '16 at 13:50
8

If you need a Java-based sliding window rate limiter that will operate across a distributed system, you might want to take a look at the https://github.com/mokies/ratelimitj project.

A Redis-backed configuration, limiting requests by IP to 50 per minute, would look like this:

import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;

RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);

boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");

See https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis for further details on Redis configuration.

user2326162
  • 81
  • 1
  • 1
5

This depends on the application.

Imagine the case in which multiple threads want a token to perform some globally rate-limited action with no burst allowed (i.e. you want to limit to 10 actions per 10 seconds, but you don't want 10 actions to happen in the first second and then stay stopped for the remaining 9 seconds).

The DelayQueue has a disadvantage: the order in which threads request tokens might not be the order in which they get their requests fulfilled. If multiple threads are blocked waiting for a token, it is not clear which one will take the next available token; in my view, you could even have threads waiting forever.

One solution is to have a minimum interval of time between two consecutive actions, and take actions in the same order as they were requested.

Here is an implementation:

public class LeakyBucket {
    protected float maxRate;
    protected long minTime;
    //holds time of last action (past or future!)
    protected long lastSchedAction = System.currentTimeMillis();

    public LeakyBucket(float maxRate) {
        if(maxRate <= 0.0f) {
            throw new IllegalArgumentException("Invalid rate: " + maxRate);
        }
        this.maxRate = maxRate;
        this.minTime = (long)(1000.0f / maxRate);
    }

    public void consume() throws InterruptedException {
        long curTime = System.currentTimeMillis();
        long timeLeft;

        //calculate when can we do the action
        synchronized(this) {
            timeLeft = lastSchedAction + minTime - curTime;
            if(timeLeft > 0) {
                lastSchedAction += minTime;
            }
            else {
                lastSchedAction = curTime;
            }
        }

        //If needed, wait for our time
        if(timeLeft <= 0) {
            return;
        }
        else {
            Thread.sleep(timeLeft);
        }
    }
}
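
Hypothetical usage of the above, limiting to 5 actions per second (hasWork() and doAction() are placeholders):

LeakyBucket bucket = new LeakyBucket(5.0f); // at most 5 actions per second
while (hasWork()) {
    bucket.consume(); // blocks until this call's scheduled time slot
    doAction();       // the rate-limited action
    // (the caller must handle InterruptedException from consume())
}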
Duarte Meneses
  • 2,868
  • 19
  • 22
5

My implementation below can handle arbitrary request-time precision. It has O(1) time complexity for each request and does not require any additional buffer, i.e. O(1) space complexity. In addition, it does not require a background thread to release tokens; instead, tokens are released according to the time passed since the last request.

class RateLimiter {
    int limit;
    double available;
    long interval;

    long lastTimeStamp;

    RateLimiter(int limit, long interval) {
        this.limit = limit;
        this.interval = interval;

        available = 0;
        lastTimeStamp = System.currentTimeMillis();
    }

    synchronized boolean canAdd() {
        long now = System.currentTimeMillis();
        // release tokens according to the time passed since the last request
        available += (now - lastTimeStamp) * 1.0 / interval * limit;
        if (available > limit)
            available = limit; // cap the pool at the limit

        lastTimeStamp = now;
        if (available < 1)
            return false;
        else {
            available--;
            return true;
        }
    }
}
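
For example, to allow at most 5 calls per second with the class above (handleRequest() and reject() are placeholders):

RateLimiter limiter = new RateLimiter(5, 1000); // limit = 5, interval = 1000 ms
if (limiter.canAdd()) {
    handleRequest(); // within the rate: do the work
} else {
    reject();        // over the rate: no blocking here, the caller decides
}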
tonywl
  • 131
  • 2
  • 10
  • What is "limit"? – steph643 Jul 19 '21 at 15:02
  • @steph643, limit is the value of the rate-limiting threshold, i.e. if you want to enforce rate limiting of 5 per second, limit=5, interval = 1s. – tonywl Jul 19 '21 at 23:35
  • @tonywl I think you have a bug - you aren't updating `lastTimeStamp` when no tokens are available, so until there's a full token, the more often you call - the faster you accumulate token fractions, regardless of how much time actually passed. for example: I init with `(2,1000)`, then wait 100ms and call 5 times quickly - the 5th call will be allowed even though only slightly more than 100ms have passed. – Guss Feb 07 '23 at 14:07
  • 1
    Future people - my above comment was from before the code was fixed. It is now fixed. – Guss Feb 13 '23 at 18:11
3

Although it's not what you asked for, ThreadPoolExecutor, which is designed to cap concurrency at M simultaneous requests rather than M requests in N seconds, could also be useful.
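
For completeness, a minimal sketch of capping concurrency this way (the value of m and the task are illustrative placeholders):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

int m = 5; // "M": maximum simultaneous calls
ExecutorService pool = Executors.newFixedThreadPool(m);
// at most m tasks run at once; excess submissions queue up
pool.submit(() -> doThrottledWork()); // doThrottledWork() is a placeholder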

Eugene Yokota
  • 94,654
  • 45
  • 215
  • 319
2

I have implemented a simple throttling algorithm. Try this link: http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html

A brief description of the algorithm:

This algorithm utilizes the capability of the Java DelayQueue. Create a delayed object with the expected delay (here 1000/M in millisecond TimeUnit). Put the same object into the delayed queue, which in turn provides the moving window for us. Then, before each method call, take the object from the queue (take is a blocking call that returns only after the specified delay), and after the method call don't forget to put the object back into the queue with an updated time (here, the current milliseconds).

Here we can also have multiple delayed objects with different delays. This approach will also provide high throughput.
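
A compact sketch of that recycle-one-object scheme (Slot, m, and doThrottledCall() are my own placeholder names):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// a single Delayed object, recycled with an updated timestamp on every call
class Slot implements Delayed {
    long readyAtMillis;

    Slot(long readyAtMillis) { this.readyAtMillis = readyAtMillis; }

    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed o) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

int m = 5; // M calls per second
DelayQueue<Slot> queue = new DelayQueue<>();
queue.add(new Slot(System.currentTimeMillis())); // available immediately

// before each throttled call:
Slot slot = queue.take();  // blocking: returns only after the delay expires
doThrottledCall();         // the real method
slot.readyAtMillis = System.currentTimeMillis() + 1000 / m;
queue.add(slot);           // recycle with the updated time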

Devas
  • 1,544
  • 4
  • 23
  • 28
1

This is an update to the LeakyBucket code above. It works for more than 1000 requests per second.

import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;

class LeakyBucket {
  private long minTimeNano; // minimum nanoseconds between requests
  private long sched = System.nanoTime();

  /**
   * Create a rate limiter using the leakybucket alg.
   * @param perSec the number of requests per second
   */
  public LeakyBucket(double perSec) {
    if (perSec <= 0.0) {
      throw new RuntimeException("Invalid rate " + perSec);
    }
    this.minTimeNano = (long) (1_000_000_000.0 / perSec);
  }

  @SneakyThrows public void consume() {
    long curr = System.nanoTime();
    long timeLeft;

    synchronized (this) {
      // catch up after idle periods; otherwise sched lags far behind "now"
      // and an unbounded burst of calls would be let through at once
      if (sched < curr) {
        sched = curr;
      }
      timeLeft = sched - curr + minTimeNano;
      sched += minTimeNano;
    }
    if (timeLeft <= minTimeNano) {
      return;
    }
    TimeUnit.NANOSECONDS.sleep(timeLeft);
  }
}

And the unit test for the above:

import com.google.common.base.Stopwatch;
import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class LeakyBucketTest {
  @Test @Ignore public void t() {
    double numberPerSec = 10000;
    LeakyBucket b = new LeakyBucket(numberPerSec);
    Stopwatch w = Stopwatch.createStarted();
    IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach(
        x -> b.consume());
    System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS));
  }
}
0

Try to use this simple approach:

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleThrottler {

    private static final int T = 1; // min
    private static final int N = 345;

    private final Lock lock = new ReentrantLock();
    private final Condition newFrame = lock.newCondition();
    private volatile boolean currentFrame = true;

    public SimpleThrottler() {
        handleForGate();
    }

    /**
     * Payload
     */
    private void job() {
        try {
            Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98)));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.err.print(" J. ");
    }

    public void doJob() throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                int count = 0;
                // run at most N jobs within the current time frame
                while (count < N && currentFrame) {
                    job();
                    count++;
                }
                // frame exhausted: wait until the gate opens the next one
                newFrame.await();
                currentFrame = true;
            }
        } finally {
            lock.unlock();
        }
    }

    public void handleForGate() {
        Thread handler = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1 * 900);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // close the current frame and wake the worker
                    currentFrame = false;
                    lock.lock();
                    try {
                        newFrame.signal();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        });
        handler.start();
    }
}

SergeZ
  • 298
  • 1
  • 5
  • 18
0

Apache Camel also comes with a Throttler mechanism, as follows:

from("seda:a").throttle(100).asyncDelayed().to("seda:b");
gtonic
  • 2,295
  • 1
  • 24
  • 32
0

Here is a slightly more advanced version of a simple rate limiter:

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Simple request limiter based on the Thread.sleep method.
 * Create a limiter instance via {@link #create(float)} and call {@link #consume()} before making any request.
 * If the limit is exceeded, the consume method blocks and waits for the current call rate to fall below the limit.
 */
public class RequestRateLimiter {

    private long minTime;

    private long lastSchedAction;
    private double avgSpent = 0;

    ArrayList<RatePeriod> periods;


    @AllArgsConstructor
    public static class RatePeriod{

        @Getter
        private LocalTime start;

        @Getter
        private LocalTime end;

        @Getter
        private float maxRate;
    }


    /**
     * Create request limiter with maxRate - maximum number of requests per second
     * @param maxRate - maximum number of requests per second
     * @return
     */
    public static RequestRateLimiter create(float maxRate){
        return new RequestRateLimiter(Arrays.asList( new RatePeriod(LocalTime.of(0,0,0),
                LocalTime.of(23,59,59), maxRate)));
    }

    /**
     * Create request limiter with ratePeriods calendar - maximum number of requests per second in every period
     * @param ratePeriods - rate calendar
     * @return
     */
    public static RequestRateLimiter create(List<RatePeriod> ratePeriods){
        return new RequestRateLimiter(ratePeriods);
    }

    private void checkArgs(List<RatePeriod> ratePeriods){

        for (RatePeriod rp: ratePeriods ){
            if ( null == rp || rp.maxRate <= 0.0f || null == rp.start || null == rp.end )
                throw new IllegalArgumentException("list contains null or rate is less than or equal to zero or period is zero length");
        }
    }

    private float getCurrentRate(){

        LocalTime now = LocalTime.now();

        for (RatePeriod rp: periods){
            if ( now.isAfter( rp.start ) && now.isBefore( rp.end ) )
                return rp.maxRate;
        }

        return Float.MAX_VALUE;
    }



    private RequestRateLimiter(List<RatePeriod> ratePeriods){

        checkArgs(ratePeriods);
        periods = new ArrayList<>(ratePeriods.size());
        periods.addAll(ratePeriods);

        this.minTime = (long)(1000.0f / getCurrentRate());
        this.lastSchedAction = System.currentTimeMillis() - minTime;
    }

    /**
     * Call this method before making actual request.
     * Method call locks until current rate falls down below the limit
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {

        long timeLeft;

        synchronized(this) {
            long curTime = System.currentTimeMillis();

            minTime = (long)(1000.0f / getCurrentRate());
            timeLeft = lastSchedAction + minTime - curTime;

            long timeSpent = curTime - lastSchedAction + timeLeft;
            avgSpent = (avgSpent + timeSpent) / 2;

            if(timeLeft <= 0) {
                lastSchedAction = curTime;
                return;
            }

            lastSchedAction = curTime + timeLeft;
        }

        Thread.sleep(timeLeft);
    }

    public synchronized float getCurRate(){
        return (float) ( 1000d / avgSpent);
    }
}

And unit tests:

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RequestRateLimiterTest {


    @Test(expected = IllegalArgumentException.class)
    public void checkSingleThreadZeroRate(){

        // Zero rate
        RequestRateLimiter limiter = RequestRateLimiter.create(0);
        try {
            limiter.consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void checkSingleThreadUnlimitedRate(){

        // Unlimited
        RequestRateLimiter limiter = RequestRateLimiter.create(Float.MAX_VALUE);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) < 1000));
    }

    @Test
    public void checkSingleThreadRate(){

        // 3 requests per minute
        RequestRateLimiter limiter = RequestRateLimiter.create(3f/60f);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 3; i++ ){

            try {
                limiter.consume();
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) >= 60000 ) && ((ended - started) < 61000));
    }



    @Test
    public void checkSingleThreadRateLimit(){

        // 100 requests per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ));
    }

    @Test
    public void checkMultiThreadedRateLimit(){

        // 100 requests per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreaded32RateLimit(){

        // 0.2 requests per second
        RequestRateLimiter limiter = RequestRateLimiter.create(0.2f);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(8);
        ExecutorService exec = Executors.newFixedThreadPool(8);

        for ( int i = 0; i < 8; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 2; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreadedRateLimitDynamicRate(){

        // 100 requests per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {

                Random r = new Random();
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

}
  • The code is pretty simple. You just create the limiter with maxRate or with periods and rates. Then just call consume on every request. Whenever the rate is not exceeded, the limiter returns immediately; otherwise it waits some time before returning, to lower the current request rate. It also has a current rate method which returns a sliding average of the current rate. – Leonid Astakhov Dec 13 '18 at 09:03
0

My solution: a simple util method; you can modify it to create a wrapper class.

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}
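
Hypothetical usage: a burst of invocations results in a single delayed run, because invocations arriving while one thread is already waiting are skipped:

Runnable throttled = throttle(() -> System.out.println("run"), 100);
for (int i = 0; i < 10; i++) {
    new Thread(throttled).start(); // only one of these ends up running
}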

Taken from JAVA Thread Debounce and Throttle

benbai123
  • 1,423
  • 1
  • 11
  • 11
0

Here is a rate limiter implementation based on @tonywl's answer (and somewhat related to Duarte Meneses's leaky bucket). The idea is the same - use a "token pool" to allow both rate limiting and bursting (making multiple calls in a short time after idling for a bit).

This implementation offers two main differences:

  1. Lock-less concurrent access using atomic operations.
  2. Instead of blocking a request, it calculates the delay needed to enforce the rate limit and offers that as the response, allowing the caller to enforce the delay - this works better with the asynchronous programming found in modern networking frameworks.

The full implementation with documentation can be found in this Github Gist, which is where I'll also post updates, but here's the gist of it:

import java.util.concurrent.atomic.AtomicLong;

public class RateLimiter {
    private final static long TOKEN_SIZE = 1_000_000 /* tockins per token */;
    private final double tokenRate; // measured in tokens per ms
    private final double tockinRate; // measured in tockins per ms
    private final long tockinsLimit;
    
    private AtomicLong available;
    private AtomicLong lastTimeStamp;

    /**
     * Create a new rate limiter with the token fill rate specified as
     * {@code fill}/{@code interval} and a maximum token pool size of {@code limit}, starting
     * with a {@code prefill} amount of tokens ready to be used.
     * @param prefill instead of starting with an empty pool, assume we "start from rest" and
     *                have tokens to consume. This value is clamped to {@code limit}.
     * @param limit The maximum number of tokens in the pool (burst size)
     * @param fill How many tokens will be filled in the pool by waiting {@code interval} time
     * @param interval How long will it take to get {@code fill} tokens back in the pool in ms
     */
    public RateLimiter(int prefill, int limit, int fill, long interval) {
        this.tokenRate = (double)fill / interval;
        this.tockinsLimit = TOKEN_SIZE * limit;
        this.tockinRate = tokenRate * TOKEN_SIZE;
        this.lastTimeStamp = new AtomicLong(System.nanoTime());
        this.available = new AtomicLong(Math.min(prefill, limit) * TOKEN_SIZE); // clamp prefill to limit, per the javadoc
    }

    public boolean allowRequest() {
        return whenNextAllowed(1, false) == 0;
    }

    public boolean allowRequest(int cost) {
        return whenNextAllowed(cost, false) == 0;
    }

    public long whenNextAllowed(boolean alwaysConsume) {
        return whenNextAllowed(1, alwaysConsume);
    }
    
    /**
     * Check when will the next call be allowed, according to the specified rate.
     * The value returned is in milliseconds. If the result is 0 - or if {@code alwaysConsume} was
     * specified then the RateLimiter has recorded that the call has been allowed.
     * @param cost How costly is the requested action. The base rate is 1 token per request,
     *   but the client can declare a more costly action that consumes more tokens.
     * @param alwaysConsume if set to {@code true} this method assumes that the caller will delay
     *   the action that is rate limited but will perform it without checking again - so it will
     *   consume the specified number of tokens as if the action has gone through. This means that
     *   the pool can get into a deficit, which will further delay additional actions.
     * @return how many milliseconds before this request should be let through.
     */
    public long whenNextAllowed(int cost, boolean alwaysConsume) {
        var now = System.nanoTime();
        var last = lastTimeStamp.getAndSet(now);
        // calculate how many tockins we got since last call
        // if the previous call was less than a microsecond ago, we still accumulate at least
        // one tockin, which is probably more than we should, but this is too small to matter - right?
        var add = (long)Math.ceil(tokenRate * (now - last));
        var nowAvailable = available.addAndGet(add);
        while (nowAvailable > tockinsLimit) {
            available.compareAndSet(nowAvailable, tockinsLimit);
            nowAvailable = available.get();
        }

        // answer the question
        var toWait = (long)Math.ceil(Math.max(0, (TOKEN_SIZE - nowAvailable) / tockinRate));
        if (alwaysConsume || toWait == 0) // the caller will let the request go through, so consume a token now
            available.addAndGet(-TOKEN_SIZE);
        return toWait;
    }

}
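
Hypothetical usage in a non-blocking style (constructor arguments per the javadoc above; handleRequest() is a placeholder):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
RateLimiter limiter = new RateLimiter(10, 10, 5, 1000); // start full: 10 tokens, refill 5 per second

long delayMs = limiter.whenNextAllowed(1, true); // a token is consumed either way
scheduler.schedule(() -> handleRequest(), delayMs, TimeUnit.MILLISECONDS);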
Guss
  • 30,470
  • 17
  • 104
  • 128