2

I am looking for an ExecutorService that creates threads on demand up to a predefined limit and destroys idle threads after a keep alive time.

The following constructor creates a ThreadPoolExecutor with fixed thread count:

// taken from Executors.newFixedThreadPool()
new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

So I tried to create a ExecutorService this way:

// taken from Executors.newCachedThreadPool()
new ThreadPoolExecutor(0, nThreads,
    CACHED_POOL_SHUTDOWN_DELAY, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());

But it doesn't work as expected, when nThreads are in use, the Executor does not enqueue new tasks but throws an RejectedExecutionException. I know I could implement a handler for that, but it didn't help me.

How can I create the Executor described ahead?

Stephan
  • 4,395
  • 3
  • 26
  • 49

2 Answers2

2

If a new task can't be queued, a new thread is created, unless you have already reached the maximum core pool size. In your case, the queue can only contain one task at a time so if you submit tasks quickly enough you reach the maximum pool size and get the exception.

It works with CachedThreadPool because the maximum core pool size is large (Integer.MAX_VALUE).

You need to use a different queue, for example a new LinkedBlockingQueue like in the the fixed threadpool example.

Side note: checking the implementation documentation can help understand the details. In particular, the execute method of the ThreadPoolExecutor class has:

   /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
assylias
  • 321,522
  • 82
  • 660
  • 783
  • Thanks, you're right. But it may happen that thousands of tasks where enqueued and I don't want to create such a big queue. – Stephan Jul 30 '13 at 06:00
  • @Stephan You need to do something with the task that can't be run (max pool size is used) and can't be enqueued (queue is full)... You can restrict the size of the queue but you will receive an exception once it is full. – assylias Jul 30 '13 at 06:01
  • 1
    That's true again. I removed the `capacity` parameter from the constructor of the queue, so it is set to `Integer.MAX_VALUE`. – Stephan Jul 30 '13 at 06:06
2

I found an approach on that post that does exactly what I need.
@assylias I recognized your answer and changed the queue implementation.

Now my code looks like this:

parallelExecutor = new ThreadPoolExecutor(nThreads, nThreads,
    CACHED_POOL_SHUTDOWN_DELAY, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());

parallelExecutor.allowCoreThreadTimeOut(true);   // this is the magic

It works like a fixed thread pool but these core thats are allowed to time out.

Community
  • 1
  • 1
Stephan
  • 4,395
  • 3
  • 26
  • 49