2

In my code I submit some tasks to an ExecutorService and then wait for them to complete using shutdown() and awaitTermination(). But if any one task takes longer than a certain period to complete, I want it cancelled without affecting the other tasks. I use code adapted from "ExecutorService that interrupts tasks after a timeout", as follows:

package com.jthink.jaikoz.memory;

import com.jthink.jaikoz.MainWindow;

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

        if (isShutdown)
        {
            if(getQueue().isEmpty())
            {
                //Queue is empty so all tasks either finished or currently running
                MainWindow.logger.severe("---Thread Pool Queue is Empty");
                timeoutExecutor.shutdown();
            }
        }
    }

    /**
     * Interrupt the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            MainWindow.logger.severe("Cancelling task because taking too long");
            thread.interrupt();
        }
    }
}

and test cases for when tasks have time to complete and for when they don't; both work as expected:

package com.jthink.jaikoz;

import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Paul on 08/12/2014.
 */
public class TestThreadPool extends TestCase
{
    /** Number of tasks submitted per test; equal to the pool size so all tasks start at once. */
    private static final int TASK_COUNT = 10;

    /** How long each task sleeps before it counts as completed. */
    private static final long TASK_SLEEP_MILLIS = 5000;

    /**
     * With a 6 second timeout every 5 second task must run to completion.
     */
    public void testThreadPoolTasksComplete() throws Exception
    {
        assertEquals("all tasks should complete within the timeout", TASK_COUNT, runSleepingTasks(6));
        System.out.println("Program done");
    }

    /**
     * With a 3 second timeout every 5 second task must be interrupted before it completes.
     */
    public void testThreadPoolTasksCancelled() throws Exception
    {
        assertEquals("no task should survive the timeout", 0, runSleepingTasks(3));
        System.out.println("Program done");
    }

    /**
     * Runs {@link #TASK_COUNT} sleeping tasks on a pool with the given per-task timeout and waits
     * for the pool to terminate.
     *
     * @param timeoutSeconds per-task timeout handed to the executor
     * @return the number of tasks that finished their sleep without being interrupted
     */
    private int runSleepingTasks(long timeoutSeconds) throws InterruptedException
    {
        final AtomicInteger completed = new AtomicInteger();
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(TASK_COUNT, TASK_COUNT, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), timeoutSeconds, TimeUnit.SECONDS);

        for (int i = 0; i < TASK_COUNT; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(TASK_SLEEP_MILLIS);
                    System.out.println("Done");
                    completed.incrementAndGet();
                    return null;
                }

            });
        }
        executorService.shutdown();

        // Bounded wait: a pool that never terminates should fail the test rather than hang it.
        final boolean terminated = executorService.awaitTermination(30, TimeUnit.SECONDS);
        if (!terminated)
        {
            // Do not leave sleeping worker threads behind for the rest of the test run.
            executorService.shutdownNow();
        }
        assertTrue("thread pool did not terminate", terminated);
        return completed.get();
    }
}

and in my code appear to work:

/**
 * Submits one {@link CorrectFromMusicBrainzWorker} per match key to the executor, shuts the
 * executor down and waits for the submitted workers to finish.
 *
 * <p>The wait is bounded by (number of keys) x (per-task timeout of the executor).
 *
 * @param matchKeyToSongs songs grouped by the key they are matched with
 * @return {@code false} if the analyser was stopped before any work was submitted or if the
 *         calling thread was interrupted while waiting; {@code true} otherwise
 * @throws JaikozException declared by the original signature; nothing in this method throws it directly
 */
