I think there are a couple of ways to address the unbounded thread pool. One, as others have pointed out, is to create an RxJava Scheduler from an Executor backed by a bounded thread pool. That is pretty straightforward and may very well be the best approach.
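To make that first option concrete, here's a minimal JDK-only sketch of the bounded pool itself. The names (BoundedPoolSketch, peakConcurrency) are made up for illustration, and the 50 ms sleep is just a stand-in for a blocking query. In RxJava, passing an executor like this one to Schedulers.from( executor ) gives you a Scheduler whose work is confined to the pool's threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch
{
    // Runs taskCount blocking tasks on a fixed pool of poolSize threads and
    // returns the highest number of tasks observed running at the same time.
    public static int peakConcurrency( int poolSize, int taskCount ) throws InterruptedException
    {
        ExecutorService executor = Executors.newFixedThreadPool( poolSize );
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for ( int i = 0; i < taskCount; i++ )
        {
            executor.execute( () -> {
                // Record how many tasks are running right now:
                peak.accumulateAndGet( running.incrementAndGet(), Math::max );
                try
                {
                    Thread.sleep( 50 ); // hypothetical stand-in for a blocking query
                }
                catch ( InterruptedException e )
                {
                    Thread.currentThread().interrupt();
                }
                finally
                {
                    running.decrementAndGet();
                }
            } );
        }
        // Stop accepting work and wait for the queued tasks to finish:
        executor.shutdown();
        executor.awaitTermination( 10, TimeUnit.SECONDS );
        return peak.get();
    }

    public static void main( String[] args ) throws InterruptedException
    {
        System.out.println( "peak concurrency: " + peakConcurrency( 2, 8 ) );
    }
}
```

However many tasks are submitted, the reported peak can't exceed the pool size; the remaining tasks simply wait in the executor's queue.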
But I do want to point out that RxJava's "parallelizing" operators (flatMap, concatMapEager) also take an optional maxConcurrency parameter, which lets us decouple the number of swimlanes in a given Rx pipeline from the Scheduler used to execute it.
Here's a hypothetical RxJava example. Let's say we have a Data Access Object that performs blocking queries. In this case it just sleeps for 1 second and returns the query, upper-cased, with a timestamp appended:
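As a rough analogy for that decoupling, and not how flatMap is actually implemented, here's a JDK-only sketch: the thread pool is unbounded (similar in spirit to Schedulers.io()), while a Semaphore caps how many tasks are in flight. The names (MaxConcurrencySketch, peakInFlight) are hypothetical. One difference to keep in mind: this sketch blocks the submitting thread when all lanes are busy, whereas flatMap, as far as I know, holds on to the waiting items itself rather than blocking the caller.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MaxConcurrencySketch
{
    // Runs taskCount blocking tasks on an unbounded pool, but never lets more
    // than maxConcurrency of them be in flight. Returns the observed peak.
    public static int peakInFlight( int maxConcurrency, int taskCount ) throws InterruptedException
    {
        ExecutorService unboundedPool = Executors.newCachedThreadPool();
        Semaphore lanes = new Semaphore( maxConcurrency );
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for ( int i = 0; i < taskCount; i++ )
        {
            lanes.acquire(); // wait for a free lane before handing work to the pool
            unboundedPool.execute( () -> {
                try
                {
                    peak.accumulateAndGet( inFlight.incrementAndGet(), Math::max );
                    Thread.sleep( 50 ); // hypothetical stand-in for a blocking query
                }
                catch ( InterruptedException e )
                {
                    Thread.currentThread().interrupt();
                }
                finally
                {
                    // Decrement before releasing so the count never exceeds the cap:
                    inFlight.decrementAndGet();
                    lanes.release();
                }
            } );
        }
        unboundedPool.shutdown();
        unboundedPool.awaitTermination( 10, TimeUnit.SECONDS );
        return peak.get();
    }

    public static void main( String[] args ) throws InterruptedException
    {
        System.out.println( "peak in flight: " + peakInFlight( 2, 8 ) );
    }
}
```

The point is that the cap lives in the pipeline rather than in the pool, so the same pool can serve other pipelines with different limits.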
import java.util.Date;

public class MyDao
{
    // Simulates a blocking query that takes 1 second.
    public Object blockingGetData( String query ) throws InterruptedException
    {
        Thread.sleep( 1000 );
        return query.toUpperCase() + " - " + new Date().toString();
    }
}
Next, let's wrap the DAO in an async Service that maintains an Rx pipeline where each element represents a query and its async result:
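// Imports assume RxJava 2 (io.reactivex.*). For RxJava 3, use the
// io.reactivex.rxjava3.* packages (core, schedulers, subjects) instead.
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.AsyncSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public class MyService
{
    // Pairs a query with the Subject that will carry its result:
    private static class QueryHolder
    {
        final String query;
        final Subject<Object> result;

        public QueryHolder( String query, Subject<Object> result )
        {
            this.query = query;
            this.result = result;
        }
    }

    private static final int MAX_CONCURRENCY = 2;

    private final Subject<QueryHolder> querySubject;
    private final MyDao dao;

    public MyService()
    {
        dao = new MyDao();
        // Serialized so that getData() can be called from multiple threads:
        querySubject = PublishSubject.<QueryHolder>create().toSerialized();
        querySubject
            .flatMap(
                // For each element in the pipeline, perform blocking
                // get on IO Scheduler, populating the result Subject:
                queryHolder -> Observable.just( queryHolder )
                    .subscribeOn( Schedulers.io() )
                    .doOnNext( __ -> {
                        try
                        {
                            Object data = dao.blockingGetData( queryHolder.query );
                            queryHolder.result.onNext( data );
                            queryHolder.result.onComplete();
                        }
                        catch ( Exception e )
                        {
                            // Report the failure to this caller only, so one
                            // bad query doesn't terminate the shared pipeline:
                            queryHolder.result.onError( e );
                        }
                    } ),
                // With max concurrency limited:
                MAX_CONCURRENCY )
            .subscribe();
    }

    public Single<Object> getData( String query )
    {
        Subject<Object> result = AsyncSubject.create();
        // Emit pipeline element:
        querySubject.onNext( new QueryHolder( query, result ));
        return result.firstOrError();
    }
}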
If the Subject types (PublishSubject, AsyncSubject) or the operators used here are unfamiliar, they're worth looking up; there's plenty of documentation available.
A simple manual test:
@Test
public void testService() throws InterruptedException
{
    MyService service = new MyService();
    // Issue 20 queries immediately, printing the results when they complete:
    for ( int i = 0; i < 20; i++ )
    {
        service.getData( "query #" + i )
            .subscribe( System.out::println );
    }
    // Wait for all results: 20 queries, 2 at a time, 1 second each
    // is about 10 seconds, plus a 1 second margin:
    Thread.sleep( 11000 );
}
Output (two results per second, matching MAX_CONCURRENCY = 2):
QUERY #0 - Wed Mar 11 11:08:21 EDT 2020
QUERY #1 - Wed Mar 11 11:08:21 EDT 2020
QUERY #2 - Wed Mar 11 11:08:22 EDT 2020
QUERY #3 - Wed Mar 11 11:08:22 EDT 2020
QUERY #4 - Wed Mar 11 11:08:23 EDT 2020
QUERY #5 - Wed Mar 11 11:08:23 EDT 2020
QUERY #6 - Wed Mar 11 11:08:24 EDT 2020
QUERY #7 - Wed Mar 11 11:08:24 EDT 2020
QUERY #8 - Wed Mar 11 11:08:25 EDT 2020
QUERY #9 - Wed Mar 11 11:08:25 EDT 2020
QUERY #10 - Wed Mar 11 11:08:26 EDT 2020
QUERY #11 - Wed Mar 11 11:08:26 EDT 2020
QUERY #12 - Wed Mar 11 11:08:27 EDT 2020
QUERY #13 - Wed Mar 11 11:08:27 EDT 2020
QUERY #14 - Wed Mar 11 11:08:28 EDT 2020
QUERY #15 - Wed Mar 11 11:08:28 EDT 2020
QUERY #16 - Wed Mar 11 11:08:29 EDT 2020
QUERY #17 - Wed Mar 11 11:08:29 EDT 2020
QUERY #18 - Wed Mar 11 11:08:30 EDT 2020
QUERY #19 - Wed Mar 11 11:08:30 EDT 2020