143

It seems to be impossible to make a cached thread pool with a limit on the number of threads that it can create.

Here is how static Executors.newCachedThreadPool is implemented in the standard Java library:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

So, using that template, I went on to create a fixed-size cached thread pool:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

Now if you use this and submit 3 tasks, everything will be fine. Submitting any further tasks will result in rejected execution exceptions.
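
For illustration, a minimal sketch of that behavior (assuming java.util.concurrent.* is imported; the sleep is only a stand-in for a long-running task):

    ExecutorService pool = new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
    for (int i = 0; i < 4; i++) {
        try {
            pool.execute(() -> {
                try { Thread.sleep(5000); } catch (InterruptedException ignored) { }
            });
        } catch (RejectedExecutionException e) {
            System.out.println("rejected"); // thrown on the 4th submission
        }
    }
    pool.shutdown();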

Trying this:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

Will result in all tasks executing sequentially, i.e. the thread pool will never create more than one thread to handle them.
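
A quick way to see this for yourself (again a sketch; the thread-name logging is only there to show which thread runs each task):

    ExecutorService pool = new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
    for (int i = 0; i < 6; i++) {
        pool.execute(() -> {
            // every task prints the same thread name, e.g. pool-1-thread-1
            System.out.println(Thread.currentThread().getName());
            try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
        });
    }
    pool.shutdown();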

Is this a bug in the execute method of ThreadPoolExecutor? Or is it intentional? Or is there some other way?

Edit: I want something exactly like the cached thread pool (it creates threads on demand and then kills them after some timeout) but with a limit on the number of threads that it can create and the ability to continue to queue additional tasks once it has hit its thread limit. According to sjlee's response this is impossible. Looking at the execute() method of ThreadPoolExecutor it is indeed impossible. I would need to subclass ThreadPoolExecutor and override execute() somewhat like SwingWorker does, but what SwingWorker does in its execute() is a complete hack.

AlexElin
  • 1,044
  • 14
  • 23
Matt Wonlaw
  • 12,146
  • 5
  • 42
  • 44
  • 1
    What is your question? Isn't your 2nd code example the answer to your title? – rsp Nov 25 '09 at 22:23
  • 6
    I want a thread pool that will add threads on demand as the number of tasks grow, but will never add more than some max number of threads. CachedThreadPool already does this, except it will add an unlimited number of threads and not stop at some pre-defined size. The size I define in the examples is 3. The second example adds 1 thread, but doesn't add two more as new tasks arrive while the other tasks have not yet completed. – Matt Wonlaw Nov 26 '09 at 10:16
  • Check this, it solves it, http://debuggingisfun.blogspot.com/2012/05/java-cachedthreadpool-with-min-and-max.html – ethan May 10 '12 at 04:28
  • Related to: https://stackoverflow.com/questions/19528304/how-to-get-the-threadpoolexecutor-to-increase-threads-to-max-before-queueing/19528305#19528305 – Gray Aug 21 '17 at 13:33

14 Answers

244

The ThreadPoolExecutor has several key behaviors, and your problems can be explained by them.

When tasks are submitted,

  1. If the thread pool has not reached the core size, it creates new threads.
  2. If the core size has been reached and there are no idle threads, it queues tasks.
  3. If the core size has been reached, there are no idle threads, and the queue becomes full, it creates new threads (until it reaches the max size).
  4. If the max size has been reached, there are no idle threads, and the queue becomes full, the rejection policy kicks in.

In the first example, note that the SynchronousQueue essentially has a size of 0. Therefore, the moment you reach the max size (3), the rejection policy kicks in (#4).

In the second example, the queue of choice is a LinkedBlockingQueue, which has an unlimited size. Therefore, you get stuck with behavior #2.

You cannot really tinker much with the cached type or the fixed type, as their behavior is almost completely determined.

If you want to have a bounded and dynamic thread pool, you need to use a positive core size and max size combined with a queue of a finite size. For example,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Addendum: this is a fairly old answer, and it appears that JDK changed its behavior when it comes to core size of 0. Since JDK 1.6, if the core size is 0 and the pool does not have any threads, the ThreadPoolExecutor will add a thread to execute that task. Therefore, the core size of 0 is an exception to the rule above. Thanks Steve for bringing that to my attention.

AlexElin
  • 1,044
  • 14
  • 23
sjlee
  • 7,726
  • 2
  • 29
  • 37
  • 6
You should add a few words about the `allowCoreThreadTimeOut` method to make this answer perfect. See the answer of @user1046052 – hsestupin Mar 18 '13 at 12:00
  • 1
    Great answer! Just one point to add: Other rejection policies are also worth mentioning. See the answer of @brianegge – Jeff Apr 15 '14 at 01:54
  • 1
    Shouldn't behaviour 2 say *'If the **maxThread** size has been reached and there is no idle threads, it queues tasks.'*? – Zoltán Jun 30 '16 at 15:02
  • 1
    Could you elaborate on what the size of the queue implies? Does it mean that only 20 tasks can be queued before they are rejected? – Zoltán Jul 01 '16 at 08:33
  • 1
@Zoltán I wrote this a while ago, so there is a chance some behavior might have changed since then (I didn't follow the latest activities too closely), but assuming these behaviors are unchanged, #2 is correct as stated, and that's perhaps the most important (and somewhat surprising) point of this. Once the core size is reached, TPE favors queueing over creating new threads. The queue size is literally the size of the queue that's passed to the TPE. If the queue becomes full but it hasn't reached the max size, it will create a new thread (not reject tasks). See #3. Hope that helps. – sjlee Jul 01 '16 at 16:22
Notice that this thread pool will have at most 10 threads and will only grow above that, up to 50, after the queue is full. – Tvaroh Dec 18 '17 at 12:07
  • @sjlee So, could you explain this strange behavior in this `ThreadExecutor` [program](https://stackoverflow.com/questions/52253164/threadpoolexecutor-with-corepoolsize-0-should-not-execute-tasks-until-task-queue). It is not following the 4 cases you mentioned here. – Steve Sep 13 '18 at 13:43
64

Unless I've missed something, the solution to the original question is simple. The following code implements the desired behavior as described by the original poster. It will spawn up to 5 threads to work on an unbounded queue and idle threads will terminate after 60 seconds.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);
  • 1
    You are correct. That method was added in jdk 1.6, so not as many people know about it. also, you can't have a "min" core pool size, which is unfortunate. – jtahlborn Nov 14 '11 at 17:57
  • 4
    My only worry about this is (from the JDK 8 docs): "When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle." – veegee Feb 12 '16 at 05:50
Pretty sure this doesn't actually work. Last time I looked doing the above actually only ever runs your work in one thread even though you spawn 5. Again, been a few years but when I dove into the implementation of ThreadPoolExecutor it only dispatched to new threads once your queue was full. Using an unbounded queue causes this to never happen. You can test by submitting work and logging the thread name then sleeping. Every runnable will end up printing the same name / not be run on any other thread. – Matt Wonlaw Mar 30 '16 at 20:22
  • 2
    This does work, Matt. You set the core size to 0, that's why you only had 1 thread. The trick here is to set the core size to the max size. – T-Gergely Apr 19 '16 at 12:18
  • 1
@veegee is right - This doesn't actually work very well - ThreadPoolExecutor will only re-use threads when above corePoolSize. So when corePoolSize is equal to maxPoolSize, you'll only benefit from the thread caching when your pool is full (So if you intend to use this but usually stay under your max pool size, you might as well reduce the thread timeout to a low value; and be aware that there's no caching - always new threads) – Chris Riddell Oct 22 '16 at 01:42
@Riddell is correct. It does not behave like a CachedThreadPool with a thread limit and an unbounded queue because it does not re-use threads. A ThreadPoolExecutor can offer two major benefits: 1) reduce the overhead of executing tasks by re-using threads and 2) limit the number of resources (e.g. threads) used. This solution does not offer the former. – Stefan Feuerhahn Aug 06 '19 at 08:21
8

I had the same issue. Since no other answer puts all the issues together, I'm adding mine:

It is now clearly written in the docs: if you use an unbounded queue that never fills up (such as a LinkedBlockingQueue with no capacity bound), the max threads setting has no effect; only core threads are used.

so:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        n = Math.max(1, n);
        setCorePoolSize(Math.min(n, getCorePoolSize())); // shrink the core first so max never drops below core
        setMaximumPoolSize(n);
        setCorePoolSize(n);
    }

}

This executor has:

  1. No effective maximum thread count, since we are using an unbounded queue. This is fine: because the queue never fills up, the executor never follows its usual policy of creating extra, non-core threads.

  2. A queue with a maximum size of Integer.MAX_VALUE. submit() will throw a RejectedExecutionException if the number of pending tasks exceeds Integer.MAX_VALUE, although in practice we would probably run out of memory first.

  3. Up to 4 core threads. Idle core threads exit automatically after being idle for 5 seconds, so, yes, threads exist strictly on demand. The number can be changed with the setThreads() method (a short usage sketch follows this list).

  4. Makes sure the number of core threads is never less than one; otherwise tasks could end up sitting in the queue with no thread to run them (at least on older JDKs). Since the maximum pool size must be at least the core pool size, setThreads() adjusts the maximum as well, although that setting has no effect with an unbounded queue.
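
A minimal usage sketch for the class above (the task body and the thread count of 8 are just examples):

    MyExecutor executor = new MyExecutor();
    executor.setThreads(8); // grow (or shrink) the pool at runtime
    executor.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
    executor.shutdown();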

S.D.
  • 29,290
  • 3
  • 79
  • 130
  • I think you also need to set 'allowCoreThreadTimeOut' to 'true', otherwise, once the threads are created, you will keep them around forever: https://gist.github.com/ericdcobb/46b817b384f5ca9d5f5d – eric Aug 22 '14 at 15:11
  • oops I just missed that, sorry, your answer is perfect then! – eric Aug 25 '14 at 16:55
7

In your first example, subsequent tasks are rejected because the AbortPolicy is the default RejectedExecutionHandler. The ThreadPoolExecutor contains the following policies, which you can change via the setRejectedExecutionHandler method:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

It sounds like you want a cached thread pool with a CallerRunsPolicy.
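
For instance, a sketch of a bounded cached-style pool with CallerRunsPolicy (the sizes of 0 and 3 are just placeholders):

    ExecutorService pool = new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    // When all 3 threads are busy, execute() runs the task in the caller's thread
    // instead of throwing a RejectedExecutionException.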

brianegge
  • 29,240
  • 13
  • 74
  • 99
5

None of the answers here fixed my problem, which had to do with creating a limited number of HTTP connections using Apache's HTTP client (3.x version). Since it took me some hours to figure out a good setup, I'll share:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

This creates a ThreadPoolExecutor with a core size of five and a maximum of ten simultaneously running threads, using CallerRunsPolicy when the pool is saturated.

Pops
  • 30,199
  • 37
  • 136
  • 151
paul
  • 51
  • 1
  • 1
The problem with this solution is that if you increase the number of producers, you will increase the number of threads running the background tasks. In many cases that's not what you want. – Gray Aug 21 '17 at 13:25
3

Per the Javadoc for ThreadPoolExecutor:

If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool.

(Emphasis mine.)

jitter's answer is what you want, although mine answers your other question. :)

Jonathan Feinberg
  • 44,698
  • 7
  • 80
  • 103
2

This is what you want (at least I guess so). For an explanation, check Jonathan Feinberg's answer.

Executors.newFixedThreadPool(int n)

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

Community
  • 1
  • 1
jitter
  • 53,475
  • 11
  • 111
  • 124
  • 6
    Sure, I could use a fixed thread pool but that would leave n threads around for forever, or until I call shutdown. I want something exactly like the cached thread pool (it creates threads on demand and then kills them after some timeout) but with a limit on the number of threads that it can create. – Matt Wonlaw Nov 26 '09 at 10:43
2

There is one more option: instead of using a SynchronousQueue you can use any other queue, but you have to make sure its size is 1, so that it forces the executor service to create a new thread.
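
For example, a sketch with a one-slot ArrayBlockingQueue (the max of 3 is just a placeholder):

    ExecutorService pool = new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1));
    // Once the single queue slot is occupied, new threads are created up to the
    // maximum of 3; beyond that, further submissions are rejected.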

Ashkrit Sharma
  • 627
  • 5
  • 7
Ashkrit
  • 29
  • 1
I think you mean size 0 (by default), so that there will be no tasks queued, truly forcing the executor service to create a new thread every time. – Leonmax Nov 07 '14 at 19:38
2

It doesn't look as though any of the answers actually answer the question; in fact I can't see a way of doing this, even if you subclass ThreadPoolExecutor, since many of the relevant methods and fields are private. If, for example, addIfUnderMaximumPoolSize had been protected, you could have done the following:

class MyThreadPoolService extends ThreadPoolExecutor {
    // hypothetical: addIfUnderMaximumPoolSize is actually private, so this does not compile
    public void execute(Runnable run) {
        if (getPoolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

The closest I got was this - but even that isn't a very good solution

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

P.S. I have not tested the above.

Stuart
  • 41
  • 2
2

Here is another solution. I think this solution behaves as you want it to (though I'm not proud of it):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
Stuart
  • 41
  • 2
1

The problem was summarized as follows:

I want something exactly like the cached thread pool (it creates threads on demand and then kills them after some timeout) but with a limit on the number of threads that it can create and the ability to continue to queue additional tasks once it has hit its thread limit.

Before pointing to the solution I will explain why the following solutions don't work:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

This will not queue any tasks when the limit of 3 is reached because SynchronousQueue, by definition, cannot hold any elements.

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

This will not create more than a single thread because ThreadPoolExecutor only creates threads exceeding the corePoolSize if the queue is full. But LinkedBlockingQueue is never full.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

This will not reuse threads until the corePoolSize has been reached because ThreadPoolExecutor increases the number of threads until the corePoolSize is reached even if existing threads are idle. If you can live with this disadvantage then this is the easiest solution to the problem. It is also the solution described in "Java Concurrency in Practice" (footnote on p172).

The only complete solution to the described problem seems to be the one involving overriding the queue's offer method and writing a RejectedExecutionHandler as explained in the answers to this question: How to get the ThreadPoolExecutor to increase threads to max before queueing?

Stefan Feuerhahn
  • 1,564
  • 1
  • 14
  • 22
1

This works for Java 8+ (and other versions, for now):

     Executor executor = new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()){{allowCoreThreadTimeOut(true);}};

where 3 is the limit on the number of threads and 5 is the timeout (in seconds) for idle threads.

If you want to check if it works yourself, here is the code to do the job:

public static void main(String[] args) throws InterruptedException {
    final int DESIRED_NUMBER_OF_THREADS=3; // limit of number of Threads for the task at a time
    final int DESIRED_THREAD_IDLE_DEATH_TIMEOUT=5; //any idle Thread ends if it remains idle for X seconds

    System.out.println( java.lang.Thread.activeCount() + " threads");
    Executor executor = new ThreadPoolExecutor(DESIRED_NUMBER_OF_THREADS, DESIRED_NUMBER_OF_THREADS, DESIRED_THREAD_IDLE_DEATH_TIMEOUT, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()) {{allowCoreThreadTimeOut(true);}};

    System.out.println(java.lang.Thread.activeCount() + " threads");

    for (int i = 0; i < 5; i++) {
        final int fi = i;
        executor.execute(() -> waitsout("starting hard thread computation " + fi, "hard thread computation done " + fi,2000));
    }
    System.out.println("If this is UP, it works");

    while (true) {
        System.out.println(
                java.lang.Thread.activeCount() + " threads");
        Thread.sleep(700);
    }

}

static void waitsout(String pre, String post, int timeout) {
    try {
        System.out.println(pre);
        Thread.sleep(timeout);
        System.out.println(post);
    } catch (Exception e) {
    }
}

The output of the code above for me is:

1 threads
1 threads
If this is UP, it works
starting hard thread computation 0
4 threads
starting hard thread computation 2
starting hard thread computation 1
4 threads
4 threads
hard thread computation done 2
hard thread computation done 0
hard thread computation done 1
starting hard thread computation 3
starting hard thread computation 4
4 threads
4 threads
4 threads
hard thread computation done 3
hard thread computation done 4
4 threads
4 threads
4 threads
4 threads
3 threads
3 threads
3 threads
1 threads
1 threads
Ondřej
  • 21
  • 5
0
  1. You can use ThreadPoolExecutor as suggested by @sjlee

    You can control the size of the pool dynamically. Have a look at this question for more details:

    Dynamic Thread Pool

    OR

  2. You can use the newWorkStealingPool API, which was introduced in Java 8.

    public static ExecutorService newWorkStealingPool()
    

    Creates a work-stealing thread pool using all available processors as its target parallelism level.

By default, the parallelism level is set to the number of CPU cores in your server. If you have a 4-core CPU server, the thread pool size would be 4. This API returns an ExecutorService of type ForkJoinPool, which allows idle threads to steal tasks from busy threads in the ForkJoinPool.
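
For example (a sketch; an overload that takes the parallelism level explicitly also exists):

    // parallelism defaults to Runtime.getRuntime().availableProcessors()
    ExecutorService pool = Executors.newWorkStealingPool();

    // or pick the parallelism level yourself
    ExecutorService pool4 = Executors.newWorkStealingPool(4);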

Community
  • 1
  • 1
Ravindra babu
  • 37,698
  • 11
  • 250
  • 211
0

I recommend using Signal's approach.

From its SignalExecutors class:

ThreadPoolExecutor will only create a new thread if the provided queue returns false from offer(). That means if you give it an unbounded queue, it'll only ever create 1 thread, no matter how long the queue gets. But if you bound the queue and submit more runnables than there are threads, your task is rejected and throws an exception. So we make a queue that will always return false if it's non-empty to ensure new threads get created. Then, if a task gets rejected, we simply add it to the queue.

public static ExecutorService newCachedBoundedExecutor(final String name, int minThreads, int maxThreads) {
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minThreads,
            maxThreads,
            30,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>() {
                @Override
                public boolean offer(Runnable runnable) {
                    if (size() > 1 && size() <= maxThreads) {
                        //create new thread
                        return false;
                    } else {
                        return super.offer(runnable);
                    }
                }
            }, new NumberedThreadFactory(name));

    threadPool.setRejectedExecutionHandler((runnable, executor) -> {
        try {
            executor.getQueue().put(runnable);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });

    return threadPool;
}
Sepehr
  • 960
  • 11
  • 17