1

I'm my code I submit some tasks to an ExecutorService and then wait for them to complete using shutdown() and awaitTermination(). But if any one tasks takes longer than a certain period to complete I want it cancelled without affecting other tasks. I use code amended code from ExecutorService that interrupts tasks after a timeout as follows:

package com.jthink.jaikoz.memory;

import com.jthink.jaikoz.MainWindow;

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

        if (isShutdown)
        {
            if(getQueue().isEmpty())
            {
                //Queue is empty so all tasks either finished or currently running
                MainWindow.logger.severe("---Thread Pool Queue is Empty");
                //timeoutExecutor.shutdownNow();
            }
        }
    }

    /**
     * Interrupt the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            MainWindow.logger.severe("Cancelling task because taking too long");
            thread.interrupt();
        }
    }
}

and a testcase for when tasks have time to complete and when they don't both work

package com.jthink.jaikoz;

import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by Paul on 08/12/2014.
 */
public class TestThreadPool extends TestCase
{
    public void testThreadPoolTasksComplete() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 6, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }

    public void testThreadPoolTasksCancelled() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }
}

and in my code appear to work:

private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
            throws JaikozException
    {
        if (stopTask)
        {
            MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
            return false;
        }

        TimeoutThreadPoolExecutor es = getExecutorService();
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(matchKeyToSongs.size());
        for(MatchKey matchKey:matchKeyToSongs.keySet())
        {
            List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
            futures.add(es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs)));
        }
        es.shutdown();
        try
        {
            es.awaitTermination(matchKeyToSongs.keySet().size() * es.getTimeout(), es.getTimeoutUnit());
        }
        catch(InterruptedException ie)
        {
            MainWindow.logger.warning(this.getClass() + " has been interrupted");
            return false;
        }
        return true;
    }

however for one customer even though

---Thread Pool Queue is Empty

is output awaitTermination() doesn't return,only eventually returning when user cancels task two hours later - full log extract here

14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted

So how can it be that awaiterTermination() is not returning even though the logs show queue is empty and therefore shutdown() has been called on both the Executor itself and the embedded timeoutExecutor ?

I have had a few thoughts about this myself but dont know the answer.

  1. Firstly why it is actually neccessary to shutdown the TimeOutExecutor for awaitTermination() to return anyway. In my subclass awaitTermination() is not overridden so if all tasks have completed what does it matter if the TiumeOutExecutor (that awaitTermination() knows nothing about is shutdown or not)

  2. Secondly why does ---Thread Pool Queue is Empty sometimes get output more than once

Community
  • 1
  • 1
Paul Taylor
  • 13,411
  • 42
  • 184
  • 351
  • So what exactly are you trying to do? Do you want to interrupt currently working tasks or not? – Antoniossss Dec 02 '14 at 10:23
  • @Antoniossss yes I want to interrupt currently working tasks, but what I dont want is for a call to shutdown() after tasks have been submitted to the executor to prevent those tasks from running. Also if one task takes too long it should be interrupted but should have no effect on any other submitted tasks. – Paul Taylor Dec 02 '14 at 17:08

1 Answers1

2

I made a custom modification in TimeoutThreadPoolExecutor and it's working fine.

public static class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
    private final long timeout;
    private final TimeUnit timeoutUnit;
    private boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
        if (isShutdown) timeoutExecutor.shutdown();
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
            System.out.println("Cancelled");
        }
    }
}

Case 1 : No timeout

final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
    100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
    6, TimeUnit.SECONDS);
executorService.submit(new Callable<Object>()
{
    @Override
    public Object call() throws Exception
    {
        Thread.sleep(5000);
        System.out.println("Done");
        return null;
    }

});

executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");

It prints :

Task done
Program done

Case 2 : Timeout

final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
    100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
    3, TimeUnit.SECONDS);
executorService.submit(new Callable<Object>()
{
    @Override
    public Object call() throws Exception
    {
        Thread.sleep(5000);
        System.out.println("Task done");
        return null;
    }

});

executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");

It prints :

Cancelled
Program done
ToYonos
  • 16,469
  • 2
  • 54
  • 70
  • @ToYonus thanks but your version of TimeoutThreadPoolExecutor is not the same please take a look at the linked question. Nor is your use of it - the whole point of the implementation is you only have to submit one task to the executor for each logical task required, not two tasks for each task you want to do. – Paul Taylor Dec 03 '14 at 12:29
  • I used the exact same implementation as in the linked answer, just add `System.out.println("Cancelled");`. I have understood the 2 task mechanisme. It's working for me. Give me a full test that does not work and I will take a look. – ToYonos Dec 03 '14 at 12:32
  • @ToYonus no I'm sorry you have not, look at the code in the linked question and IGNORE the answer to those questions, the solution is in the question. My code in my question shows how i use that Executor implementation. – Paul Taylor Dec 03 '14 at 15:51
  • Yes yes. My code is coming from the **question**. I have just replaced the main part of the code by `/* Implementation */` for the sake of the readability – ToYonos Dec 03 '14 at 15:57
  • @ToYonus ah okay sorry your timer task confused me), but maybe the problem is the comment // I delayed the shutdown by one second to avoid RejectedExecutionException, why would you need to do this and does your example work if you remove your timer task and simply run shutdown() and awaitTermination() in the same thread as the thread used to submit task to the executor - which is what I am doing – Paul Taylor Dec 03 '14 at 16:42
  • if i remove the timer, i would get `Exception in thread "pool-1-thread-1" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@5162c257 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@5fa5360b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]` – ToYonos Dec 03 '14 at 16:47
  • @ToYonus right that is the problem with a standard Executor you do not get that, and if you make my modification then that problem doesn't work but awaitTermination() doesnt work – Paul Taylor Dec 03 '14 at 21:10
  • So you have the same issue as me ? – ToYonos Dec 03 '14 at 21:54
  • Well yes, that is why I posted this question, try modifying the shutdown method like i did and see what happens – Paul Taylor Dec 03 '14 at 23:56
  • I edited my response. I made a modification in `TimeoutThreadPoolExecutor` (added a boolean), it's working fine, without the Timer. Try it. – ToYonos Dec 04 '14 at 10:22
  • I tried it but it is never returning from awaitTermination() for me, are you sure awaitTermination() is returning for you - your output doesn't tell me either way – Paul Taylor Dec 04 '14 at 20:25
  • It returns for me. I edited my response, added some output. The java program exits normally. If it's not the case for you, a task must be still running. – ToYonos Dec 05 '14 at 09:58
  • actually I tried against more tasks again I got the RejectionExecutionException - looking at your fix it calls scheduledFuture in afterExecute as soon as isShutdown is set, but this could be set before all tasks have been started, surely it should only be called once every task that was submitted to the executor has been started ? – Paul Taylor Dec 05 '14 at 12:22
  • It difficult to say precisely how to do it. It depends on your implementation actually. With mine, it's working. Adapt my code for you own usage. Or give me a full example of yours that does not work. – ToYonos Dec 05 '14 at 13:21
  • I amended your answer to only shutdown the scheduledexecutor if the queue was empty and it is now working for me, so thankyou your work o this helped me to find a solution so Im awarding you the bounty. – Paul Taylor Dec 08 '14 at 10:01