
I'm looking for a smart way of implementing a rate limit in an HTTP client. Let's assume the rate limit on the API is 5 requests per second on any of the resources. Right now the implementation looks similar to this:

final class HTTPClient: HTTPClientProtocol {

    func getUser() -> Observable<User> {
        return Observable<User>.create { (observer) -> Disposable in
            ...
        }
    }

    func getProfile() -> Observable<Profile> {
        return Observable<Profile>.create { (observer) -> Disposable in
            ...
        }
    }

    func getMessages() -> Observable<Messages> {
        return Observable<Messages>.create { (observer) -> Disposable in
            ...
        }
    }

    func getFriends() -> Observable<Friends> {
        return Observable<Friends>.create { (observer) -> Disposable in
            ...
        }
    }

}

Now ideally I would like to use these methods as needed throughout the application without worrying about the rate limit at all.

Back to the example of 5 requests per second: the first five requests can be executed immediately, but every request after that has to wait, so that at most 5 requests are executed within any 1-second window.

Is there any smart way of doing this in RxSwift?

plu
  • Possible duplicate of [Limiting concurrent access to a service class with RxSwift](http://stackoverflow.com/questions/43794392/limiting-concurrent-access-to-a-service-class-with-rxswift) – Daniel T. May 14 '17 at 13:19
  • Both linked questions do not show how to handle "2 requests per second". – plu May 15 '17 at 05:12
  • Ah... The linked answer is partially correct. You will need a dedicated scheduler for the service, but it will have to be a custom scheduler... I haven't done custom schedulers yet, I'll have to think about this one. – Daniel T. May 15 '17 at 15:16

2 Answers


You need a custom Scheduler.

import Foundation
import RxSwift

final class DelayScheduler: ImmediateSchedulerType {

    init(delay: TimeInterval, queue: DispatchQueue = .main) {
        self.queue = queue
        dispatchDelay = delay
    }

    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        // Push the next dispatch time forward by `dispatchDelay`, but never schedule in the past.
        lastDispatch = max(lastDispatch + dispatchDelay, .now())
        queue.asyncAfter(deadline: lastDispatch) {
            guard cancel.isDisposed == false else { return }
            cancel.setDisposable(action(state))
        }
        return cancel
    }

    var lastDispatch: DispatchTime = .now()
    let queue: DispatchQueue
    let dispatchDelay: TimeInterval
}

Then you implement your Service by having all its Observables subscribe on this scheduler:

final class HTTPClient: HTTPClientProtocol {

    func getUser() -> Observable<User> {
        return Observable<User>.create { (observer) -> Disposable in
            ...
        }.subscribeOn(scheduler)
    }

    func getProfile() -> Observable<Profile> {
        return Observable<Profile>.create { (observer) -> Disposable in
            ...
        }.subscribeOn(scheduler)
    }

    func getMessages() -> Observable<Messages> {
        return Observable<Messages>.create { (observer) -> Disposable in
            ...
        }.subscribeOn(scheduler)
    }

    func getFriends() -> Observable<Friends> {
        return Observable<Friends>.create { (observer) -> Disposable in
            ...
        }.subscribeOn(scheduler)
    }

    let scheduler = DelayScheduler(delay: 0.5)
}
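
As a rough sketch of the effect (assuming a default HTTPClient() initializer; the client, disposeBag and print statements below are only placeholders), the call sites stay exactly as before, and back-to-back subscriptions simply start about half a second apart:

let client = HTTPClient()
let disposeBag = DisposeBag()

// Both subscriptions are created immediately, but because each Observable
// subscribes on the shared DelayScheduler, the second request is dispatched
// roughly 0.5 seconds after the first.
client.getUser()
    .subscribe(onNext: { user in print("user loaded:", user) })
    .disposed(by: disposeBag)

client.getProfile()
    .subscribe(onNext: { profile in print("profile loaded:", profile) })
    .disposed(by: disposeBag)
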
Daniel T.
  • Thank you for the code. Unfortunately this is not exactly what I'm looking for. Your solution always gives the n+1 request a delay, no matter if that is necessary or not. In a real world scenario you have a rate limit of 5 requests per second. So you can execute 5 of them immediately, but for request number 6 you need to wait a little. – plu May 16 '17 at 06:53

Daniel T.'s use of a custom scheduler is brilliant, and I'm finding that it works well in practice. Here's a version of his code that implements a true sliding-window rate limit:

import Foundation
import RxSwift

final class RateLimitedScheduler: ImmediateSchedulerType {

    let period: TimeInterval
    let queue: DispatchQueue

    // Ring buffer holding the dispatch times of the last `maxEvents` actions.
    var dispatchHistory: [DispatchTime]
    var dhIndex = 0

    init(maxEvents: Int, period: TimeInterval, queue: DispatchQueue = .main) {
        self.period = period
        self.queue = queue
        // Seed the history with times one period in the past so the first
        // `maxEvents` actions are dispatched immediately.
        let periodAgo = DispatchTime.now() - period
        dispatchHistory = Array(repeating: periodAgo, count: maxEvents)
    }

    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        queue.asyncAfter(deadline: nextDeadline()) {
            guard cancel.isDisposed == false else { return }
            cancel.setDisposable(action(state))
        }
        return cancel
    }

    private func nextDeadline() -> DispatchTime {
        // The oldest entry in the ring buffer marks the start of the current window:
        // the new action may run one full period after it, or now, whichever is later.
        let windowStartTime = dispatchHistory[dhIndex]
        let deadline = max(windowStartTime + period, DispatchTime.now())
        dispatchHistory[dhIndex] = deadline
        dhIndex = (dhIndex >= dispatchHistory.count - 1) ? 0 : (dhIndex + 1)
        return deadline
    }

}
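
For the question's limit of 5 requests per second, the wiring would look the same as in Daniel T.'s answer, just with this scheduler swapped in; a sketch based on the HTTPClient from the question:

final class HTTPClient: HTTPClientProtocol {

    // Up to 5 subscriptions start immediately; the 6th is delayed until a
    // full second has passed since the 1st, and so on.
    let scheduler = RateLimitedScheduler(maxEvents: 5, period: 1)

    func getUser() -> Observable<User> {
        return Observable<User>.create { (observer) -> Disposable in
            ...
        }.subscribeOn(scheduler)
    }

    // ... the remaining requests subscribe on the same scheduler ...
}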

Note that perfect accuracy requires tracking the dispatch times of the previous N entries, so it's memory-expensive for rates of hundreds or thousands of operations per period. Consider using a "token bucket" for those cases - it's less accurate but requires only constant state (see this thread).
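
For reference, a constant-state token-bucket variant could look roughly like this (an untested sketch; TokenBucketScheduler and its properties are my own names, and it deliberately allows a burst of up to maxEvents before throttling to maxEvents per period):

final class TokenBucketScheduler: ImmediateSchedulerType {

    private let queue: DispatchQueue
    private let capacity: Double
    private let refillPerSecond: Double
    private var tokens: Double
    private var lastRefill = DispatchTime.now()

    init(maxEvents: Int, period: TimeInterval, queue: DispatchQueue = .main) {
        self.queue = queue
        capacity = Double(maxEvents)
        refillPerSecond = Double(maxEvents) / period
        tokens = Double(maxEvents)
    }

    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        queue.asyncAfter(deadline: nextDeadline()) {
            guard cancel.isDisposed == false else { return }
            cancel.setDisposable(action(state))
        }
        return cancel
    }

    private func nextDeadline() -> DispatchTime {
        let now = DispatchTime.now()
        // Refill the bucket according to the time elapsed since the last call.
        let elapsed = Double(now.uptimeNanoseconds - lastRefill.uptimeNanoseconds) / 1_000_000_000
        tokens = min(capacity, tokens + elapsed * refillPerSecond)
        lastRefill = now
        // Consume one token; a negative balance is "debt" that is paid off
        // by waiting before the action is dispatched.
        tokens -= 1
        if tokens >= 0 {
            return now
        }
        return now + (-tokens / refillPerSecond)
    }

}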

GSnyder