2

I am investigating the CompletionService class, and I find really useful the decoupling of the submitting queue from the completition queue.

But I miss a way to poll/take cancelled tasks as well (which could be considered completed in a way). Can it be done easily someway?

Future<String> task1Future =  completionService.submit(myCallableTask1);
Future<String> task2Future =  completionService.submit(myCallableTask2);
task2Future.cancel();
Future<String> lastComplTaskFuture = completionService.take(); 
                        //Seems to return only the completed or 
                        //interrupted tasks, not those cancelled (task2)

EDIT: After checking some of the answers, I realized what is happening. CompletitionService returns in the same order as the submitted jobs. If you run job a, b and c; cancel b and c while a is working; and finally poll the completitionService, the cancellation of b and c won't be notified until a taks is terminated. Also, I realized that if you shutdown the executor instead of cancelling individiual tasks, those tasks still in the queue don't reach completitionservice, even if their cancellation is finished while the active tasks have not ended yet.

EDIT2: Ok, I added a whole testcase

import java.util.concurrent.*;

public class CompletionTest {

    public static void main(String[] args){
        CompletionService<String> completion = 
                new ExecutorCompletionService<String>(Executors.newSingleThreadExecutor()); 
        Future<String> aFuture =
                 completion.submit(() -> {Thread.sleep(10000); return "A";});
        completion.submit(() -> {return "B";}).cancel(true);
        completion.submit(() -> {return "C";}).cancel(true);
        completion.submit(() -> {return "D";}).cancel(true);

        long arbitraryTime = System.currentTimeMillis();
        while (true){
            String result = getNextResult(completion);
            System.out.println(result);

            if (System.currentTimeMillis() > arbitraryTime+5000){
                aFuture.cancel(true);
            }
         }
    }

    public static String getNextResult(CompletionService<String> service){
        try {
            String taskId = service.take().get();
            return "'"+taskId+"' job completed successfully.";
        } 
        catch (InterruptedException e ) { return "Unknown job was interrupted/stopped while waiting (no thread.wait() or infinite loop in the job so I guess this should not be possible)."; } 
        catch (CancellationException e) { return "Unknown job was cancelled."; }  
        catch (ExecutionException e) { 
            return "Unknown job returned with the following internal exception while running: " + e.getCause().getMessage();
        }
    }
}

I expected an output like:

Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
'A' job completed successfully.

But instead it was:

'A' job completed successfully.
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.

I even tried to use a PriorityBlockingQueue as a queue for the CompletionService, using Comparator.<Future<String>,Boolean>comparing(Future::isCancelled).reversed() but it did not worked neither (I guess it does not resort if an element changes state although)

Whimusical
  • 6,401
  • 11
  • 62
  • 105
  • Your code works fine for me and returns a cancelled `Future`. I've probably made assumptions (eg. `cancel` expects a `boolean` argument). Please post an MCVE. – Sotirios Delimanolis Dec 02 '15 at 04:26
  • @Sotirios Delimanolis But does it return your "interrupted" cancelled futures (i.e, interrupted while underlying task active) or the "discarded" cancelled futures (i.e, cancelled when queued and not run yet)? If it's the later, then I must have some kind of problem in my queue or thread pool setting – Whimusical Dec 02 '15 at 17:05
  • 1
    It does both. Both `Future`'s `isCancelled` return `true`. – Sotirios Delimanolis Dec 03 '15 at 01:14
  • @Sotirios Delimanolis Check my Edit please, I think there is a bit of a truth in each of the premises we stated – Whimusical Dec 07 '15 at 19:18
  • I can't reproduce _If you run job a, b and c; cancel b and c while a is working; and finally poll the completitionService, the cancellation of b and c won't be notified until a taks is terminated._ at all. Please provide an MCVE where this is happening and another for your _Also, [...]_. – Sotirios Delimanolis Dec 07 '15 at 19:32
  • @Sotirios Delimanolis Thanks fr your patience, now there is a whole testcase – Whimusical Dec 08 '15 at 01:18

1 Answers1

1

You're submitting tasks to a thread pool with a single thread.

The first task you submit will consume that thread (sleeping for a short time).

The others will be queued within the ExecutorService. Cancelling the corresponding futures does not remove the corresponding task from that queue. It just changes the state of the Future.

The tasks will remain in the queue until a thread is free to process them. The thread will notice the corresponding Future is cancelled and cancel the entire task as well. That's when the CompletionService knows that they are "done", no sooner.

Consider increasing the size of your thread pool.

Sotirios Delimanolis
  • 274,122
  • 60
  • 696
  • 724