0

Want

Many threads that will make a datbase call and block in order to improve and scale performance.

Problems:

  1. The standard Java completable future API does not work well with blocking/IO tasks, even when using ManagedBlocker.
  2. If using a library that does not have this problem, too many async requests at the same time has at least 1 scaling problems:
    • Too many threads created at the same time could lead to out of memory error due to how much memory each thread needs. And there are no good default ThreadPoolExecutors that allow setting threadpool parameters such as max number of threads followed by providing a queue system for incoming tasks to wait before a thread is available.

Example

I want to scale a program that will need to make 3000 async db requests. Instead of making 3000 requests all at once, I want to limit it to 50 at any given time and queue the remaining 2950, then process each 2950 of the remaining one at a time whenever a task completes. Ideally I would like to do this using existing libraries as to re-inventing it with new custom code, as I am assuming there is a way to do this but I am unsure of how to use the APIs of various async Java SDKs that keep coming out.

Community
  • 1
  • 1
Zombies
  • 25,039
  • 43
  • 140
  • 225
  • 1
    I'm not sure I understand - you say too many async requests has a scaling problem related to too many threads being created at the same time, but the whole point of async is that a thread *isn't* created per request. All of the standard reactive libraries (Akka, Reactor, RxJava) will be able to do this, if I'm understanding correctly. – Michael Berry Mar 02 '20 at 17:12
  • This would be using rxJava/etc for IO multi-threaded performance. (EG: `Schedulars.io()`) Not for async events in a single thread. The problem is that `Schedulers.io()` creates an **unbounded threadpool** which could lead to the JVM running out of memory. – Zombies Mar 02 '20 at 17:23
  • Oh I see, it's to wrap blocking IO - I missed that detail. No expert on rxjava, but reactor has a [boundedElastic()](https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#newBoundedElastic-int-int-java.lang.String-) scheduler which may be what you're after. Defaults to 10x the number of CPU cores, or you can specify your own bound. – Michael Berry Mar 02 '20 at 17:27
  • 1
    "Too many threads created at the same time" - it depends on the Executor chosen. You always can choose (or create, or copy from somerwhere) an Executor with limited number of threads, say, https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html – Alexei Kaigorodov Mar 02 '20 at 18:39
  • @Zombies I'm curious what about my answer is insufficient – TrogDor Mar 24 '20 at 15:25
  • @TrogDor I didn't downvote it, but I haven't been around to look at this lately. I think I solved it using this blog: http://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html – Zombies Mar 26 '20 at 12:32

1 Answers1

0

I think there are a couple of ways of addressing the unbounded threadpool. One is, as others point out, to create an RxJava Scheduler from an Executor backed by bounded threadpool. That is pretty straightforward and may very well be the best approach.

But, I do want to point out that RxJava's "parallelizing" operators (flatMap, concatMapEager) also have an optional maxConcurrency operator that allows us to decouple the the number of swimlanes in a given Rx pipeline from the Scheduler being used to execute it.

Here's a hypothetical example, let's say we have a Data Access Object that performs blocking queries. In this case it just sleeps for 1 second and returns the query itself with a timestamp appended:

public class MyDao
{
    public Object blockingGetData( String query ) throws InterruptedException
    {
        Thread.sleep( 1000 );
        return query.toUpperCase() + " - " + new Date().toString();
    }
}

Next, let's wrap the DAO in an async Service that maintains an Rx pipeline where each element represents a query and its async result:

public class MyService
{
    private class QueryHolder
    {
        final String query;
        final Subject<Object> result;

        public QueryHolder( String query, Subject<Object> result )
        {
            this.query = query;
            this.result = result;
        }
    }

    private static final int MAX_CONCURRENCY = 2;
    private final Subject<QueryHolder> querySubject;
    private final MyDao dao;

    public MyService()
    {
        dao = new MyDao();
        querySubject = PublishSubject.<QueryHolder>create().toSerialized();

        querySubject
            .flatMap(
                    // For each element in the pipeline, perform blocking
                    // get on IO Scheduler, populating the result Subject:
                    queryHolder -> Observable.just( queryHolder )
                        .subscribeOn( Schedulers.io() )
                        .doOnNext( __ -> {
                            Object data = dao.blockingGetData( queryHolder.query );
                            queryHolder.result.onNext( data );
                            queryHolder.result.onComplete();
                        } ),
                    // With max concurrency limited:
                    MAX_CONCURRENCY )
            .subscribe();
    }

    public Single<Object> getData( String query )
    {
        Subject<Object> result = AsyncSubject.create();

        // Emit pipeline element:
        querySubject.onNext( new QueryHolder( query, result ));

        return result.firstOrError();
    }
}

I recommend you google the different subject types and operators, etc. - there's tons of documentation available.

A simple manual test:

@Test
public void testService() throws InterruptedException
{
    MyService service = new MyService();

    // Issue 20 queries immediately, printing the results when they complete:
    for ( int i = 0; i < 20; i++ )
    {
        service.getData( "query #" + i )
            .subscribe( System.out::println );
    }

    // Sleep:
    Thread.sleep( 11000 );
}

Output:

QUERY #0 - Wed Mar 11 11:08:21 EDT 2020
QUERY #1 - Wed Mar 11 11:08:21 EDT 2020
QUERY #2 - Wed Mar 11 11:08:22 EDT 2020
QUERY #3 - Wed Mar 11 11:08:22 EDT 2020
QUERY #4 - Wed Mar 11 11:08:23 EDT 2020
QUERY #5 - Wed Mar 11 11:08:23 EDT 2020
QUERY #6 - Wed Mar 11 11:08:24 EDT 2020
QUERY #7 - Wed Mar 11 11:08:24 EDT 2020
QUERY #8 - Wed Mar 11 11:08:25 EDT 2020
QUERY #9 - Wed Mar 11 11:08:25 EDT 2020
QUERY #10 - Wed Mar 11 11:08:26 EDT 2020
QUERY #11 - Wed Mar 11 11:08:26 EDT 2020
QUERY #12 - Wed Mar 11 11:08:27 EDT 2020
QUERY #13 - Wed Mar 11 11:08:27 EDT 2020
QUERY #14 - Wed Mar 11 11:08:28 EDT 2020
QUERY #15 - Wed Mar 11 11:08:28 EDT 2020
QUERY #16 - Wed Mar 11 11:08:29 EDT 2020
QUERY #17 - Wed Mar 11 11:08:29 EDT 2020
QUERY #18 - Wed Mar 11 11:08:30 EDT 2020
QUERY #19 - Wed Mar 11 11:08:30 EDT 2020
TrogDor
  • 984
  • 6
  • 14
  • if we talk about async approach then I could not imagine how `Thread.sleep()` could be applied here – Serhii Povísenko Mar 11 '20 at 17:44
  • 1
    @povisenko, as I understand the OP's question, it's about wrapping a blocking DB query with an async facade while limiting the number of underlying threads that get spawned. The Thread.sleep() here is just to simulate a long running blocking query. – TrogDor Mar 11 '20 at 18:01