I have a very specific application architecture question.

I need to resolve a large number of incoming destinations. The resolution of these objects is handled asynchronously, and when it completes the objects need to be passed to the next phase.

So the real question boils down to this: what is an elegant way of coping with a large number of Future objects?

Should I just throw these into a list, iterate over that list, and remove them when they are done? Due to the constant concurrent access, the list will most likely be heavily contended and become a large bottleneck.

Sorry if my question is phrased rather vaguely.

  • You can use an ExecutorCompletionService. See for example: http://stackoverflow.com/a/11578512/829571 – assylias Feb 13 '14 at 18:44
  • Could you use a blocking queue, or something similar that's designed from the ground up to work concurrently? If you need the entire set before you pass it to the next phase, I suppose it doesn't really matter if it's a bottleneck; it is what it is. – Robert Harvey Feb 13 '14 at 18:45
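The ExecutorCompletionService suggestion could look roughly like the sketch below. It is only an illustration: the resolve method, the class name, and the pool size of 4 are hypothetical stand-ins, not part of the question. The point is that take() hands back futures in the order they finish, so there is no shared list to scan and prune.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceSketch {

    // Hypothetical stand-in for the real destination lookup.
    static String resolve(String destination) {
        return "resolved:" + destination;
    }

    // Submits one lookup per destination and collects results in completion order.
    static List<String> resolveAll(List<String> destinations) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        CompletionService<String> completed = new ExecutorCompletionService<>(pool);
        try {
            for (String d : destinations) {
                completed.submit(() -> resolve(d));
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < destinations.size(); i++) {
                // take() blocks until the next task, whichever it is, has finished
                results.add(completed.take().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Result order depends on which task finishes first.
        System.out.println(resolveAll(Arrays.asList("a", "b", "c")));
    }
}
```

In a real pipeline the loop around take() would hand each result to the next phase immediately instead of collecting everything into a list.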

4 Answers

If you need something to be done after the task is complete, I would add that work to the task. That is, instead of submitting the task on its own, submit a Runnable which performs both the task and whatever you need to do with the result. This way the processing of the result is timely and concurrent.
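A minimal sketch of that idea follows. The resolve and nextPhase methods, the class name, and the queue used as a sink are all hypothetical placeholders for the real work; the only point is that each Runnable does the resolution and then the follow-up step itself, so no Future has to be tracked.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CombinedTaskSketch {

    // Hypothetical phase 1: resolve a destination.
    static String resolve(String destination) {
        return "resolved:" + destination;
    }

    // Hypothetical phase 2: consume the resolved value.
    static void nextPhase(String resolved, Queue<String> sink) {
        sink.add(resolved + ":sent");
    }

    static Queue<String> run(List<String> destinations) {
        Queue<String> sink = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        for (String d : destinations) {
            // One Runnable performs the task and the follow-up work together.
            pool.execute(() -> nextPhase(resolve(d), sink));
        }
        pool.shutdown();
        try {
            // Waiting here is only so the sketch can return a complete result.
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink;
    }
}
```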

Peter Lawrey
  • The reason for separating these phases is that each addresses a different resource, and I can't afford to wait for destination resolution inside each thread. Phases 1 and 2 can run asynchronously, but phase 2 depends on phase 1 having completed. – user3002369 Feb 13 '14 at 21:45
  • @user3002369 in that case, 1 can trigger an asynchronous request for 2 on the completion of 1. – Peter Lawrey Feb 14 '14 at 11:17
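One possible way to have phase 1 trigger phase 2 asynchronously, assuming Java 8's CompletableFuture is available, is to give each phase its own executor and chain the stages. The resolve and deliver methods, the class name, and the pool sizes below are hypothetical; this is a sketch of the idea, not the commenter's own code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChainedPhasesSketch {

    // Hypothetical phase 1, bound to the first resource.
    static String resolve(String destination) {
        return "resolved:" + destination;
    }

    // Hypothetical phase 2, bound to the second resource.
    static String deliver(String resolved) {
        return resolved + ":delivered";
    }

    static String process(String destination) {
        ExecutorService phase1Pool = Executors.newFixedThreadPool(2);
        ExecutorService phase2Pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> done = CompletableFuture
                    // phase 1 runs on its own pool
                    .supplyAsync(() -> resolve(destination), phase1Pool)
                    // phase 2 is scheduled on the second pool once phase 1 completes
                    .thenApplyAsync(ChainedPhasesSketch::deliver, phase2Pool);
            // join() is only here so the sketch returns a value; real code need not block.
            return done.join();
        } finally {
            phase1Pool.shutdown();
            phase2Pool.shutdown();
        }
    }
}
```

No phase 1 thread waits on phase 2 here; the phase 1 thread is released as soon as resolve returns.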

CompletableFuture from Java 8 has methods for organizing task chains. In particular,

allOf(CompletableFuture<?>... cfs)
Returns a new CompletableFuture that is completed when all of the
given CompletableFutures complete.

So, collect all the CompletableFutures of the first stage in an array cfs, and call

CompletableFuture.allOf(cfs).thenRunAsync(nextPhase)
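A fuller sketch of this approach is below, with a hypothetical resolve method and class name standing in for the real first stage. Note that allOf returns a CompletableFuture<Void>, so the individual results have to be read back from the original futures once the combined future has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfSketch {

    // Hypothetical stand-in for the first-stage work.
    static String resolve(String destination) {
        return "resolved:" + destination;
    }

    static List<String> resolveAll(List<String> destinations) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String d : destinations) {
            // Runs on the common ForkJoinPool; pass an Executor to use a different pool.
            futures.add(CompletableFuture.supplyAsync(() -> resolve(d)));
        }
        CompletableFuture<?>[] cfs = futures.toArray(new CompletableFuture<?>[0]);

        return CompletableFuture.allOf(cfs)
                .thenApply(ignored -> {
                    // Every future is complete at this point, so join() does not block.
                    List<String> results = new ArrayList<>();
                    for (CompletableFuture<String> f : futures) {
                        results.add(f.join());
                    }
                    return results;
                })
                // Blocking here is only so the sketch can return a value.
                .join();
    }
}
```

Because the results are read from the futures list, they come back in submission order regardless of which task finished first.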
Alexei Kaigorodov

Have a look at this post: https://stackoverflow.com/a/24363156/1248724

Since you want to wait for all tasks to finish before starting the next phase with their results, you are looking for a CountDownLatch.

Here is an example using an ExecutorService:

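CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  // each task must call latch.countDown() when it finishes
  taskExecutor.execute(new MyTask());
}

try {
  latch.await(); // blocks until the count reaches zero
} catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt flag
  // handle
}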

and within your task (put it in a finally block so it runs even if the task throws):

latch.countDown();
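Put together, the latch pattern might look like the following self-contained sketch. The class name, the counter that stands in for real work, and the pool size are assumptions made for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchSketch {

    // Runs the given number of tasks and returns how many completed.
    static int runTasks(int totalNumberOfTasks) {
        CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
        AtomicInteger completed = new AtomicInteger();
        ExecutorService taskExecutor = Executors.newFixedThreadPool(4);

        for (int i = 0; i < totalNumberOfTasks; i++) {
            taskExecutor.execute(() -> {
                try {
                    completed.incrementAndGet(); // placeholder for the real work
                } finally {
                    latch.countDown(); // always count down, even if the work throws
                }
            });
        }

        try {
            latch.await(); // returns once every task has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            taskExecutor.shutdown();
        }
        return completed.get();
    }
}
```

The next phase can start right after await() returns, since all first-phase tasks have finished by then.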
Zarathustra

Have a look at RxJava:

https://github.com/Netflix/RxJava/wiki

It provides methods to easily handle (transform, filter, combine, and so on) multiple asynchronous tasks.

Ruben