14

I want to cancel a FutureTask that I get from a ThreadPoolExecutor, but I want to be sure that the Callable running on the thread pool has stopped its work.

If I call FutureTask#cancel(false) and then get() (to block until completion), I get a CancellationException. Is this exception thrown immediately, or only after the task has stopped executing?

Amit Kumar Lal
Jon Tirsen
  • what's the use-case? it doesn't seem like cancelling is giving you much benefit here – if you don't want the job to be interrupted yet you still want to wait for it to complete, what is cancelling giving you? – Jed Wesley-Smith May 18 '11 at 23:16
  • 1
    The task is working on shared state, I want to ensure it has stopped its work before I start a new one which works on the same shared state. – Jon Tirsen May 20 '11 at 04:23
  • but, what is the value then of the cancel? is the Future being shared amongst many clients and you want to communicate to them? – Jed Wesley-Smith May 20 '11 at 22:30

5 Answers

2

Yes, CancellationException is thrown immediately. You can extend FutureTask with a variant of get() that waits until the thread running the Callable has finished.

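import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;

public class ThreadWaitingFutureTask<T> extends FutureTask<T> {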

    private final Semaphore semaphore;

    public ThreadWaitingFutureTask(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    public T getWithJoin() throws InterruptedException, ExecutionException {
        try {
            return super.get();
        }
        catch (CancellationException e) {
            semaphore.acquire();
            semaphore.release();
            throw e;
        }
    }

    private ThreadWaitingFutureTask(final Callable<T> callable, 
                final Semaphore semaphore) {
        super(new Callable<T>() {
            public T call() throws Exception {
                semaphore.acquire();
                try {
                    return callable.call();
                }
                finally {
                    semaphore.release();
                }
            }
        });
        this.semaphore = semaphore;
    }
}
Aleksey Otrubennikov
  • This answer has the same race condition as @FooJBar's. There is a window between when `FutureTask.run` checks `state` and runs the callable during which both `cancel` and `getWithJoin` can successfully complete. However, the callable will still run. The callable should itself check if the task hasn't been cancelled, but that is tricky to do. – Aleksandr Dubinsky May 06 '15 at 15:17
2

Aleksey's example works well. I wrote a variant that adds a constructor taking a Runnable (get() then returns null) and blocks (joins) directly in cancel():

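import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;

public class FutureTaskCancelWaits<T> extends FutureTask<T> {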

    private final Semaphore semaphore;

    public FutureTaskCancelWaits(Runnable runnable) {
        this(Executors.callable(runnable, (T) null));
    }

    public FutureTaskCancelWaits(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // If the task was successfully cancelled, block here until call() returns
        if (super.cancel(mayInterruptIfRunning)) {
            try {
                semaphore.acquire();
                // Release only after a successful acquire, so the permit count stays at 1
                semaphore.release();
                return true;
            } catch (InterruptedException e) {
                // Interrupted while waiting: restore the flag so the caller can see it
                Thread.currentThread().interrupt();
            }
        }
        return false;
    }

    private FutureTaskCancelWaits(final Callable<T> callable, final Semaphore semaphore) {
        super(new Callable<T>() {
            public T call() throws Exception {
                semaphore.acquire();
                try {
                    return callable.call();
                } finally {
                    semaphore.release();
                }
            }
        });
        this.semaphore = semaphore;
    }
}
FooJBar
  • 1
    Unfortunately, there is a fatal flaw in this answer. As you can see in FutureTask.run:262 (Oracle JDK 8u40), after state is checked and before `c.call()` occurs, a call to `cancel` may occur. Since the semaphore has not been entered yet, `cancel` will finish right away. However, `c.call()` will run regardless. A solution might be to add a check for `isCancelled()` in the new callable. However, the compiler complains with "cannot reference this before supertype constructor is called." – Aleksandr Dubinsky May 06 '15 at 15:08
  • OTOH, it does appear that `semaphore.acquire` will throw `InterruptedException` if the thread has been interrupted by `cancel`, but only if `mayInterruptIfRunning` was set to true. – Aleksandr Dubinsky May 06 '15 at 16:01
2

This answer fixes the race condition in Aleksey's and FooJBar's code by checking if the task has been cancelled inside the callable. (There is a window between when FutureTask.run checks state and runs the callable during which both cancel and getWithJoin can successfully complete. However, the callable will still run.)

I've also decided not to override the original cancel, since the new cancel needs to declare InterruptedException. The new cancel gets rid of its useless return value (since true can mean any one of "task has not started", "task has started and has already done most of its damage", "task has started and will eventually complete"). Gone also is the check of super.cancel's return value, so that if the new cancel is called multiple times from different threads, they will all wait for the task to complete.

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Based on: http://stackoverflow.com/questions/6040962/wait-for-cancel-on-futuretask
 * 
 * @author Aleksandr Dubinsky
 */
public class FixedFutureTask<T> extends FutureTask<T> {

     /**
      * Creates a {@code FutureTask} that will, upon running, execute the given {@code Runnable}, 
      * and arrange that {@code get} will return the given result on successful completion.
      *
      * @param runnable the runnable task
      * @param result the result to return on successful completion. 
      *               If you don't need a particular result, consider using constructions of the form:
      *               {@code Future<?> f = new FutureTask<Void>(runnable, null)}
      * @throws NullPointerException if the runnable is null
      */
      public 
    FixedFutureTask (Runnable runnable, T result) {
            this (Executors.callable (runnable, result));
        }

     /**
      * Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}.
      *
      * @param  callable the callable task
      * @throws NullPointerException if the callable is null
      */
      public 
    FixedFutureTask (Callable<T> callable) {
            this (new MyCallable (callable));
        }

      /** Some ugly code to work around the compiler's limitations on constructors */
      private 
    FixedFutureTask (MyCallable<T> myCallable) {
            super (myCallable);
            myCallable.task = this;
        }

    private final Semaphore semaphore = new Semaphore(1);

    private static class MyCallable<T> implements Callable<T>
    {
        MyCallable (Callable<T> callable) {
                this.callable = callable;
            }

        final Callable<T> callable;
        FixedFutureTask<T> task;

          @Override public T
        call() throws Exception {

                task.semaphore.acquire();
                try 
                {
                    if (task.isCancelled())
                        return null;

                    return callable.call();
                }
                finally 
                {
                    task.semaphore.release();
                }
            }
    }

     /**
      * Waits if necessary for the computation to complete or finish cancelling, and then retrieves its result, if available.
      *
      * @return the computed result
      * @throws CancellationException if the computation was cancelled
      * @throws ExecutionException if the computation threw an exception
      * @throws InterruptedException if the current thread was interrupted while waiting
      */
      @Override public T 
    get() throws InterruptedException, ExecutionException, CancellationException {

            try 
            {
                return super.get();
            }
            catch (CancellationException e) 
            {
                semaphore.acquire();
                semaphore.release();
                throw e;
            }
        }

     /**
      * Waits if necessary for at most the given time for the computation to complete or finish cancelling, and then retrieves its result, if available.
      *
      * @param timeout the maximum time to wait
      * @param unit the time unit of the timeout argument
      * @return the computed result
      * @throws CancellationException if the computation was cancelled
      * @throws ExecutionException if the computation threw an exception
      * @throws InterruptedException if the current thread was interrupted while waiting
      * @throws TimeoutException if the wait timed out
      */
      @Override public T
    get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException {

            try 
            {
                return super.get (timeout, unit);
            }
            catch (CancellationException e) 
            {
                semaphore.acquire();
                semaphore.release();
                throw e;
            }
        }

     /**
      * Attempts to cancel execution of this task and waits for the task to complete if it has been started.
      * If the task has not started when {@code cancelAndWait} is called, this task should never run.
      * If the task has already started, then the {@code mayInterruptIfRunning} parameter determines
      * whether the thread executing this task should be interrupted in an attempt to stop the task.
      *
      * <p>After this method returns, subsequent calls to {@link #isDone} will
      * always return {@code true}.  Subsequent calls to {@link #isCancelled}
      * will return {@code true} only if the task was cancelled before it completed.
      *
      * @param mayInterruptIfRunning {@code true} if the thread executing this task should be interrupted; 
      *                              otherwise, in-progress tasks are allowed to complete
      * @throws InterruptedException if the thread is interrupted
      */
      public void
    cancelAndWait (boolean mayInterruptIfRunning) throws InterruptedException {

            super.cancel (mayInterruptIfRunning);

            semaphore.acquire();
            semaphore.release();
        }
}
Aleksandr Dubinsky
  • Of course, `get (long timeout, TimeUnit unit)` no longer obeys its contract because `semaphore.acquire()` has no timeout. A different thread synchronization primitive should be used. However, I won't change it for now. – Aleksandr Dubinsky May 09 '15 at 01:07
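One way to keep the timed get within its deadline is to replace the untimed acquire with Semaphore.tryAcquire(timeout, unit), spending only whatever time is left. The sketch below is a hedged illustration, not the answer's code: the class name TimedJoinFutureTask is hypothetical, and it uses the simpler two-constructor wrapper from the earlier answers, so it still has the start-up race that FixedFutureTask addresses.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical class name; only the timed wait is the point of this sketch.
class TimedJoinFutureTask<T> extends FutureTask<T> {

    private final Semaphore semaphore;

    TimedJoinFutureTask(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    private TimedJoinFutureTask(Callable<T> callable, Semaphore semaphore) {
        // The wrapper holds the single permit for as long as call() is running.
        super(() -> {
            semaphore.acquire();
            try {
                return callable.call();
            } finally {
                semaphore.release();
            }
        });
        this.semaphore = semaphore;
    }

    @Override
    public T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            return super.get(timeout, unit);
        } catch (CancellationException e) {
            // Wait for the callable to leave call(), but only for the time that is left.
            long remaining = deadline - System.nanoTime();
            if (!semaphore.tryAcquire(remaining, TimeUnit.NANOSECONDS)) {
                throw new TimeoutException("task was cancelled but is still running");
            }
            semaphore.release();
            throw e;
        }
    }
}
```

With this variant, a cancelled task that is still running makes get(timeout, unit) throw TimeoutException once the deadline passes; a later call throws CancellationException after the callable has returned.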
1

It is thrown as soon as it is cancelled.
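A small experiment can make this visible. The sketch below is only an illustration (the class and method names are made up): it parks a task on a latch, cancels it with cancel(false), and checks that get() throws while the callable is still inside call().

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelDoesNotWait {

    /** Returns true if get() threw CancellationException while the callable was still running. */
    static boolean getThrewWhileTaskStillRunning() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);   // callable has entered call()
        CountDownLatch mayFinish = new CountDownLatch(1); // lets the callable leave call()
        CountDownLatch finished = new CountDownLatch(1);  // callable has left call()
        try {
            Future<String> future = pool.submit(() -> {
                started.countDown();
                try {
                    mayFinish.await();
                    return "done";
                } finally {
                    finished.countDown();
                }
            });
            started.await();
            future.cancel(false); // no interrupt, so the callable keeps running

            boolean threw = false;
            try {
                future.get();
            } catch (CancellationException e) {
                threw = true;
            }
            // The count is still 1 only if the callable has not left call() yet.
            boolean stillRunning = finished.getCount() == 1;

            mayFinish.countDown();
            finished.await(5, TimeUnit.SECONDS);
            return threw && stillRunning;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getThrewWhileTaskStillRunning());
    }
}
```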

There is no easy way to know whether it has started and finished. You can create a wrapper for your runnable to track its state.

final AtomicInteger state = new AtomicInteger();
// in the runnable
state.incrementAndGet();
try {
    // do work
} finally {
   state.decrementAndGet();
}
Peter Lawrey
  • "There is no easy way to know it has started and is finished" -- you will get false from cancel() method in such case. I think that combination of result from cancel() method and result from get() method should give you full information, but I may be wrong. – Peter Štibraný May 18 '11 at 07:08
  • If you cancel a running task, `cancel(true);` should return `true` and `get()` will throw a `CancellationException` However, `cancel(true)` just sets the interrupted flag on the thread of a running task and it can still be running. – Peter Lawrey May 18 '11 at 15:03
  • A simple flag doesn't answer the question, "has it started and finished, or has it not started yet?" Neither does it allow you to wait until the task has fully finished (or has been cancelled with no chance of being started). – Aleksandr Dubinsky May 06 '15 at 14:06
  • @AleksandrDubinsky true, but that is a different question. You have a point that if you cancel there is a risk the task has started and ignores interrupts. What you could do is have a flag where the task aborts itself and always wait for it to complete ie don't use cancel () – Peter Lawrey May 06 '15 at 17:17
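The flag-based alternative from the last comment could look roughly like the sketch below. All names here (CooperativeStop, Worker, requestStop) are hypothetical. The task polls its own stop flag, and the caller never calls cancel(), so get() returns only once call() has actually returned.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class CooperativeStop {

    /** Hypothetical worker: loops until asked to stop, then reports how many steps it ran. */
    static final class Worker implements Callable<Integer> {
        private final AtomicBoolean stopRequested = new AtomicBoolean();

        void requestStop() {
            stopRequested.set(true);
        }

        @Override
        public Integer call() {
            int steps = 0;
            while (!stopRequested.get()) {
                steps++;        // stand-in for one unit of work on the shared state
                Thread.yield();
            }
            return steps;
        }
    }

    /** Asks the worker to stop and blocks until it has really finished. */
    static int stopAndWait() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Worker worker = new Worker();
            Future<Integer> future = pool.submit(worker);
            worker.requestStop();
            // The future was never cancelled, so get() waits for call() to return.
            return future.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

The trade-off is that the task must check the flag often enough; a task stuck in a blocking call will not notice the request.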
0

CompletionService is more powerful than a bare FutureTask and in many cases more suitable. I took some ideas from it to solve this problem. Its implementation, ExecutorCompletionService, is also simpler than FutureTask: it is only a few lines of code and easy to read. So I modified that class to return a partially computed result. It is a satisfying solution for me because it looks simple and clear.

A CompletionService guarantees that any future we get from take or poll is already done. Why? Only the run method of the QueueingFuture wrapper is ever called; other methods such as cancel are never called on it. In other words, the wrapper always completes normally, and it enqueues the task only once run has returned.
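As a point of comparison, here is a minimal sketch using the standard java.util.concurrent.ExecutorCompletionService rather than the modified class from this answer. The class name TakeReturnsDoneFutures and the squared-number tasks are made up for illustration. It checks that every future handed out by take() is already done.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TakeReturnsDoneFutures {

    /** Submits three small tasks and sums their results in completion order. */
    static int sumOfResults() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
            for (int i = 1; i <= 3; i++) {
                final int n = i;
                service.submit(() -> n * n);
            }
            int sum = 0;
            for (int i = 0; i < 3; i++) {
                // take() blocks until some submitted task has finished running.
                Future<Integer> done = service.take();
                if (!done.isDone()) {
                    throw new IllegalStateException("take() returned an unfinished future");
                }
                sum += done.get();
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }
}
```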

Demo code:

CompletionService<List<DeviceInfo>> completionService =
        new MyCompletionService<>(Executors.newCachedThreadPool());
Future<List<DeviceInfo>> task = completionService.submit(yourTask);
try {
    LogHelper.i(TAG, "result 111: ");
    // take() blocks until a submitted task has actually finished running
    Future<List<DeviceInfo>> result = completionService.take();
    LogHelper.i(TAG, "result: " + result.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

This is the class code:

import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/**
 * A CompletionService like java.util.concurrent.ExecutorCompletionService, except that the
 * FutureTask returned from submit can still yield a partially computed result
 * after it has been cancelled or interrupted.
 * As with the original, a future obtained from take or poll is guaranteed to be done.
 */
public class MyCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion.
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

    private static class DoneFutureTask<V> extends FutureTask<V> {
        // volatile: written by the worker thread in set(), read by callers of get()
        private volatile Object outcome;

        DoneFutureTask(Callable<V> task) {
            super(task);
        }

        DoneFutureTask(Runnable task, V result) {
            super(task, result);
        }

        @Override
        protected void set(V v) {
            super.set(v);
            outcome = v;
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            try {
                return super.get();
            } catch (CancellationException e) {
                return (V)outcome;
            }
        }

    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
            return new DoneFutureTask<V>(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
            return new DoneFutureTask<V>(task, result);
    }

    /**
     * Creates a MyCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public MyCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates a MyCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public MyCompletionService(Executor executor,
                               BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}
Victor Choy