4

I have a Callable object that is executed using an ExecutorService.

How can I return interim results from this Callable?

I know there is javax.swing.SwingWorker#publish(results) for Swing but I don't use Swing.

tshepang
Andrei Botalov
  • Similar questions here: http://stackoverflow.com/questions/2275816/ability-to-get-the-progress-on-a-futuret-object and here: http://stackoverflow.com/questions/6346927/java-callable-futuretask-excecuter-how-to-listen-to-finished-task – Jim Jan 20 '12 at 21:00
  • +1 for this question, as I think it's worth asking. I am looking for something similar. It appears that the Java concurrency API only considers tasks as either not done yet or done, i.e. Future.get() blocks until the task is done. What's needed, IMHO, is a solution where a task can have many states (e.g. downloading, downloaded, persisting, completed, etc.) and an API capable of interrogating the task about its state. – diarmuid Jun 16 '14 at 20:21

5 Answers

3

There are a couple of ways of doing this. You could do it with a callback or you could do it with a queue.

Here's an example of doing it with a callback:

public static interface Callback<T> {
    public void on(T event);
}

Then, an implementation of the callback that does something with your in-progress events:

final Callback<String> callback = new Callback<String>() {
    public void on(String event) {
        System.out.println(event);
    }
};

Now you can use the callback in your pool:

Future<String> submit = pool.submit(new Callable<String>() {
    public String call() throws Exception {
        for(int i = 0; i < 10; i++) {
            callback.on("process " + i);
        }
        return "done";
    }
});
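
The queue variant mentioned at the start of this answer is not shown above, so here is a minimal sketch of how it might look. The class name QueueProgressDemo and the method runAndCollect are hypothetical names chosen for illustration; the task puts interim results on a BlockingQueue and the calling thread drains it until the Future reports completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative only.
class QueueProgressDemo {

    // Runs a task that publishes interim results to a queue and returns
    // everything the caller observed, followed by the final result.
    static List<String> runAndCollect() throws Exception {
        BlockingQueue<String> interim = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = new ArrayList<>();
        try {
            Future<String> result = pool.submit(() -> {
                for (int i = 0; i < 3; i++) {
                    interim.put("process " + i); // publish an interim result
                }
                return "done";
            });

            // Keep draining until the task is done and the queue is empty.
            while (!result.isDone() || !interim.isEmpty()) {
                String event = interim.poll(50, TimeUnit.MILLISECONDS);
                if (event != null) {
                    seen.add(event);
                }
            }
            seen.add(result.get()); // the final result
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        runAndCollect().forEach(System.out::println);
    }
}
```

Compared with the callback, the queue decouples the threads: the callback runs on the worker thread, whereas queued events are consumed on whichever thread polls the queue.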
Jamie McCrindle
1

You can pass, let's say, an AtomicInteger to your class (the one that will be submitted to the executor). Inside that class you increment its value, and from the calling thread you check its value.

Something like this:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LongComputation {

    // Shared counter: incremented by the worker, read by the caller.
    private final AtomicInteger progress;

    public LongComputation(AtomicInteger progress) {
        this.progress = progress;
    }

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {

        AtomicInteger progress = new AtomicInteger(0);
        LongComputation computation = new LongComputation(progress);

        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<Integer> result = executor.submit(() -> computation.compute());
        executor.shutdown();

        // Poll the counter from the calling thread until the task finishes.
        while (!result.isDone()) {
            System.out.printf("Progress...%d%%%n", progress.intValue());
            TimeUnit.MILLISECONDS.sleep(100);
        }

        System.out.printf("Result=%d%n", result.get());
    }

    public int compute() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            TimeUnit.MILLISECONDS.sleep(100);
            progress.incrementAndGet();
        }
        return 1_000_000;
    }
}

1

It is not clear what an "interim result" really is. The interfaces used in the concurrency package simply do not define this, but assume methods that resemble more or less pure functions.

Hence, instead of this:

interim     = compute something
finalresult = compute something else

do something like this:

interim = compute something
final1 = new Pair( interim, fork(new Future() { compute something else }) )

(Pseudocode meant to convey the idea, not compilable code.)


EDIT: The idea is to avoid running a single monolithic block of computation (which happens to reach a state where some "interim results" are available). Instead, break it up so that the first task returns the former "interim" result and, at the same time, forks a second task that computes the final result. Of course, a handle to this second task must be delivered to the caller so that it can eventually get the final result. Usually, this is done with the Future interface.
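
A rough Java rendering of the pseudocode might look like the sketch below. The names StagedComputationDemo, Staged and firstStage are hypothetical, and the arithmetic is only a stand-in for "compute something" and "compute something else"; treat it as one possible reading of the idea rather than a definitive implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class StagedComputationDemo {

    // Pairs the interim result with a handle to the still-running final stage.
    static final class Staged<I, F> {
        final I interim;
        final Future<F> finalResult;

        Staged(I interim, Future<F> finalResult) {
            this.interim = interim;
            this.finalResult = finalResult;
        }
    }

    // First stage: computes the interim value, forks the second stage,
    // and returns both without waiting for the second stage to finish.
    static Staged<Integer, Integer> firstStage(ExecutorService pool, int input) {
        int interim = input * 2;                                // "compute something"
        Future<Integer> rest = pool.submit(() -> interim + 1);  // "compute something else"
        return new Staged<>(interim, rest);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Staged<Integer, Integer>> first =
                    pool.submit(() -> firstStage(pool, 20));
            Staged<Integer, Integer> staged = first.get();
            System.out.println("interim = " + staged.interim);
            System.out.println("final = " + staged.finalResult.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the first stage never blocks on the second, the caller can act on the interim value as soon as first.get() returns and fetch the final value later.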

Ingo
  • I understood you as: to retrieve n interim results, submit n callables, thus creating n threads, and retrieve the results using `Future#get()`. Am I right? – Andrei Botalov Jan 20 '12 at 22:12
0

What you're looking for is java.util.concurrent.Future.

A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.

dj_segfault
  • A Future returns the final result, but not interim results. – JB Nizet Jan 20 '12 at 21:26
  • How true! There is no such thing as an "intermediate result". If the computation is such that you can define points where what has been computed so far is a useful "intermediate result", one must organize its functions and methods accordingly to give the caller access to the intermediate data. – Ingo Jan 20 '12 at 22:03
0

You would have to roll your own API with something like Observer/Observable if you want to publish intermediate results as a push. A simpler option would be to just poll for the current state through some self-defined method.
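
The polling option could be sketched roughly as follows. Everything here is an assumption made for illustration: the class StatefulTaskDemo, the State enum (loosely modelled on the states named in a comment on the question), the getState() method, and the sleeps that stand in for real work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the names are illustrative only.
class StatefulTaskDemo {

    enum State { PENDING, DOWNLOADING, PERSISTING, COMPLETED }

    // A Callable that exposes its current state through a self-defined method.
    static final class DownloadTask implements Callable<String> {
        private final AtomicReference<State> state =
                new AtomicReference<>(State.PENDING);

        // Safe to call from any thread while the task is running.
        State getState() {
            return state.get();
        }

        @Override
        public String call() throws InterruptedException {
            state.set(State.DOWNLOADING);
            Thread.sleep(100); // stand-in for the download
            state.set(State.PERSISTING);
            Thread.sleep(100); // stand-in for persisting the data
            state.set(State.COMPLETED);
            return "done";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        DownloadTask task = new DownloadTask();
        Future<String> result = pool.submit(task);
        pool.shutdown();

        // Poll the task's state from the calling thread until it finishes.
        while (!result.isDone()) {
            System.out.println("state = " + task.getState());
            Thread.sleep(50);
        }
        System.out.println("result = " + result.get()
                + ", state = " + task.getState());
    }
}
```

The caller keeps its own reference to the task object, since the Future returned by submit only exposes the final result.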