private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
            throws JaikozException
    {
        if (stopTask)
        {
            MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
            return false;
        }

        TimeoutThreadPoolExecutor es = getExecutorService();
        for(MatchKey matchKey:matchKeyToSongs.keySet())
        {
            List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
            es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs));
        }
        es.shutdown();
        try
        {
            long maxWait = matchKeyToSongs.keySet().size() * es.getTimeout();
            if (!es.awaitTermination(maxWait, es.getTimeoutUnit()))
            {
                // Previously a timed-out wait was indistinguishable from a clean finish in the logs.
                MainWindow.logger.warning(this.getClass() + " workers did not finish within " + maxWait + " " + es.getTimeoutUnit());
            }
        }
        catch(InterruptedException ie)
        {
            MainWindow.logger.warning(this.getClass() + " has been interrupted");
            // Restore the interrupt status so callers further up can still observe the cancellation.
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

however for one customer even though

---Thread Pool Queue is Empty

is output, awaitTermination() doesn't return; it only returns when the user cancels the task two hours later. Full log extract here:

14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted

So how can it be that awaitTermination() is not returning even though the logs show the queue is empty, and therefore shutdown() has been called on both the Executor itself and the embedded timeoutExecutor?

I have had a few thoughts about this myself but dont know the answer.

  1. Firstly, why is it actually necessary to shut down the timeoutExecutor for awaitTermination() to return anyway? In my subclass awaitTermination() is not overridden, so if all tasks have completed, what does it matter whether the timeoutExecutor (which awaitTermination() knows nothing about) is shut down or not?

  2. Secondly why does ---Thread Pool Queue is Empty sometimes get output more than once

  3. The timeoutExecutor is single-threaded; is this correct/necessary?

Update based on Holgers answer

So the problem you have is that you are shutting down the timeoutExecutor way too early, hence it might miss one or more of it’s tasks to interrupt pending tasks of your thread pool executor.

Right, I see now that an empty queue just means that all tasks have been completed OR started. (Sorry, my example test was misleading: previously it was running more than 10 tasks, the current version was a temporary edit, and in production code the number of workers is based on the number of CPUs on the user's machine.)

So you are saying that I shutdown() the timeoutExecutor too early (there could be up to WorkerSize - 1 tasks still running), and that means the timeout tasks still pending for the tasks that haven't completed yet are lost. Therefore, if any of those remaining tasks do not complete of their own accord for some reason, the timeout tasks for them no longer exist and therefore cannot be used to interrupt them. But the only reason awaitTermination() wouldn't return would be if one of these last (WorkerSize - 1) tasks didn't complete.

Of my own accord I had changed afterExecute() to

protected void afterExecute(Runnable r, Throwable t) {
    // The task is done: forget it and cancel the timeout that was guarding it.
    final ScheduledFuture pendingTimeout = runningTasks.remove(r);
    if (pendingTimeout != null) {
        pendingTimeout.cancel(false);
    }

    // Once shutdown was requested, nothing is queued and nothing is tracked as running,
    // force the pool to stop.
    // NOTE: a job can already be taken off the queue without beforeExecute() having run yet,
    // so these three conditions together still do not prove that no job is pending.
    final boolean looksIdle = isShutdown && getQueue().isEmpty() && runningTasks.size() == 0;
    if (looksIdle) {
        this.shutdownNow();
    }
}

To ensure it would finish I used shutdownNow(), but not until everything had finished; based on your comment, though, this could still possibly not work.

And I should do

protected void afterExecute(Runnable r, Throwable t) {
    // Forget the finished task; if a timeout was guarding it, that timeout is no longer needed.
    final ScheduledFuture pendingTimeout = runningTasks.remove(r);
    if (pendingTimeout == null) {
        return;
    }
    pendingTimeout.cancel(false);
}

and

/**
 * Invoked by the pool once it has terminated; releases the timeout scheduler and its thread.
 */
@Override
protected void terminated()
{
    try
    {
        timeoutExecutor.shutdown();
    }
    finally
    {
        // ThreadPoolExecutor's documentation asks overriding subclasses to chain to super.
        super.terminated();
    }
}

and terminated() is called as soon as all the submitted tasks have finished (either naturally or by being cancelled by the corresponding timeout task), so it doesn't matter that the timeoutExecutor still exists at this point?

For completeness, modifying my test case so that the tasks would take a very long time unless the timeout task was working shows the original solution failing (hanging) and the revised solution working:

/**
 * Submits 50 tasks that would effectively sleep forever to a 10-thread pool with a 3 second
 * per-task timeout. The pool can only terminate if every task is interrupted by its timeout,
 * so the test fails (instead of hanging) when the timeout mechanism stops working.
 */
public void testThreadPoolTasksCancelled() throws Exception
    {
        final Instant start = Instant.now();
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 50; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(500000000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();

        // 50 tasks on 10 threads at 3 seconds each need roughly 15 seconds; one minute is a
        // generous bound that still turns a hang into a test failure.
        final boolean terminated = executorService.awaitTermination(1, TimeUnit.MINUTES);
        final Instant end = Instant.now();
        System.out.println("Program done:"+(Duration.between(start, end).toMillis()/ 1000+ " seconds"));
        if (!terminated)
        {
            // Do not leave sleeping worker threads behind for the rest of the test run.
            executorService.shutdownNow();
        }
        assertTrue("thread pool did not terminate; timeouts were not applied", terminated);
    }
Community
  • 1
  • 1
Paul Taylor
  • 13,411
  • 42
  • 184
  • 351
  • That’s right; `ThreadPoolExecutor.terminated()` is called regardless of whether `timeoutExecutor` exists as `ThreadPoolExecutor` doesn’t know anything about it. But since `afterExecute` has been called for all jobs already at this point, it is the perfect time to shut down `timeoutExecutor`. – Holger Dec 16 '14 at 14:16
  • @Holger okay great, just one thing I dont get i tried commenting out timeoutExecutor.shutdown(); and my test still completed ! – Paul Taylor Dec 16 '14 at 14:20
  • That depends on the caller. E.g. if you or the testing framework calls `System.exit(…)`, directly or indirectly, it will terminate all threads anyway. – Holger Dec 16 '14 at 14:40
  • @Holger I thought awaitTermination() would just hang before we get to the end of the method – Paul Taylor Dec 16 '14 at 14:43

1 Answers1

2

The queue contains only jobs that have not started yet. Having an empty queue does not imply that there are no pending jobs; they might just have been removed in order to be executed. Especially in your example code, the assumption that an empty queue implies no running jobs is dead wrong; since you configured the executor to have ten core threads and submit ten jobs, the queue will always be empty throughout the entire execution of your example code.

So the problem you have is that you are shutting down the timeoutExecutor way too early, hence it might miss one or more of its tasks to interrupt pending tasks of your thread pool executor.

Note that in principle, the jobs might even be in the state where they have been removed from the queue (if ever added) but beforeExecute has not been called yet. So even having an empty queue and an empty runningTasks map does not guarantee that there are no pending jobs.


To answer your other question, you have to shut down the timeoutExecutor as it has an associated alive thread which will always keep the executor alive. So not shutting it down will create a memory leak and further keep a thread alive, hence always prevent the automatic JVM shutdown.

But the right place to do the shutdown of timeoutExecutor is an override of the method protected void terminated() which is exactly intended for the cleanup.


To the last bullet, it doesn’t matter how many threads your timeoutExecutor has, but given how simple the tasks are, there is no benefit in having multiple threads, and a single-threaded executor is the simplest and probably most efficient solution.

Joffrey
  • 32,348
  • 6
  • 68
  • 100
Holger
  • 285,553
  • 42
  • 434
  • 765
  • Thanks for your great answer. I've updated my question with an answer based on what you say; I'd appreciate it if you could confirm I have understood you correctly. – Paul Taylor Dec 16 '14 at 14:09
  • I've just realized that the Runnable passed to beforeExecute is not actually the submitted task but a FutureTask wrapping the task, so if I passed this to the TimeoutTask instead of the Thread, I could call cancel(true) on the FutureTask instead of interrupting the thread. Just wondering: is this better than/different from what I'm currently doing, in either how it works or how the code reads? – Paul Taylor Dec 17 '14 at 13:38
  • I would consider calling `cancel` to be cleaner than interrupting the thread, however, you have to consider that only tasks enqueued via `submit` are wrapped into a `FutureTask`; if you queue a `Runnable` via `execute`, that `Runnable` will be the very instance you get in `beforeExecute`. If that’s not an issue for you, you can use `cancel`. Note that the wrapping happens in [`newTaskFor`](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/AbstractExecutorService.html#newTaskFor(java.util.concurrent.Callable)) and can be customized by overriding that method. – Holger Dec 17 '14 at 13:46