I am investigating the CompletionService class, and I find the decoupling of the submission queue from the completion queue really useful.
However, I am missing a way to poll/take cancelled tasks as well (which could be considered completed, in a way). Is there an easy way to do this?
    Future<String> task1Future = completionService.submit(myCallableTask1);
    Future<String> task2Future = completionService.submit(myCallableTask2);
    task2Future.cancel(true); // cancel() takes the mayInterruptIfRunning flag
    Future<String> lastComplTaskFuture = completionService.take();
    // Seems to return only the completed or
    // interrupted tasks, not the cancelled ones (task2)
EDIT: After checking some of the answers, I realized what is happening. CompletionService appears to return the futures in the same order in which the jobs were submitted. If you run jobs a, b and c, cancel b and c while a is still working, and then poll the CompletionService, the cancellation of b and c is not reported until a has terminated. I also realized that if you shut down the executor instead of cancelling individual tasks, the tasks still waiting in the queue never reach the CompletionService, even though their cancellation is finished while the active tasks have not ended yet.
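To make the shutdown observation concrete, here is a minimal sketch of what I mean. The class name ShutdownNowSketch, the helper run() and the CountDownLatch are made up for this illustration and are not part of my real code; I also use shutdownNow() and a single-threaded executor here, which is an assumption about the setup. It counts how many futures the completion service hands out after the shutdown and how many queued tasks shutdownNow() hands back instead.

```java
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowSketch {

    // Hypothetical helper: returns
    // { futures delivered by the completion service, tasks returned by shutdownNow() }.
    static int[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletionService<String> completion = new ExecutorCompletionService<>(executor);
        CountDownLatch aStarted = new CountDownLatch(1);
        try {
            // A occupies the only worker thread; B and C wait in the executor's queue.
            completion.submit(() -> { aStarted.countDown(); Thread.sleep(10_000); return "A"; });
            completion.submit(() -> "B");
            completion.submit(() -> "C");

            // Make sure A is really running before shutting down.
            aStarted.await();

            // Interrupts A and hands back the tasks that never started.
            List<Runnable> neverStarted = executor.shutdownNow();

            // Poll with a timeout so the sketch terminates instead of blocking in take().
            int delivered = 0;
            while (completion.poll(1, TimeUnit.SECONDS) != null) {
                delivered++;
            }
            return new int[] { delivered, neverStarted.size() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("Futures delivered by the completion service: " + counts[0]);
        System.out.println("Tasks returned by shutdownNow(): " + counts[1]);
    }
}
```

In this sketch only the interrupted job A shows up in the completion service, while B and C come back only through the list returned by shutdownNow(), which matches what I described above.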
EDIT2: OK, I added a complete test case:
    import java.util.concurrent.*;

    public class CompletionTest {

        public static void main(String[] args){
            CompletionService<String> completion =
                new ExecutorCompletionService<String>(Executors.newSingleThreadExecutor());
            Future<String> aFuture =
                completion.submit(() -> {Thread.sleep(10000); return "A";});
            completion.submit(() -> {return "B";}).cancel(true);
            completion.submit(() -> {return "C";}).cancel(true);
            completion.submit(() -> {return "D";}).cancel(true);
            long arbitraryTime = System.currentTimeMillis();
            while (true){
                String result = getNextResult(completion);
                System.out.println(result);
                if (System.currentTimeMillis() > arbitraryTime+5000){
                    aFuture.cancel(true);
                }
            }
        }

        public static String getNextResult(CompletionService<String> service){
            try {
                String taskId = service.take().get();
                return "'"+taskId+"' job completed successfully.";
            }
            catch (InterruptedException e ) {
                return "Unknown job was interrupted/stopped while waiting (no thread.wait() or infinite loop in the job so I guess this should not be possible).";
            }
            catch (CancellationException e) { return "Unknown job was cancelled."; }
            catch (ExecutionException e) {
                return "Unknown job returned with the following internal exception while running: " + e.getCause().getMessage();
            }
        }
    }
I expected an output like:
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
'A' job completed successfully.
But instead it was:
'A' job completed successfully.
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
I even tried using a PriorityBlockingQueue as the queue for the CompletionService, with Comparator.<Future<String>,Boolean>comparing(Future::isCancelled).reversed() as its comparator,
but that did not work either (I guess the queue does not re-sort when an element changes state, though).
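To check my guess about re-sorting in isolation, here is a small sketch that uses the same comparator on a bare PriorityBlockingQueue, without any executor. The class name PriorityResortSketch and the helper cancelledFutureComesFirst() are hypothetical names for this illustration. It only tests the guess about the queue itself; it does not prove that this is the reason my attempt with the CompletionService failed.

```java
import java.util.Comparator;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityResortSketch {

    // Hypothetical helper: returns true if the first future polled from the
    // queue is the one that was cancelled after being inserted.
    static boolean cancelledFutureComesFirst() {
        // Same comparator as in my attempt: cancelled futures should sort first.
        Comparator<Future<String>> cancelledFirst =
            Comparator.<Future<String>, Boolean>comparing(Future::isCancelled).reversed();
        PriorityBlockingQueue<Future<String>> queue =
            new PriorityBlockingQueue<>(11, cancelledFirst);

        // None of these tasks is ever run; they only serve as queue elements.
        FutureTask<String> first = new FutureTask<>(() -> "first");
        FutureTask<String> second = new FutureTask<>(() -> "second");
        FutureTask<String> third = new FutureTask<>(() -> "third");
        queue.add(first);
        queue.add(second);
        queue.add(third);

        // The element changes state while it is already inside the queue.
        third.cancel(true);

        Future<String> head = queue.poll();
        return head.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("Cancelled future polled first: " + cancelledFutureComesFirst());
    }
}
```

As far as I can tell, the head of the queue is still a non-cancelled future here, because the comparator is only consulted when elements are inserted or removed, not when the state of an element already in the queue changes.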