2

I am using an external library that has the code from below. I am sending a lot of commands and am interesed in the result for statistics to check how many calls failed and how many succeeded

public Future<CommandResult> sendCommand(Command command) {
    return command.execute();
}
CommandResult can be success or failure

However, if I use client.sendCommand(command).get(); then, I am waiting for the result synchronously, meanwhile the app is being blocked.

I would like to check only later (after 30 seconds which calls succeded and which failed). I am guaranteed to get an answer in 10 seconds. The problem is that the app waits for the computation to complete, and then retrieves its result.

I was thinking about this approach based on the answers:

List<Future< CommandResult >> futures = new ArrayList<>();
for(Command command: commands) {
   futures.add(client.sendCommand(command)); 
} 

//in a scheduler, 30+ seconds later 
for (Future<Boolean> future : futures) {  
   saveResult(future.get());
} 
Sorin Penteleiciuc
  • 653
  • 1
  • 10
  • 26
  • Probably you want to use a notification mechanism, that will invoke something when the command result is ready. For this you should use a different thread. – pringi Feb 11 '22 at 12:13
  • How exactly would you approach this ? – Sorin Penteleiciuc Feb 11 '22 at 12:19
  • Why not create a new thread that waits for the results, and then does whatever it is you want to do when the results become available? – Solomon Slow Feb 11 '22 at 13:24
  • Because I still have the problem when doing .get, the execution is blocked – Sorin Penteleiciuc Feb 11 '22 at 13:26
  • But only the new thread would be blocked. That would be the whole point of creating the new thread: It could do the waiting, while some other thread(s) are free to do whatever it is that they do. Maybe you could explain in more detail; (a) what you want to have happen when the results are ready, and (b) what else is happening that you don't want to be blocked while awaiting the results. – Solomon Slow Feb 11 '22 at 13:40
  • I do not really want to be notified immedialty once a result is available. I send really a lot of commands and my goal is to be able to continue execution without being blocked. – Sorin Penteleiciuc Feb 11 '22 at 14:03
  • 2
    Then just continue execution and call `get` when 30 seconds have been elapsed. We don’t know what you want to do in the meanwhile, so we can’t tell you how to do it. – Holger Feb 11 '22 at 15:12

3 Answers3

1

Future is a legacy java feature which does not allow for reactive non blocking functionalities. The CompletableFuture is a later enhancement in Java in order to allow such reactive non blocking functionalities.

You can based on this previous SO answer try to convert your Future into a CompletableFuture and then you will have methods exposed to take advantage of non blocking execution.

Check the following example and modify accordingly.

public class Application {

    public static void main(String[] args) throws ParseException {

        Future future =  new SquareCalculator().calculate(10);
        CompletableFuture<Integer> completableFuture = makeCompletableFuture(future);
        System.out.println("before apply");
        completableFuture.thenApply(s -> {
            System.out.println(s);
            return s;
        });
        System.out.println("after apply method");
    }


    public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
        if (future.isDone())
            return transformDoneFuture(future);
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (!future.isDone())
                    awaitFutureIsDoneInForkJoinPool(future);
                return future.get();
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                // Normally, this should never happen inside ForkJoinPool
                Thread.currentThread().interrupt();
                // Add the following statement if the future doesn't have side effects
                // future.cancel(true);
                throw new RuntimeException(e);
            }
        });
    }

    private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        T result;
        try {
            result = future.get();
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
            return cf;
        }
        cf.complete(result);
        return cf;
    }

    private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
            throws InterruptedException {
        ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
            @Override public boolean block() throws InterruptedException {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
                return true;
            }
            @Override public boolean isReleasable() {
                return future.isDone();
            }
        });
    }
}

And then the class to create an example Future

public class SquareCalculator {

    private ExecutorService executor
            = Executors.newSingleThreadExecutor();

    public Future<Integer> calculate(Integer input) {
        return executor.submit(() -> {
            Thread.sleep(1000);
            return input * input;
        });
    }
}

Will result into

enter image description here

Panagiotis Bougioukos
  • 15,955
  • 2
  • 30
  • 47
1

I would like to check only later (after 30 seconds which calls succeeded and which failed). I am guaranteed to get an answer in 10 seconds. The problem is that the app waits for the computation to complete, and then retrieves its result.

If you want check on the results at a later time then your solution with Future<Boolean> should be fine. The jobs will run in the background and you will get the results form then when you call future.get(). Each of those get() calls do block however.

If you want to get the results as they come in, I would use an ExecutorCompletionService which you can poll anytime to see if you have results. The poll is non-blocking.

// create your thread pool using fixed or other pool
Executor<Result> threadPool = Executors.newFixedThreadPool(5);
// wrap the Executor in a CompletionService
CompletionService<Boolean> completionService =
   new ExecutorCompletionService<>(e);
// submit jobs to the pool through the ExecutorCompletionService
for (Job job : jobs) {
   completionService.submit(job);
}
// after we have submitted all of the jobs we can shutdown the Executor
// the jobs submitted will continue to run
threadPool.shutdown();
...
// some point later you can do
int jobsRunning = jobs.size();
for (int jobsRunning = jobs.size(); jobsRunning > 0; ) {
   // do some processing ...
   // are any results available?
   Boolean result = completionService.poll();
   if (result != null) {
      // process a result if available
      jobsRunning--;
   }
}

Note that you will need to track how many jobs you submitted to the CompletionService.

Gray
  • 115,027
  • 24
  • 293
  • 354
  • 1
    As I stated that the poll is non-blocking, then my idea is to check this with a scheduler 30 seconds later. I am guaranteed to get an answer in 10s. If I throw some requests then I can monitor it later. What would you say for this approach List> futures = new ArrayList<>(); for(Command command: commands) { futures.add(client.sendCommand(command)); } in a scheduler, 30 seconds later just ro proof if they finished for (Future future : futures) { logger.info("counter :" + counter + future.get()); } – Sorin Penteleiciuc Feb 12 '22 at 07:31
  • @SorinPenteleiciuc `I do not really want to be notified immedialty once a result is available. I send really a lot of commands and my goal is to be able to continue execution without being blocked. ` With my solution in the other answer you don't waste any non needed resources. You will use just 1 extra thread, which will take over all those tasks to see if response is ready, and by the time it is ready you will have the lambda executed with the result. – Panagiotis Bougioukos Feb 12 '22 at 07:56
0

If converting the Future instances to CompletableFuture (see answer from Panagiotis Bougioukos) is an option, then you can implement a simple helper function for turning a Stream<CompletableFuture<T>> into a CompletableFuture<Stream<T>>:

public static <T> CompletableFuture<Stream<T>> collect(Stream<CompletableFuture<T>> futures) {
    return futures
        .map(future -> future.thenApply(Stream::of))
        .reduce(
            CompletableFuture.completedFuture(Stream.empty()),
            (future1, future2) ->
            future1
                .thenCompose(stream1 ->
            future2
                .thenApply(stream2 ->
            concat(stream1, stream2)))
        );
}

Essentially this reduces the stream of futures in parallel to a future of a stream.

If you use this e.g. on a stream of futures of strings, it will return a future that completes once the last of the individual futures completed:

Stream<CompletableFuture<String>> streamOfFutures = ...
CompletableFuture<Stream<String>> futureOfStream = collect(streamOfFutures);

// Prints a list of strings once the "slowest" future completed
System.out.println(futureOfStream.get().toList());
michid
  • 10,536
  • 3
  • 32
  • 59