
Basically, there are a couple of different strategies for exception handling when you use ThreadPoolExecutor:

  1. Thread.setUncaughtExceptionHandler() (and Thread.getDefaultUncaughtExceptionHandler())

    With submit(), the exception is captured in the returned Future, so the UncaughtExceptionHandler is never called and this can't be used.

  2. Setting a ThreadFactory

    The only relevant part is calling Thread.setUncaughtExceptionHandler() on the newly created thread, but this has no effect; see item 1.

  3. Overriding ThreadPoolExecutor.afterExecute()

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Tasks passed to submit() arrive wrapped in a Future, which holds any exception.
        if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }
        if (t != null) {
            logger.error("ThreadPoolExecutor.afterExecute", t);
        }
    }
    

    This approach almost works. If your exception handling is stateless, that is, you don't need to access the state of your original Runnable/Callable task, this is fine. In the stateful case you have no access to your original task (even reflection doesn't help, because the Runnable above does not hold the original task).
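
The two observations above (the handler is never called for submit(), and afterExecute() does not receive the original task) can be reproduced with a small sketch. The FailingTask class and its failSilent flag are hypothetical names introduced only for this illustration, and the behaviour shown is that of the standard JDK ThreadPoolExecutor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class SubmitSwallowsExceptionsDemo {

    /** Hypothetical stateful task, used only for this illustration. */
    static class FailingTask implements Callable<String> {
        final boolean failSilent;
        FailingTask(boolean failSilent) { this.failSilent = failSilent; }
        @Override public String call() { throw new IllegalStateException("boom"); }
    }

    /** Returns {handler call count, whether afterExecute saw a FailingTask, cause message}. */
    static Object[] run() {
        AtomicInteger handlerCalls = new AtomicInteger();
        AtomicReference<Object> seenByAfterExecute = new AtomicReference<>();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
                runnable -> {
                    // Strategy 2: a ThreadFactory that installs an UncaughtExceptionHandler.
                    Thread thread = new Thread(runnable);
                    thread.setUncaughtExceptionHandler((t, ex) -> handlerCalls.incrementAndGet());
                    return thread;
                }) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                seenByAfterExecute.set(r); // Strategy 3: record what the hook receives.
            }
        };

        String cause = null;
        try {
            Future<String> future = pool.submit(new FailingTask(true));
            try {
                future.get();
            } catch (ExecutionException e) {
                cause = e.getCause().getMessage(); // the failure only surfaces here
            }
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS); // ensures afterExecute has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new Object[] {
            handlerCalls.get(), seenByAfterExecute.get() instanceof FailingTask, cause };
    }

    public static void main(String[] args) {
        Object[] r = run();
        System.out.println("handler calls: " + r[0]
                + ", afterExecute saw original task: " + r[1] + ", cause: " + r[2]);
    }
}
```

The handler count stays at zero and afterExecute() is handed the Future wrapper rather than the FailingTask, which is why the failSilent flag is out of reach there.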

How can I handle exceptions when I do want to access the state of the original task?

Cœur
alexsmail

2 Answers

First of all, see "Handling Exceptions for ThreadPoolExecutor" for more background on the problem with the afterExecute() approach.

ThreadPoolExecutor has

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) ;

and

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value);

where callable and runnable are your original tasks, which you can decorate. This is the basic strategy. Below is working code using Spring (I removed the comments for clarity):

package org.springframework.scheduling.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

    private final Object poolSizeMonitor = new Object();

    private int corePoolSize = 1;

    private int maxPoolSize = Integer.MAX_VALUE;

    private int keepAliveSeconds = 60;

    private int queueCapacity = Integer.MAX_VALUE;

    private boolean allowCoreThreadTimeOut = false;
    //fix
    private CallableTransform callableTransform;

    private ThreadPoolExecutor threadPoolExecutor;


    public void setCorePoolSize(int corePoolSize) {
        synchronized (this.poolSizeMonitor) {
            this.corePoolSize = corePoolSize;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setCorePoolSize(corePoolSize);
            }
        }
    }

    public int getCorePoolSize() {
        synchronized (this.poolSizeMonitor) {
            return this.corePoolSize;
        }
    }

    public void setMaxPoolSize(int maxPoolSize) {
        synchronized (this.poolSizeMonitor) {
            this.maxPoolSize = maxPoolSize;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
            }
        }
    }

    public int getMaxPoolSize() {
        synchronized (this.poolSizeMonitor) {
            return this.maxPoolSize;
        }
    }

    public void setKeepAliveSeconds(int keepAliveSeconds) {
        synchronized (this.poolSizeMonitor) {
            this.keepAliveSeconds = keepAliveSeconds;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
            }
        }
    }

    public int getKeepAliveSeconds() {
        synchronized (this.poolSizeMonitor) {
            return this.keepAliveSeconds;
        }
    }

    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
        this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
    }


    //fix
    public void setCallableDecorator(CallableDecorator callableDecorator) {
        Assert.isNull(this.callableTransform, "You can't call setCallableDecorator() and setTaskDecorator() more than once");
        this.callableTransform = new CallableTransform() {

            @Override
            public Callable<?> decorate(Object originalTask) {
                return callableDecorator.decorate((Callable<?>) originalTask);
            }

            @Override
            public boolean isCallable() {
                return true;
            }
        };
    }

    //fix
    public void setTaskDecorator(TaskDecorator taskDecorator) {
        Assert.isNull(this.callableTransform, "You can't call setCallableDecorator() and setTaskDecorator() more than once");
        this.callableTransform =  new CallableTransform(){

            @Override
            public Callable<?> decorate(Object originalTask) {
                Callable<?> ret= Executors.callable(taskDecorator.decorate((Runnable)originalTask));
                return ret;
            }

            @Override
            public boolean isCallable(){
                return false;
            }
        };
    }


    @Override
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

        ThreadPoolExecutor executor;

        //fix
        if (this.callableTransform != null) {
            executor = new ThreadPoolExecutor(
                    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                    queue, threadFactory, rejectedExecutionHandler) {

                @Override
                protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                    if(callableTransform==null){
                        return super.newTaskFor(callable);
                    }

                    if (!callableTransform.isCallable()) {
                        // callableTransform accepts Runnable, but we have a Callable
                        throw new IllegalStateException("You use TaskDecorator, but submit Callable");
                    }

                    @SuppressWarnings("unchecked")
                    Callable<T> param = (Callable<T>) callableTransform.decorate(callable);
                    return super.newTaskFor(param);
                }

                @Override
                protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
                    if(callableTransform==null){
                        return super.newTaskFor(runnable, value);
                    }

                    if (!callableTransform.isRunnable()) {
                        // callableTransform accepts Callable, but we have a Runnable
                        throw new IllegalStateException("You use CallableDecorator, but execute Runnable");
                    }

                    Callable<?> wrappedCallable = callableTransform.decorate(runnable);
                    // The wrapped Callable comes from Executors.callable(...) and yields null,
                    // so return `value` explicitly to keep the submit(Runnable, T) contract.
                    return super.newTaskFor(() -> {
                        wrappedCallable.call();
                        return value;
                    });
                }


            };

        } else {
            executor = new ThreadPoolExecutor(
                    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                    queue, threadFactory, rejectedExecutionHandler);

        }

        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = executor;
        return executor;
    }


    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<>(queueCapacity);
        }
        else {
            return new SynchronousQueue<>();
        }
    }

    public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
        Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
        return this.threadPoolExecutor;
    }

    public int getPoolSize() {
        if (this.threadPoolExecutor == null) {
            // Not initialized yet: assume core pool size.
            return this.corePoolSize;
        }
        return this.threadPoolExecutor.getPoolSize();
    }

    public int getActiveCount() {
        if (this.threadPoolExecutor == null) {
            // Not initialized yet: assume no active threads.
            return 0;
        }
        return this.threadPoolExecutor.getActiveCount();
    }


    @FunctionalInterface
    public interface CallableDecorator {
        <V> Callable<V> decorate(Callable<V> task);
    }

    @FunctionalInterface
    static interface CallableTransform {
        Callable<?> decorate(Object originalTask);

        default boolean isCallable(){
            return true;
        }

        default boolean isRunnable(){
            return !isCallable();
        }
    }



    //rest of the code execute/submit override
    //...

    @Override
    public boolean prefersShortLivedTasks() {
        return true;
    }

}

A usage example follows:

    ThreadPoolTaskExecutor threadPoolFactory = new ThreadPoolTaskExecutor();
    threadPoolFactory.setCorePoolSize(4);
    threadPoolFactory.setMaxPoolSize(4);
    threadPoolFactory.setKeepAliveSeconds(0);


    CallableDecorator decorator = new CallableDecorator(){

        @Override
        public <T> Callable<T> decorate(Callable<T> task) {
            return () -> {
                try {
                    return task.call();
                }
                catch (Throwable e) {
                    synchronized (executor) {
                        if (!((MyCallable) task).failSilent) {   // note the use of the original task's state
                            log.error("Execution Failure!", e);
                        }
                    }
                    throw e;
                }
            };
        }
    };
    threadPoolFactory.setCallableDecorator(decorator);

    threadPoolFactory.initialize();
    executor = threadPoolFactory.getThreadPoolExecutor();

and further:

    executor.submit(new MyCallable(true));
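
Without Spring, the same idea can be sketched directly on a plain ThreadPoolExecutor. This is a minimal illustration, not the code above: MyCallable, its failSilent flag, and the in-memory log list are hypothetical stand-ins introduced only for this example.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class NewTaskForDemo {

    /** Hypothetical stateful task; failSilent mirrors the flag used in the usage example. */
    static class MyCallable implements Callable<String> {
        final boolean failSilent;
        MyCallable(boolean failSilent) { this.failSilent = failSilent; }
        @Override public String call() { throw new IllegalStateException("boom"); }
    }

    /** Builds a pool that decorates every Callable before it is wrapped in a Future. */
    static ThreadPoolExecutor newPool(List<String> log) {
        return new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> original) {
                Callable<T> decorated = () -> {
                    try {
                        return original.call();
                    } catch (Exception e) {
                        // The original task is still in scope, so its state can be consulted.
                        boolean silent = original instanceof MyCallable
                                && ((MyCallable) original).failSilent;
                        if (!silent) {
                            log.add("Execution failure: " + e.getMessage());
                        }
                        throw e; // keep the failure visible through the Future
                    }
                };
                return super.newTaskFor(decorated);
            }
        };
    }

    /** Submits one noisy and one silent failing task and returns what was logged. */
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = newPool(log);
        try {
            for (boolean silent : new boolean[] { false, true }) {
                try {
                    pool.submit(new MyCallable(silent)).get();
                } catch (ExecutionException expected) {
                    // The caller still sees the failure through Future.get().
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the task created with failSilent set to false produces a log entry, while both failures still reach the caller through their Futures.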
alexsmail

"Basically, there are a couple of different strategies for exception handling when you use ThreadPoolExecutor:"

Although you could override ThreadPoolExecutor.beforeExecute(...), dig out the Runnable inside it via reflection, store it in a ThreadLocal, and then use it in afterExecute(...), this really feels like a hack and depends heavily on the TPE implementation.

I would instead wrap your Runnable or Callable in a wrapper that does the try/catch and error logging, so that you would add tasks to the thread pool with something like:

threadPool.submit(new RunnableWrapper(myRunnable));
// or
threadPool.submit(new CallableWrapper(myCallable));

These would have the try/catch/log mechanism and also have access to the Runnable for state evaluation. Anything else seems like a hack to me.
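
A minimal sketch of such a wrapper might look like the following. The CallableWrapper shape, the Job class, and the error callback are assumptions made for illustration; the answer does not specify them.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

class CallableWrapperDemo {

    /** Hypothetical wrapper: runs the delegate and reports failures together with the delegate. */
    static final class CallableWrapper<V> implements Callable<V> {
        private final Callable<V> delegate;
        private final BiConsumer<Callable<V>, Exception> onError;

        CallableWrapper(Callable<V> delegate, BiConsumer<Callable<V>, Exception> onError) {
            this.delegate = delegate;
            this.onError = onError;
        }

        @Override
        public V call() throws Exception {
            try {
                return delegate.call();
            } catch (Exception e) {
                onError.accept(delegate, e); // the handler can inspect the task's state
                throw e;                     // the Future still reports the failure
            }
        }
    }

    /** Hypothetical stateful task whose name the error handler wants to report. */
    static class Job implements Callable<Integer> {
        final String name;
        Job(String name) { this.name = name; }
        @Override public Integer call() { throw new IllegalArgumentException("bad input"); }
    }

    /** Submits one failing Job and returns the message built by the error handler. */
    static String run() {
        AtomicReference<String> report = new AtomicReference<>("");
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        try {
            threadPool.submit(new CallableWrapper<Integer>(
                    new Job("import-42"),
                    (task, e) -> report.set(((Job) task).name + ": " + e.getMessage()))).get();
        } catch (ExecutionException expected) {
            // The failure was already handled inside the wrapper.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            threadPool.shutdown();
        }
        return report.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The trade-off is that every call site has to remember to wrap its task, which is the point debated in the comments.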

You could certainly override the submit(...) methods to do the wrapping of the jobs yourself. This seems a lot cleaner.

Gray
  • Using RunnableWrapper is effectively what I achieve with my solution: I decorate myRunnable. The difference is that I supply the wrapper once, when the pool is constructed, rather than at every submit call. As an alternative, you can override the public submit method instead of the protected method, but then you have to override many such methods, and I think the risk of breaking with a future TPE is higher, though that last point is debatable. Also, ExecutorCompletionService is not suitable in my case: my exception handling should use the state of myRunnable, which I can't achieve with it. – alexsmail Feb 13 '17 at 09:19
  • I've tweaked my answer @alexsmail after thinking about it some more. – Gray Feb 13 '17 at 16:33
  • Well, consider that there are also methods like invokeAll() that you would have to override. If you go in my direction and override newTaskFor(), it will "just work" (invokeAll() uses newTaskFor() internally). Wrapping it yourself is definitely a solution, but it is not as clean as my approach. Moreover, you create as many wrappers as you have callable/runnable objects. I also think that proper exception handling should be configured with a separate callback at thread pool creation time (as is done with ThreadFactory). – alexsmail Feb 14 '17 at 07:13
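
The point about invokeAll() can be checked with a short counting sketch. It relies on the behaviour of the OpenJDK AbstractExecutorService, where invokeAll() builds its futures through newTaskFor() and does not call submit(); the class and counter names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class InvokeAllDemo {

    /** Returns {newTaskFor call count, submit call count} after one invokeAll() of three tasks. */
    static int[] run() {
        AtomicInteger newTaskForCalls = new AtomicInteger();
        AtomicInteger submitCalls = new AtomicInteger();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                newTaskForCalls.incrementAndGet(); // a decorator placed here sees every task
                return super.newTaskFor(callable);
            }

            @Override
            public <T> Future<T> submit(Callable<T> task) {
                submitCalls.incrementAndGet();     // a wrapper placed here is bypassed by invokeAll()
                return super.submit(task);
            }
        };

        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(() -> 1);
            tasks.add(() -> 2);
            tasks.add(() -> 3);
            pool.invokeAll(tasks); // blocks until all three tasks have completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new int[] { newTaskForCalls.get(), submitCalls.get() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("newTaskFor: " + counts[0] + ", submit: " + counts[1]);
    }
}
```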