
I've got code that looks similar to this:

List<String> ids = expensiveMethod();
List<String> filteredIds = cheapFilterMethod(ids);

if (!filteredIds.isEmpty()) {
    List<SomeEntity> fullEntities = expensiveDatabaseCall(filteredIds);
    List<SomeEntity> filteredFullEntities = anotherCheapFilterFunction(fullEntities);
    if (!filteredFullEntities.isEmpty()) {
        List<AnotherEntity> finalResults = stupidlyExpensiveDatabaseCall(filteredFullEntities);
        relativelyCheapMethod(finalResults);
    }
}

It's basically a waterfall of a couple of expensive methods, each of which either fetches something from a database or filters the previous database results. The exhaustive filtering exists because stupidlyExpensiveDatabaseCall should receive as few leftover entities as possible.

My problem is that the other methods aren't all that cheap either, so they block the thread for a couple of seconds, during which stupidlyExpensiveDatabaseCall sits idle until it gets the whole batch at once.

I'd like to process the results from each method as they come in. I know I could write a thread for each individual method and have some concurrent queue working between them, but that's a load of boilerplate that I'd like to avoid. Is there a more elegant solution?
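For reference, the hand-rolled approach described above, with one thread per stage and a concurrent queue between stages, might look roughly like this minimal sketch. The two stage functions (dropping blank ids, then taking each id's length) are hypothetical stand-ins for the real methods, and an empty `Optional` is used as an assumed end-of-stream marker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

final class QueuePipeline {

    // Starts one daemon thread that takes items from `in`, applies `fn`, and puts results on `out`.
    // A null result means "filtered out"; Optional.empty() marks the end of the stream.
    private static <A, B> void stage(BlockingQueue<Optional<A>> in,
                                     BlockingQueue<Optional<B>> out,
                                     Function<A, B> fn) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Optional<A> item = in.take();
                    if (!item.isPresent()) {       // end-of-stream marker
                        out.put(Optional.empty()); // propagate it downstream
                        return;
                    }
                    B result = fn.apply(item.get());
                    if (result != null) {
                        out.put(Optional.of(result));
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Each stage starts working as soon as the first item reaches its input queue.
    static List<Integer> run(List<String> ids) throws InterruptedException {
        BlockingQueue<Optional<String>> source = new LinkedBlockingQueue<>();
        BlockingQueue<Optional<String>> filtered = new LinkedBlockingQueue<>();
        BlockingQueue<Optional<Integer>> results = new LinkedBlockingQueue<>();

        // Hypothetical cheap filter: drop blank ids, trim the rest.
        stage(source, filtered, id -> id.isBlank() ? null : id.trim());
        // Hypothetical "expensive" lookup: here just the id's length.
        stage(filtered, results, String::length);

        for (String id : ids) {
            source.put(Optional.of(id)); // a paged producer would put items as pages arrive
        }
        source.put(Optional.empty());    // signal end of input

        List<Integer> collected = new ArrayList<>();
        for (Optional<Integer> r = results.take(); r.isPresent(); r = results.take()) {
            collected.add(r.get());
        }
        return collected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of("a", "  ", " bcd ", "ef"))); // prints [1, 3, 2]
    }
}
```

Every additional stage means another thread plus more queue plumbing, which is the boilerplate the question is trying to avoid.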

Selbi
  • Is there a way that you can share more information? Why are the database operations so expensive, have you looked at the execution plan? – Glains Oct 27 '19 at 20:53

3 Answers


There's a post about different ways to parallelize: not only the parallelStream() way, but also running consecutive steps in parallel, linked by queues, the way you described. RxJava may suit your needs here. It's a more complete implementation of the rather fragmentary reactive-streams API in Java 9. But I think you only really get the full benefit if you also use a reactive database API.

That's the RxJava way:

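import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;

public class FlowStream {

    @Test
    public void flowStream() {
        int items = 10;

        // Sequential: each item passes through both stages on the calling thread.
        print("\nflow");
        Flowable.range(0, items)
                .map(this::expensiveCall)
                .map(this::expensiveCall)
                .forEach(i -> print("flowed %d", i));

        // Parallel: every stage moves each item onto the computation scheduler,
        // so different items can be in different stages at the same time.
        print("\nparallel flow");
        Flowable.range(0, items)
                .flatMap(v ->
                        Flowable.just(v)
                                .subscribeOn(Schedulers.computation())
                                .map(this::expensiveCall)
                )
                .flatMap(v ->
                        Flowable.just(v)
                                .subscribeOn(Schedulers.computation())
                                .map(this::expensiveCall)
                )
                .forEach(i -> print("flowed parallel %d", i));

        // forEach does not wait for the asynchronous pipeline, so keep the test alive long enough.
        await(5000);
    }

    // Simulates a slow call: smaller values sleep longer (0 sleeps 500 ms).
    private Integer expensiveCall(Integer i) {
        print("making %d more expensive", i);
        await(Math.round(10f / (Math.abs(i) + 1)) * 50);
        return i;
    }

    private void await(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void print(String pattern, Object... values) {
        System.out.println(String.format(pattern, values));
    }
}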

The Maven dependency:

    <!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.2.13</version>
    </dependency>
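For comparison, the Java 9 reactive-streams interfaces mentioned above (`java.util.concurrent.Flow`) can chain stages the same way without a third-party library. This is a minimal sketch following the `TransformProcessor` pattern from the `SubmissionPublisher` Javadoc; the two stage functions (`i * 2`, `i + 1`) are hypothetical stand-ins for expensive calls, and it offers none of RxJava's operator library.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

final class FlowPipeline {

    // A stage that receives T, applies fn, and republishes R to its own subscribers.
    // Delivery runs on the publisher's executor (ForkJoinPool.commonPool() by default),
    // so consecutive stages can work on different items at the same time.
    static final class TransformProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> fn;
        private Flow.Subscription subscription;

        TransformProcessor(Function<? super T, ? extends R> fn) {
            this.fn = fn;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            (subscription = s).request(1);
        }

        @Override public void onNext(T item) {
            subscription.request(1);
            submit(fn.apply(item)); // blocks if the downstream buffer is full
        }

        @Override public void onError(Throwable t) { closeExceptionally(t); }

        @Override public void onComplete() { close(); }
    }

    static List<Integer> run(List<Integer> input) {
        List<Integer> results = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        TransformProcessor<Integer, Integer> doubled = new TransformProcessor<>(i -> i * 2);
        TransformProcessor<Integer, Integer> plusOne = new TransformProcessor<>(i -> i + 1);

        source.subscribe(doubled);
        doubled.subscribe(plusOne);
        CompletableFuture<Void> done = plusOne.consume(results::add);

        input.forEach(source::submit);
        source.close(); // onComplete follows the buffered items down the chain
        done.join();    // wait until the last stage has consumed everything
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3))); // prints [3, 5, 7]
    }
}
```

For blocking database calls, it would probably be better to give each stage a dedicated executor via the `SubmissionPublisher(Executor, int)` constructor rather than using the common pool.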
Curiosa Globunznik
  • That looks hella dope! I'm gonna give it a shot. – Selbi Oct 27 '19 at 22:14
  • Did you eventually take that route, @Selbi? – Curiosa Globunznik Nov 28 '19 at 20:14
  • Yes and no. Yes, because it did the passing-through I was hoping to achieve, but also no, because as it turned out, it was a vain attempt anyway. The "stupidly expensive database calls" would eat away at each other's resources (since they access the same DB). I've time-measured it and gained maybe one or two seconds, and it was just prototyped for one specific use case to see how flowables work. So I reverted it and now stick to what I showed in the OP. It was still a super dope thing to learn for future use cases where I'll hopefully have more control from the ground up, so thanks for that! – Selbi Nov 30 '19 at 21:16

You could use CompletableFuture to divide up each non-CPU-bound step. The usage is similar to the JavaScript Promise API.

// One shared pool for the blocking steps; creating a new pool per call would leak threads.
private final ExecutorService executor = Executors.newCachedThreadPool();

public CompletableFuture<Void> loadEntities() {
    // The expensive, non-CPU-bound steps run on `executor`; the cheap filters usually
    // run on the thread that completed the previous stage.
    return CompletableFuture.supplyAsync(this::expensiveMethod, executor)
            .thenApply(this::cheapFilterMethod)
            .thenApplyAsync(this::expensiveDatabaseCall, executor)
            .thenApply(this::anotherCheapFilterFunction)
            .thenApplyAsync(this::stupidlyExpensiveDatabaseCall, executor)
            .thenAccept(this::relativelyCheapMethod);
}

// Method bodies as in the question.
private List<String> expensiveMethod() { ... }
private List<String> cheapFilterMethod(List<String> ids) { ... }
private List<SomeEntity> expensiveDatabaseCall(List<String> ids) { ... }
private List<SomeEntity> anotherCheapFilterFunction(List<SomeEntity> entities) { ... }
private List<AnotherEntity> stupidlyExpensiveDatabaseCall(List<SomeEntity> entities) { ... }
private void relativelyCheapMethod(List<AnotherEntity> entities) { ... }

You can also pass your own thread pool at each step if you'd like to have more control over execution.
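A single chain still processes one whole batch at a time. Since the OP mentions in a comment that the source is a paged API, one option is to start one chain per page, so that later stages of one page overlap with earlier stages of the next. This is a minimal sketch that assumes the pages are already available as a list of lists; `cheapFilter` and `expensiveLookup` are hypothetical stand-ins for the real methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class PerBatchChains {

    // Hypothetical cheap filter: drop empty ids.
    static List<String> cheapFilter(List<String> ids) {
        return ids.stream().filter(id -> !id.isEmpty()).collect(Collectors.toList());
    }

    // Hypothetical "expensive" lookup: here just each id's length.
    static List<Integer> expensiveLookup(List<String> ids) {
        return ids.stream().map(String::length).collect(Collectors.toList());
    }

    // Starts an independent chain per batch; the pool size caps how many run at once.
    static List<Integer> run(List<List<String>> batches) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<List<Integer>>> chains = new ArrayList<>();
            for (List<String> batch : batches) {
                chains.add(CompletableFuture.supplyAsync(() -> cheapFilter(batch), pool)
                        // skip the expensive call when the filter left nothing, as the question's if-checks do
                        .thenApplyAsync(ids -> ids.isEmpty() ? List.<Integer>of() : expensiveLookup(ids), pool));
            }
            List<Integer> all = new ArrayList<>();
            for (CompletableFuture<List<Integer>> chain : chains) {
                all.addAll(chain.join()); // collect results in batch order
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(List.of("a", "", "bc"), List.of(""), List.of("def")))); // prints [1, 2, 3]
    }
}
```

With a real paged API, each chain would be started as soon as its page arrives. As the OP later found, this only helps if the database can actually serve the overlapping calls in parallel.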

kingkupps
  • I think the thenApply chain is sequential, isn't it? Similar to the proposal of @George. – Curiosa Globunznik Oct 27 '19 at 21:31
  • loadEntities will not block the calling thread. Each call in the chain will happen sequentially, but unless the API is changed, there is no way around this, since certain calls require the result of previous calls. Am I misunderstanding the ask? – kingkupps Oct 27 '19 at 22:27
  • Looks like the OP wants to [process a stream of results](https://stackoverflow.com/questions/58583102/passing-results-from-expensive-methods-as-they-come-for-multiple-layers#comment103481890_58583320). Processing occurs as discrete chunks of data are emitted by the source over time. I believe @curiosa is correct and something like _RxJava_ is the best solution for the OP. – Slaw Oct 27 '19 at 22:57

You can use the Java 8 Stream API. It's impossible to process the results of a DB query "as they come in", because the result set arrives all at once. You'd have to change your methods to handle single entities.

expensiveMethod().parallelStream()
  .filter(this::cheapFilterMethod)                // returns boolean per id
  .map(this::expensiveDatabaseCallSingle)         // returns SomeEntity
  .filter(this::anotherCheapFilterFunction)       // returns boolean per entity
  .map(this::stupidlyExpensiveDatabaseCallSingle) // returns AnotherEntity
  .forEach(this::relativelyCheapMethod);          // void method, now taking a single AnotherEntity

I would also suggest using an ExecutorService to manage your threads so you don't consume all resources just creating a bunch of threads:

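ExecutorService threadPool = Executors.newFixedThreadPool(8);
threadPool.submit(this::methodForParallelStream); // methodForParallelStream runs the stream pipeline above
threadPool.shutdown(); // accept no new tasks; threads exit once the submitted work finishes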
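One caveat: a parallel stream started from a task in an ordinary `ExecutorService` still does its work in the common `ForkJoinPool`, so a fixed pool does not bound the stream's parallelism. A widely used workaround, which relies on parallel streams being implemented with `ForkJoinTask` rather than on a documented Stream guarantee, is to start the stream from inside a dedicated `ForkJoinPool`. This is a minimal sketch; `cheapFilter` and `expensiveLookupSingle` are hypothetical per-item methods.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

final class BoundedParallelStream {

    // Hypothetical per-item versions of the question's methods.
    static boolean cheapFilter(String id) { return !id.isEmpty(); }
    static int expensiveLookupSingle(String id) { return id.length(); }

    static List<Integer> run(List<String> ids, int parallelism)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The stream is started from one of pool's worker threads, so the
            // subtasks it forks are executed in `pool` instead of the common pool.
            return pool.submit(() -> ids.parallelStream()
                            .filter(BoundedParallelStream::cheapFilter)
                            .map(BoundedParallelStream::expensiveLookupSingle)
                            .collect(Collectors.toList()))
                    .get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(List.of("a", "", "bcd", "ef"), 2)); // prints [1, 3, 2]
    }
}
```

Note that this still runs each item through the whole filter/map chain on one thread; it only limits how many items are processed at once.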
George
  • If I'm not mistaken, the result of expensiveMethod() will be partitioned and each partition processed in a separate thread, but the filter/map/filter/map... chain is sequential, as opposed to the request of the OP. – Curiosa Globunznik Oct 27 '19 at 21:52
  • It's actually a paged API call, returning up to N items per request and giving a pointer for the next partition. My goal would be to have each partition response be directly fed into the remaining filter chain. – Selbi Oct 27 '19 at 22:11