6

I'm looking for a ThreadPoolExecutor where I can set a corePoolSize and a maximumPoolSize such that tasks are handed off to the thread pool immediately, creating new threads until the pool reaches maximumPoolSize, and only then start being added to a queue.

Is there such a thing? If not, is there any good reason it doesn't have such a strategy?

What I want, essentially, is for tasks to be submitted for execution, and when the pool reaches the point where more threads would make performance worse (the limit I set with maximumPoolSize), it stops adding new threads, works with the existing pool, and starts queuing. If the queue is full, it rejects.

And when load comes back down, it can start dismantling unused threads until the pool is back at corePoolSize.

This makes more sense to me in my application than the 'three general strategies' listed in http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html

  • Possible duplicate: http://stackoverflow.com/questions/1800317/impossible-to-make-a-cached-thread-pool-with-a-size-limit – mnicky Jun 12 '12 at 23:23

2 Answers

3

Note: these implementations are somewhat flawed and non-deterministic. Please read the entire answer and the comments before using this code.

How about creating a work queue that rejects items while the executor is below the maximum pool size, and starts accepting them once the maximum has been reached?

This relies on the documented behavior:

"If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected."

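To see the documented behavior that the ideas below work around, here is a minimal sketch (the class name DefaultGrowthDemo and the helper observe are made up for illustration). With a stock unbounded LinkedBlockingQueue, offer never fails, so the pool should stay at corePoolSize while everything else waits in the queue:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DefaultGrowthDemo
{
    // Submits `tasks` blocking tasks and returns {poolSize, queueSize}
    // observed while all of them are still pending.
    static int[] observe(int core, int max, int tasks) throws InterruptedException
    {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                core, max, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try
        {
            for (int i = 0; i < tasks; i++)
            {
                executor.execute(new Runnable()
                {
                    public void run()
                    {
                        try
                        {
                            release.await(); // hold the thread until we have measured
                        }
                        catch (InterruptedException e)
                        {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
            return new int[] { executor.getPoolSize(), executor.getQueue().size() };
        }
        finally
        {
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        int[] r = observe(2, 4, 6);
        // prints "poolSize=2 queued=4": the pool never grows past the core size
        System.out.println("poolSize=" + r[0] + " queued=" + r[1]);
    }
}
```

The extra threads up to maximumPoolSize only appear once offer returns false, which is the hook both queue implementations in this answer exploit.
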
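import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorTest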
{
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;
    private static final int KEEP_ALIVE_TIME_MS = 5000;

    public static void main(String[] args)
    {
        final SaturateExecutorBlockingQueue workQueue = 
            new SaturateExecutorBlockingQueue();

        final ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE, 
                    KEEP_ALIVE_TIME_MS, 
                    TimeUnit.MILLISECONDS, 
                    workQueue);

        workQueue.setExecutor(executor);

        for (int i = 0; i < 6; i++)
        {
            final int index = i;
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }

                    System.out.println("Runnable " + index 
                            + " on thread: " + Thread.currentThread());
                }
            });
        }

        // Let the submitted tasks finish, then allow the JVM to exit.
        executor.shutdown();
    }

    public static class SaturateExecutorBlockingQueue 
        extends LinkedBlockingQueue<Runnable>
    {
        private ThreadPoolExecutor executor;

        public void setExecutor(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }

        public boolean offer(Runnable e)
        {
            if (executor.getPoolSize() < executor.getMaximumPoolSize())
            {
                return false;
            }
            return super.offer(e);
        }
    }
}

Note: Your question surprised me because I expected your desired behavior to be the default behavior of a ThreadPoolExecutor configured with a corePoolSize < maximumPoolSize. But as you point out, the JavaDoc for ThreadPoolExecutor clearly states otherwise.


Idea #2

I think I have what is probably a slightly better approach. It relies on the side-effect behavior coded into the setCorePoolSize method in ThreadPoolExecutor. The idea is to temporarily and conditionally increase the core pool size when a work item is enqueued. When the core pool size is increased, the ThreadPoolExecutor immediately starts new threads for queued tasks, up to the size of the increase (or queue.size(), if that is smaller). Then we immediately decrease the core pool size, which allows the thread pool to shrink naturally during future periods of low activity. This approach is still not fully deterministic (it is possible for the pool size to grow above the maximum pool size, for example), but I think that in almost all cases it is better than the first strategy.

Specifically, I believe this approach is better than the first because:

  1. It will reuse threads more often
  2. It will not reject execution as a result of a race
  3. I would like to mention again that the first approach causes the thread pool to grow to its maximum size even under very light use. This approach should be much more efficient in that regard.


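import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorTest2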
{
    private static final int KEEP_ALIVE_TIME_MS = 5000;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;

    public static void main(String[] args) throws InterruptedException
    {
        final SaturateExecutorBlockingQueue workQueue = 
            new SaturateExecutorBlockingQueue(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE);

        final ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE, 
                    KEEP_ALIVE_TIME_MS, 
                    TimeUnit.MILLISECONDS, 
                    workQueue);

        workQueue.setExecutor(executor);

        for (int i = 0; i < 60; i++)
        {
            final int index = i;
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }

                    System.out.println("Runnable " + index 
                            + " on thread: " + Thread.currentThread()
                            + " poolSize: " + executor.getPoolSize());
                }
            });
        }

        executor.shutdown();

        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public static class SaturateExecutorBlockingQueue 
        extends LinkedBlockingQueue<Runnable>
    {
        private final int corePoolSize;
        private final int maximumPoolSize;
        private ThreadPoolExecutor executor;

        public SaturateExecutorBlockingQueue(int corePoolSize, 
                int maximumPoolSize)
        {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
        }

        public void setExecutor(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }

        public boolean offer(Runnable e)
        {
            if (!super.offer(e))
            {
                return false;
            }
            // Uncomment one or both of the below lines to increase
            // the likelihood of the thread pool reusing an existing thread
            // vs. spawning a new one.
            //Thread.yield();
            //Thread.sleep(0); // needs a try/catch for InterruptedException
            int currentPoolSize = executor.getPoolSize();
            if (currentPoolSize < maximumPoolSize 
                    && currentPoolSize >= corePoolSize)
            {
                executor.setCorePoolSize(currentPoolSize + 1);
                executor.setCorePoolSize(corePoolSize);
            }
            return true;
        }
    }
}
Mike Clark
  • Well, what you suggested is what I had said in a comment on the now-deleted answer. Anyway, I think you'll find that overriding offer might cause problems because it doesn't hold a lock, even if it's only 3 or so instructions long. –  Mar 08 '12 at 19:09
  • @RonaldChan I was concerned about the same thing, and I have been trying to reason through the potential problems, but I have yet to strike on any. I'll continue to try and think through it. – Mike Clark Mar 08 '12 at 19:14
  • Accepted your answer anyway. This is what I implemented in the end, rather than trying to rewrite ThreadPoolExecutor's execute method, which is way more complicated. FYI, I also set maximumPoolSize 5% lower than the actual limit as a buffer to allow for races. Not the ideal method, but the default behavior of ThreadPoolExecutor is odd and I can't find any external thread pool implementations that have the behavior I described. If there is one, I'll consider changing the accepted answer. –  Mar 08 '12 at 19:17
  • @RonaldChan I think this approach is worse than we thought: not only is it potentially unreliable because of the lack of locking around the `poolSize`, it also **prevents the Executor from reusing any threads until the thread pool size is == the maximum**, even if the executor is never asked to execute more than 1 task concurrently. I am experimenting with an alternative solution that I think will be better. – Mike Clark Mar 08 '12 at 20:42
  • @RonaldChan Updated with an alternative approach at the end of the answer. At first glance it may seem "ugly", but if you look past that, I think it is better than the first approach. Let me know what you think. – Mike Clark Mar 08 '12 at 22:29
  • Good point about the Executor not reusing threads. What I've added actually addresses that issue by doing a comparison between getActiveCount and currentPoolSize. So we only force a new thread when all threads are in use. Works great for me, threads are reused like normal. –  Mar 09 '12 at 23:18
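
The getActiveCount comparison mentioned in the last comment was never posted, so the following is only a guess at its shape, applied to the first queue from this answer. The names ActiveCountQueueSketch, BusyAwareQueue and saturate are hypothetical. Note that getActiveCount is documented as returning an approximate number, so the check stays racy:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ActiveCountQueueSketch
{
    // Hypothetical variant: refuse a task only when every pooled thread looks busy.
    public static class BusyAwareQueue extends LinkedBlockingQueue<Runnable>
    {
        private static final long serialVersionUID = 1L;
        private volatile ThreadPoolExecutor executor;

        public void setExecutor(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }

        @Override
        public boolean offer(Runnable e)
        {
            int poolSize = executor.getPoolSize();
            boolean allBusy = executor.getActiveCount() >= poolSize;
            if (allBusy && poolSize < executor.getMaximumPoolSize())
            {
                return false; // refusing makes the executor try to start a new thread
            }
            return super.offer(e); // an idle thread exists, or the pool is at its maximum
        }
    }

    // Submits `tasks` blocking tasks and returns {poolSize, queueSize}.
    static int[] saturate(int core, int max, int tasks) throws InterruptedException
    {
        final CountDownLatch release = new CountDownLatch(1);
        final Semaphore started = new Semaphore(0);
        BusyAwareQueue queue = new BusyAwareQueue();
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(core, max, 5, TimeUnit.SECONDS, queue);
        queue.setExecutor(executor);
        try
        {
            for (int i = 0; i < tasks; i++)
            {
                executor.execute(new Runnable()
                {
                    public void run()
                    {
                        started.release();
                        try
                        {
                            release.await();
                        }
                        catch (InterruptedException e)
                        {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
                if (i < max)
                {
                    started.acquire(); // wait until the task runs so it counts as active
                }
            }
            return new int[] { executor.getPoolSize(), queue.size() };
        }
        finally
        {
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

Under these assumptions, saturate(2, 4, 6) should grow the pool to 4 threads and queue the remaining 2 tasks, while a task submitted when some thread is idle goes to the queue and is picked up by that thread.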
2

We found a solution to that problem with the following code:

This queue is a hybrid SynchronousQueue / LinkedBlockingQueue.

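import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class OverflowingSynchronousQueue<E> extends LinkedBlockingQueue<E> {
  private static final long serialVersionUID = 1L;

  // Hands tasks directly to idle worker threads; it never stores anything itself.
  private final SynchronousQueue<E> synchronousQueue = new SynchronousQueue<E>();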

  public OverflowingSynchronousQueue() {
    super();
  }

  public OverflowingSynchronousQueue(int capacity) {
    super(capacity);
  }

  @Override
  public boolean offer(E e) {
    // Create a new thread or wake an idled thread
    return synchronousQueue.offer(e);
  }

  public boolean offerToOverflowingQueue(E e) {
    // Add to queue
    return super.offer(e);
  }

  @Override
  public E take() throws InterruptedException {
    // Return tasks from queue, if any, without blocking
    E task = super.poll();
    if (task != null) {
      return task;
    } else {
      // Block on the SynchronousQueue take
      return synchronousQueue.take();
    }
  }

  @Override
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // Return tasks from queue, if any, without blocking
    E task = super.poll();
    if (task != null) {
      return task;
    } else {
      // Block on the SynchronousQueue poll
      return synchronousQueue.poll(timeout, unit);
    }
  }

}
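
The overridden offer works as a hand-off because SynchronousQueue.offer only succeeds when another thread is already waiting to receive the element. When it fails, the executor starts a new thread, or rejects the task once maximumPoolSize is reached. A small sketch of that behavior in isolation (HandOffDemo and its methods are illustrative names, not part of the solution above):

```java
import java.util.concurrent.SynchronousQueue;

public class HandOffDemo
{
    // With no consumer waiting, the non-blocking offer fails immediately.
    static boolean offerWithoutConsumer()
    {
        return new SynchronousQueue<String>().offer("task");
    }

    // With a consumer blocked in take(), the same offer eventually succeeds.
    static String offerWithConsumer() throws InterruptedException
    {
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();
        final String[] received = new String[1];
        Thread consumer = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    received[0] = queue.take();
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        // Retry until the consumer is actually waiting in take().
        while (!queue.offer("task"))
        {
            Thread.sleep(1);
        }
        consumer.join(); // join makes the consumer's write to received[0] visible
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException
    {
        System.out.println(offerWithoutConsumer()); // prints "false"
        System.out.println(offerWithConsumer());    // prints "task"
    }
}
```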

For it to work, we need to wrap the RejectedExecutionHandler to call "offerToOverflowingQueue" when a task is rejected.

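import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class OverflowingRejectionPolicyAdapter implements RejectedExecutionHandler {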

  private OverflowingSynchronousQueue<Runnable> queue;
  private RejectedExecutionHandler adaptedRejectedExecutionHandler;

  public OverflowingRejectionPolicyAdapter(OverflowingSynchronousQueue<Runnable> queue,
                                           RejectedExecutionHandler adaptedRejectedExecutionHandler)
  {
    super();
    this.queue = queue;
    this.adaptedRejectedExecutionHandler = adaptedRejectedExecutionHandler;
  }

  @Override
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (!queue.offerToOverflowingQueue(r)) {
      adaptedRejectedExecutionHandler.rejectedExecution(r, executor);
    }
  }
}

Here's how we create the ThreadPoolExecutor:

public static ExecutorService newSaturatingThreadPool(int corePoolSize,
                                                      int maxPoolSize,
                                                      int maxQueueSize,
                                                      long keepAliveTime,
                                                      TimeUnit timeUnit,
                                                      String threadNamePrefix,
                                                      RejectedExecutionHandler rejectedExecutionHandler)
{
  OverflowingSynchronousQueue<Runnable> queue =
      new OverflowingSynchronousQueue<Runnable>(maxQueueSize);
  // Rejected tasks are first offered to the overflow queue; the supplied
  // handler only runs if that queue is full.
  OverflowingRejectionPolicyAdapter rejectionPolicyAdapter =
      new OverflowingRejectionPolicyAdapter(queue, rejectedExecutionHandler);
  // NamedThreadFactory is not defined in this answer; any ThreadFactory works here.
  return new ThreadPoolExecutor(corePoolSize,
                                maxPoolSize,
                                keepAliveTime,
                                timeUnit,
                                queue,
                                new NamedThreadFactory(threadNamePrefix),
                                rejectionPolicyAdapter);
}
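
The factory method refers to a NamedThreadFactory whose source is not included. A minimal stand-in, assuming it does nothing more than prefix thread names (the naming scheme here is a guess, not the original class), might look like this:

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the NamedThreadFactory used above.
public class NamedThreadFactory implements ThreadFactory
{
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix)
    {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r)
    {
        // Produces names such as "worker-1", "worker-2", ...
        return new Thread(r, prefix + "-" + counter.getAndIncrement());
    }
}
```

Any other ThreadFactory, including Executors.defaultThreadFactory(), can be passed to the ThreadPoolExecutor constructor instead.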
Simon LG