
I push my Futures from an ExecutorService into a hash map. Later, I may call cancel() on Futures from within the hash map. Although the result is true, I later hit breakpoints within the Callable procedure, as if Future.cancel() had no effect. I think it might be a case of two different references here (even though the reference IDs are listed as the same when breakpointing), but was wondering if some experts could chime in. Here's what the code looks like:

ExecutorService taskExecutor = Executors.newCachedThreadPool();
Map<String, Future<Object>> results = new HashMap<String, Future<Object>>();

Future<Object> future = taskExecutor.submit(new MyProcessor(uid));
results.put(uid, future);

I allow processing to continue (it's a loop that submits tasks as they are passed in), and later I may attempt to cancel from an outside source by calling this method:

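public static synchronized boolean cancelThread(String uid) {
    Future<Object> future = results.get(uid);
    boolean success = false;
    if (future != null) {
        // Already finished counts as success; otherwise request cancellation
        // (cancel(true) also interrupts the worker thread if the task is running).
        success = future.isDone() || future.cancel(true);
        if (success) {
            results.remove(uid);
        }
    }
    return success;
}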

But I still encounter a "non-cancelled" path within MyProcessor.call() after future.cancel() is called - i.e. it's not really being cancelled.

Where am I going wrong with this? Is there a better way to do this?

Gray
Ryan H

2 Answers


I later hit breakpoints within the Callable procedure, as if the Future cancel() had no effect.

Future.cancel(true) prevents a job that is still in the queue from ever running, but if the job is already running it does the equivalent of Thread.interrupt() on the thread running the job. This sets the interrupt bit on the thread and causes any sleep(), wait(), and some other methods to throw InterruptedException.

It is important to realize that it does not stop the thread. You need to actively check for the interrupt flag in your thread loop or properly handle InterruptedException.
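As a minimal sketch of checking the interrupt flag, assuming the work can be broken into small steps: the class name InterruptAwareTask, its latches, and the demo() helper are hypothetical and exist only so the behaviour can be observed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical task, for illustration only.
public class InterruptAwareTask implements Callable<Long> {
    // Helpers that let the caller observe what the task did.
    final CountDownLatch started = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(1);
    final AtomicBoolean sawInterrupt = new AtomicBoolean(false);

    @Override
    public Long call() {
        long steps = 0;
        started.countDown();
        try {
            // Poll the interrupt flag between units of work.
            while (!Thread.currentThread().isInterrupted()) {
                steps++; // stand-in for one small unit of real work
            }
            sawInterrupt.set(true);
            return steps;
        } finally {
            finished.countDown();
        }
    }

    // Submits the task, cancels it, and reports whether it stopped by itself.
    public static boolean demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            InterruptAwareTask task = new InterruptAwareTask();
            Future<Long> future = pool.submit(task);
            task.started.await();   // make sure the task is actually running
            future.cancel(true);    // sets the interrupt flag on the worker thread
            boolean stopped = task.finished.await(5, TimeUnit.SECONDS);
            return stopped && task.sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task stopped after cancel: " + demo());
    }
}
```

The task ends only because its own loop looks at the flag; cancel(true) merely sets it.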

See my SO answer here for more details: How to suspend thread using thread's id?

Gray
  • Also related to: http://stackoverflow.com/questions/6698977/thread-interrupt-will-it-cancel-oncoming-wait-call – Gray Jun 22 '12 at 15:03
  • I see, that makes sense - I'm not in a wait() status at this point in the Callable, so it isn't throwing InterruptedException for me. Unfortunately, what I'm trying to cancel is a single statement call to a database once it has already started, so at best I can put a test for if the thread is interrupted afterwards. – Ryan H Jun 22 '12 at 16:03
  • To clarify, this is the statement I'm likely in when the cancel is requested, a PreparedStatement into the database: 'stmt.execute();' So I guess I have to let the database run its course, and check for interrupt afterwards. – Ryan H Jun 22 '12 at 16:15
  • Ok, so I changed it to a Runnable Thread instead of Callable, so I can override the interrupt() method and within it cancel my database statement using stmt.cancel(); However, it appears Future.cancel(true) may not explicitly be calling this interrupt method, as I never get within it. Thought I had something there. – Ryan H Jun 22 '12 at 18:46
  • The issue, @Ryan, is that you are submitting a `Runnable` to the thread pool. The pool has its own threads, so it only uses your `Thread.run()` method. If you need to override `interrupt()` then you'll need to fork your own threads and not use a pool. – Gray Jun 22 '12 at 22:27
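
One possible alternative to overriding interrupt(), sketched here under assumptions and not taken from the thread: subclass FutureTask, override cancel(), and run an extra hook there. With JDBC that hook could wrap stmt.cancel(), although whether the query is really aborted depends on the driver and the database. The class name HookedFutureTask and the onCancel parameter are hypothetical, and the demo replaces the database call with a loop that ignores interrupts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical FutureTask that runs an extra hook when it is cancelled.
public class HookedFutureTask<V> extends FutureTask<V> {
    private final Runnable onCancel;

    public HookedFutureTask(Callable<V> work, Runnable onCancel) {
        super(work);
        this.onCancel = onCancel;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled) {
            onCancel.run(); // with JDBC this could wrap a call to stmt.cancel()
        }
        return cancelled;
    }

    // Simulates a call that ignores interrupts and only ends when aborted.
    public static boolean demo() {
        AtomicBoolean aborted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        Callable<String> work = () -> {
            started.countDown();
            while (!aborted.get()) {
                Thread.yield(); // never looks at the interrupt flag
            }
            finished.countDown();
            return "done";
        };
        HookedFutureTask<String> task =
                new HookedFutureTask<>(work, () -> aborted.set(true));
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            pool.execute(task); // a FutureTask is itself a Runnable
            started.await();
            boolean cancelled = task.cancel(true);
            return cancelled && finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("work aborted by hook: " + demo());
    }
}
```

Because the task is handed to the pool with execute(), the map of results would store the HookedFutureTask itself, so a later cancel() goes through the overridden method.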

FutureTask's boolean cancel(boolean mayInterruptIfRunning) calls interrupt() on the thread that is currently running the task.

FutureTask.java
public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();     ////////////HERE/////////////
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

The JavaDoc for Thread.interrupt() says the following:

public void interrupt()

Interrupts this thread. Unless the current thread is interrupting itself, which is always permitted, the checkAccess method of this thread is invoked, which may cause a SecurityException to be thrown.

If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.

If this thread is blocked in an I/O operation upon an interruptible channel then the channel will be closed, the thread's interrupt status will be set, and the thread will receive a ClosedByInterruptException.

If this thread is blocked in a Selector then the thread's interrupt status will be set and it will return immediately from the selection operation, possibly with a non-zero value, just as if the selector's wakeup method were invoked.

If none of the previous conditions hold then this thread's interrupt status will be set.

Interrupting a thread that is not alive need not have any effect.

Throws: SecurityException - if the current thread cannot modify this thread

To conclude: cancelling a FutureTask only has an immediate effect if the thread is blocked (in an invocation of wait(), ...). Otherwise it is the developer's responsibility to check Thread.currentThread().isInterrupted() and quit while performing a non-blocking operation.
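
A small sketch of the non-blocked case, assuming a task that never looks at the interrupt flag. The class name CancelIgnoredDemo and the release flag are hypothetical; the flag only lets the demo decide when the stubborn task is allowed to finish.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, for illustration only.
public class CancelIgnoredDemo {
    // Returns { cancel() result, isCancelled(), task body ran to its end anyway }.
    public static boolean[] demo() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch ranToEnd = new CountDownLatch(1);
        AtomicBoolean release = new AtomicBoolean(false);

        Callable<Void> stubborn = () -> {
            started.countDown();
            while (!release.get()) {
                Thread.yield(); // non-blocking work that ignores the interrupt flag
            }
            ranToEnd.countDown(); // reached even though the Future was cancelled
            return null;
        };

        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            Future<Void> future = pool.submit(stubborn);
            started.await();
            boolean cancelResult = future.cancel(true);
            boolean reportedCancelled = future.isCancelled();
            release.set(true); // only now let the task body finish
            boolean completedAnyway = ranToEnd.await(5, TimeUnit.SECONDS);
            return new boolean[] { cancelResult, reportedCancelled, completedAnyway };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] { false, false, false };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("cancel=" + r[0] + " isCancelled=" + r[1]
                + " bodyCompleted=" + r[2]);
    }
}
```

The Future reports the cancellation as successful, yet the body keeps running to its last statement, which matches the breakpoints described in the question.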

Kanagavelu Sugumar