I am using RxJava to asynchronously handle servlet requests. During each request, a number of async calls to a remote service API are made using the flatMap operator.

Due to resource constraints, I need to limit the total number of concurrent requests against that API. This would be trivial for a single Rx stream that calls the API in a single flatMap, using flatMap's concurrency parameter (maxConcurrent). But I have multiple independent streams in my application (basically one for each ServletRequest), and each stream makes multiple flatMap calls to the API.

So I think I would have to funnel all my remote requests into a singleton stream that performs the actual calls and can then easily limit the concurrency. But then it seems not trivial to get the API responses back into the original stream. Additionally it seems complicated to maintain backpressure across such a construct.

The other option would be to use a traditional semaphore, but I'm not sure if its blocking behaviour is a good fit with Rx.

So is there an established pattern to implement something like this? Or am I missing a clever combination of operators that avoids these complications altogether?

edited by Malt
asked by tobiasH

2 Answers


In RxJava you can create your own Schedulers from regular Java executors:

ExecutorService exec = Executors.newFixedThreadPool(2); // 2 fixed threads
Scheduler scheduler = Schedulers.from(exec);

So just create an executor with a limited number of threads for each of your resources, and use that specific scheduler whenever accessing the resource. The limited number of threads will limit the number of concurrent calls.
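As a sketch of that idea (callApi here is a hypothetical stand-in for the real remote call), every call touching the constrained resource is subscribed on the limited scheduler:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class LimitedSchedulerExample {

    // Hypothetical stand-in for the real remote API call
    static Observable<String> callApi(int id) {
        return Observable.fromCallable(() -> "response-" + id);
    }

    public static void main(String[] args) {
        // At most 2 threads, therefore at most 2 concurrent API calls
        ExecutorService exec = Executors.newFixedThreadPool(2);
        Scheduler apiScheduler = Schedulers.from(exec);

        Observable.range(1, 10)
                .flatMap(id -> callApi(id).subscribeOn(apiScheduler))
                .toBlocking()
                .forEach(System.out::println);

        exec.shutdown();
    }
}
```

Because the same apiScheduler instance is shared, the two-thread cap holds across all streams that use it, not just within one flatMap.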

EDIT:

Apparently I misunderstood the question. If the calls are asynchronous, you can try and use Rx's backpressure to manage them. Here's an idea on how to manage such calls using Rx:

You create a "Resource Permit Observable" that emits a token whenever the API can be called. Its rate of token (permit) emission is the maximum rate at which the API may be used. Whenever some observable needs to call the API, zip the call with the permit observable. The zip operator will wait until a permit is available, limiting the API calls to the rate of permit generation.

Here's a trivial implementation of a permit observable that emits timestamps:

import rx.Observable;
import rx.Observer;
import rx.observables.SyncOnSubscribe;

public class PermitObservable extends Observable<Long> {

    private final long msBetweenEmissions;

    public PermitObservable(final long msBetweenEmissions) {
        super(new SyncOnSubscribe<Long, Long>() {
            @Override
            protected Long generateState() {
                return System.currentTimeMillis();
            }

            @Override
            protected Long next(Long state, Observer<? super Long> observer) {
                long nextEmissionAt = state + msBetweenEmissions;
                long timeToWait = nextEmissionAt - System.currentTimeMillis();
                if (timeToWait > 0) {
                    try {
                        Thread.sleep(timeToWait);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        observer.onError(e);
                        return state; // must not emit after onError
                    }
                }
                long now = System.currentTimeMillis();
                observer.onNext(now); // permit emission
                return now;
            }
        });

        this.msBetweenEmissions = msBetweenEmissions;
    }
}
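Using it could then look like this (a sketch; requests and callApi are hypothetical stand-ins for your own request stream and remote call, and PermitObservable is the class above):

```java
Observable<String> requests = Observable.just("a", "b", "c"); // stand-in request stream
Observable<Long> permits = new PermitObservable(100);         // ~10 permits per second

Observable<String> responses = Observable
        .zip(requests, permits, (request, permit) -> request) // each call waits for a token
        .flatMap(request -> callApi(request));                // callApi: the actual remote call
```

Because zip pairs items one-to-one, no request proceeds without consuming a permit.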
answered by Malt
  • I thought about that too. But the remote requests are asynchronous, if I understand schedulers correctly, I would then only limit the number of concurrent invocations of the async service, not the number of pending requests, right? So I would need to make the requests blocking, so that the thread isn't freed until I get a response from the remote API? – tobiasH Feb 06 '16 at 15:19
  • That depends on the Java executor. You can use an executor that blocks when there are more than a certain number of pending requests (see: http://stackoverflow.com/a/4522411/3199595) – Malt Feb 06 '16 at 15:27
  • I'm not sure if we're talking about the same thing. By pending I meant pending with the remote service. My API requests are non-blocking, so I think with your solution I would only submit two requests in parallel to the remote service, but since the threads aren't blocked until the responses are received, it would still saturate the (remote) queue, right? I'm not sure how the queue implementation could help with that. – tobiasH Feb 06 '16 at 16:16
  • @tobiasH Then I misunderstood. If the API itself is async, how do you normally keep track of the number of outstanding requests? – Malt Feb 06 '16 at 21:25
  • It's just HTTP calls, I use Retrofit to make requests asynchronous. I could easily make the calls blocking and use your solution and possibly that's the way to go here, but I wonder if there is an Rx/async solution. – tobiasH Feb 06 '16 at 21:36
  • @tobiasH I've sketched out another possible solution in the answer. See if you like it. – Malt Feb 06 '16 at 22:13
  • Thank you! Using zip to do the rate limiting and the general pattern you gave is a great help. But since I have a global rate limit, I would need some kind of scheduler service that creates these token streams and distributes released tokens among these streams, right? – tobiasH Feb 08 '16 at 07:42

Check out Scheduler.when(). It allows you to build a modified scheduler from an existing scheduler rather than creating new thread pools: for example, use Schedulers.io() but limit the number of active threads, or rate-limit the onNext calls on Schedulers.computation().
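A sketch based on the pattern shown in the Scheduler.when() javadoc (the operator is experimental in RxJava 1.x), wrapping Schedulers.io() so that at most two of its workers are active at a time:

```java
import rx.Completable;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class WhenExample {
    public static void main(String[] args) {
        // Merge the worker streams with maxConcurrent = 2, so only
        // two workers of the wrapped io() scheduler run at once
        Scheduler limited = Schedulers.io().when(workers ->
                Completable.merge(Observable.merge(workers, 2)));

        Observable.range(1, 5)
                .flatMap(i -> Observable.just(i).subscribeOn(limited))
                .toBlocking()
                .forEach(System.out::println);
    }
}
```

As with the executor-backed approach, sharing this one limited scheduler across all streams enforces a global cap.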