Note: these implementations are somewhat flawed and non-deterministic. Please read the entire answer and the comments before using this code.
How about creating a work queue that rejects items while the executor is below the maximum pool size, and starts accepting them once the maximum has been reached?
This relies on the documented behavior:
"If a request cannot be queued, a new thread is created unless this
would exceed maximumPoolSize, in which case, the task will be
rejected."
public class ExecutorTest
{
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
private static final int KEEP_ALIVE_TIME_MS = 5000;
public static void main(String[] args)
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue();
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 6; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread());
}
});
}
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private ThreadPoolExecutor executor;
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (executor.getPoolSize() < executor.getMaximumPoolSize())
{
return false;
}
return super.offer(e);
}
}
}
Note: Your question surprised me because I expected your desired behavior to be the default behavior of a ThreadPoolExecutor configured with a corePoolSize < maximumPoolSize. But as you point out, the JavaDoc for ThreadPoolExecutor clearly states otherwise.
Idea #2
I think I have what is probably a slightly better approach. It relies on the side-effect behavior coded into the setCorePoolSize
method in ThreadPoolExecutor
. The idea is to temporarily and conditionally increase the core pool size when a work item is enqueued. When increasing the core pool size, the ThreadPoolExecutor
will immediately spawn enough new threads to execute all the queued (queue.size()) tasks. Then we immediately decrease the core pool size, which allows the thread pool to shrink naturally during future periods of low activity. This approach is still not fully deterministic (it is possible for the pool size to grow above max pool size, for example), but I think it is in almost all cases it is better than the first strategy.
Specifically, I believe this approach is better than the first because:
- It will reuse threads more often
- It will not reject execution as a result of a race
- I would like to mention again that the first approach causes the thread pool to grow to its maximum size even under very light use. This approach should be much more efficient in that regard.
-
public class ExecutorTest2
{
private static final int KEEP_ALIVE_TIME_MS = 5000;
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
public static void main(String[] args) throws InterruptedException
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE);
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 60; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread()
+ " poolSize: " + executor.getPoolSize());
}
});
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private final int corePoolSize;
private final int maximumPoolSize;
private ThreadPoolExecutor executor;
public SaturateExecutorBlockingQueue(int corePoolSize,
int maximumPoolSize)
{
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
}
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (super.offer(e) == false)
{
return false;
}
// Uncomment one or both of the below lines to increase
// the likelyhood of the threadpool reusing an existing thread
// vs. spawning a new one.
//Thread.yield();
//Thread.sleep(0);
int currentPoolSize = executor.getPoolSize();
if (currentPoolSize < maximumPoolSize
&& currentPoolSize >= corePoolSize)
{
executor.setCorePoolSize(currentPoolSize + 1);
executor.setCorePoolSize(corePoolSize);
}
return true;
}
}
}