19

I need to implement a thread pool in Java (java.util.concurrent) whose number of threads is at some minimum value when idle, grows up to an upper bound (but never further) when jobs are submitted into it faster than they finish executing, and shrinks back to the lower bound when all jobs are done and no more jobs are submitted.

How would you implement something like that? I imagine this is a fairly common usage scenario, but apparently the java.util.concurrent.Executors factory methods can only create fixed-size pools and pools that grow without bound when many jobs are submitted. The ThreadPoolExecutor class provides corePoolSize and maximumPoolSize parameters, but its documentation seems to imply that the only way to ever have more than corePoolSize threads at the same time is to use a bounded job queue. In that case, once you've reached maximumPoolSize threads, you get job rejections that you have to deal with yourself. I came up with this:

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
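            } catch (InterruptedException e1) {
                // don't swallow the interrupt: restore the flag and stop submitting
                Thread.currentThread().interrupt();
                return;
            }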
        }
    }
}

Am I overlooking something? Is there a better way to do this? Also, depending on one's requirements, it might be problematic that the above code will not finish until at least (I think) (total number of jobs) - maxSize jobs have finished. So if you want to be able to submit an arbitrary number of jobs into the pool and proceed immediately without waiting for any of them to finish, I don't see how you could do that without having a dedicated "job submitting" thread that manages the required unbounded queue to hold all the submitted jobs. AFAICS, if you're using an unbounded queue for the ThreadPoolExecutor itself, its thread count will never grow beyond corePoolSize.
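The last observation can be checked with a small experiment. The following is only a sketch: the class and method names (UnboundedQueueDemo, poolSizeAfterSubmitting) are made up for illustration, and the jobs simply block on a latch so that every worker stays busy while the pool size is read.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question.
final class UnboundedQueueDemo {

    /** Submits {@code jobs} blocking jobs and returns the pool size while they are pending. */
    static int poolSizeAfterSubmitting(int coreSize, int maxSize, int jobs) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(coreSize, maxSize,
                500, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < jobs; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The unbounded queue accepts every job, so no extra threads are started.
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the jobs finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 20 jobs, corePoolSize 2, maximumPoolSize 8
        System.out.println(poolSizeAfterSubmitting(2, 8, 20)); // prints 2
    }
}
```

With 20 pending jobs the pool still reports only corePoolSize threads, because a job is handed to a new non-core thread only when the queue refuses it.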

Gray
Olaf Klischat
  • I have to admit, I fail to see the usefulness of a dynamically sized thread pool. Does the number of processors on your board change during the uptime of your application? – corsiKa Jun 28 '12 at 16:51
  • Why is `newCachedThreadPool` not suitable for your situation? It automatically kills off threads that are not used anymore. – Tudor Jun 28 '12 at 16:53
  • What would happen if your idle threads did not die? Say you had a fixed-size pool of the maximum size all the time? What would happen? – Peter Lawrey Jun 28 '12 at 17:01
  • AFAICS, newCachedThreadPool creates a pool whose number of threads may grow without limits if you submit many long-running jobs into it. – Olaf Klischat Jun 28 '12 at 17:03
  • No, the processor count won't change at runtime, but I fail to see how this is relevant if the jobs are largely I/O bound rather than CPU bound. In that case you will achieve increased job throughput by using many threads even on a single-processor system. – Olaf Klischat Jun 28 '12 at 17:05

3 Answers

14

When growing and shrinking come together with threads, only one name comes to my mind: the cached thread pool from the java.util.concurrent package.

ExecutorService executor = Executors.newCachedThreadPool();

A cached thread pool reuses idle threads and creates new ones when needed. And yes, if a thread is idle for 60 seconds, the pool kills it. So this is quite lightweight: growing and shrinking, in your words!
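To see how far such a pool grows, here is a rough sketch. It builds the pool by hand with the settings that, as far as I know, Executors.newCachedThreadPool() uses in OpenJDK (0 core threads, Integer.MAX_VALUE maximum, 60 second keep-alive, SynchronousQueue); treat that equivalence as an assumption. CachedPoolGrowthDemo and its method are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not taken from the answer above.
final class CachedPoolGrowthDemo {

    /** Submits {@code jobs} blocking jobs and returns the pool size while they all run. */
    static int poolSizeAfterSubmitting(int jobs) {
        // Assumed to mirror the configuration behind Executors.newCachedThreadPool().
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < jobs; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a long-running job
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // No idle worker is waiting on the queue, so every job got its own thread.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterSubmitting(10)); // prints 10
    }
}
```

The pool shrinks again once threads sit idle, but nothing caps the growth: every job submitted while all threads are busy starts another thread.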

Kumar Vivek Mitra
8

One trick that might help you is to assign a RejectedExecutionHandler that uses the same thread to submit the job into the blocking queue. That will block the current thread and remove the need for some sort of loop.

See my answer here:

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

Here's the rejection handler copied from that answer.

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler once the queue is full and all threads are busy; to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      try {
         // this will block if the queue is full
         executor.getQueue().put(r);
      } catch (InterruptedException e) {
         // put() throws a checked exception: restore the flag and reject the job
         Thread.currentThread().interrupt();
         throw new RejectedExecutionException("Interrupted while waiting to enqueue", e);
      }
   }
});

You should then be able to make use of the core/max thread count as long as you realize that the bounded blocking queue that you use first fills up before any threads are created above the core threads. So if you have 10 core threads and you want the 11th job to start the 11th thread you will need to have a blocking queue with a size of 0 unfortunately (maybe a SynchronousQueue). I feel that this is a real limitation in the otherwise great ExecutorService classes.
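Putting the two ideas together (a zero-capacity queue plus a blocking rejection handler) might look roughly like the sketch below. BoundedElasticPool, create, and demo are hypothetical names, the keep-alive values are arbitrary, and the handler does not guard against every race with shutdown().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; a sketch, not a hardened implementation.
final class BoundedElasticPool {

    /** Pool that grows from minSize to maxSize and blocks the submitter beyond that. */
    static ThreadPoolExecutor create(int minSize, int maxSize, long keepAliveMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(minSize, maxSize,
                keepAliveMillis, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        pool.setRejectedExecutionHandler((job, executor) -> {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            try {
                executor.getQueue().put(job); // blocks until a worker takes the job
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting", e);
            }
        });
        return pool;
    }

    /** Returns {pool size with maxSize jobs running, pool size after idling}. */
    static int[] demo(int minSize, int maxSize) {
        ThreadPoolExecutor pool = create(minSize, maxSize, 100);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < maxSize; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            int busy = pool.getPoolSize();
            release.countDown(); // jobs finish, workers become idle
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > minSize && System.nanoTime() < deadline) {
                Thread.sleep(20); // wait for the keep-alive time to expire
            }
            return new int[] {busy, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, -1};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] sizes = demo(2, 4);
        System.out.println(sizes[0] + " busy, " + sizes[1] + " after idling");
    }
}
```

The trade-off is that nothing is ever queued: once maxSize jobs are running, the submitting thread itself waits inside the handler until a worker becomes free.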

Gray
  • Bummer. I'm here trying to find a way to make the pool size grow as the queue fills rather than waiting until full. Ah well. – zerpsed Jul 15 '19 at 21:52
  • You can certainly do that @zerpsed. You can adjust the number of core threads based on the number of items in the queue. – Gray Jul 15 '19 at 22:10
  • Well, I'm using a LinkedBlockingQueue and was hoping to keep the number of idle threads low. Increasing the core size will increase the number of idle threads I hold, or I may be misunderstanding something. SynchronousQueue performs poorly for me under load, I think due to the 1:1 ratio of tasks to available threads it seems to enforce. It looks like I can monitor the queue size and adjust the core size up under load and down when the queue is empty. I had hoped the ThreadPoolExecutor would do this for me, but new threads are not created beyond the core size unless the queue is full. – zerpsed Jul 16 '19 at 14:41
  • You should look at my answer here @zerpsed. https://stackoverflow.com/questions/19528304/how-to-get-the-threadpoolexecutor-to-increase-threads-to-max-before-queueing That question (and my answer) allow you to scale up threads before the queue is full. That might be all you need. – Gray Jul 16 '19 at 21:27
  • Thanks for the share and very cool. I built some logic, before coming back here, to scale threads based on queue size. So far, it is performing well under load. – zerpsed Jul 17 '19 at 15:32
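The comments above do not show the scaling logic, so the following is only a guess at what adjusting the core size from the queue length could look like. QueueSizeScaler, targetCoreSize, rescale, and the "one thread per tasksPerThread queued jobs" policy are all assumptions made for illustration; only setCorePoolSize, getQueue, and the other ThreadPoolExecutor accessors are real API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the scaling policy is an assumption, not zerpsed's code.
final class QueueSizeScaler {

    /** One thread per {@code tasksPerThread} queued jobs, clamped to [minSize, maxSize]. */
    static int targetCoreSize(int queued, int minSize, int maxSize, int tasksPerThread) {
        int wanted = (queued + tasksPerThread - 1) / tasksPerThread; // ceiling division
        return Math.max(minSize, Math.min(maxSize, wanted));
    }

    /** Applies the policy; intended to be called periodically or after each submit. */
    static int rescale(ThreadPoolExecutor pool, int minSize, int tasksPerThread) {
        int target = targetCoreSize(pool.getQueue().size(), minSize,
                pool.getMaximumPoolSize(), tasksPerThread);
        if (target != pool.getCorePoolSize()) {
            // Growing starts threads for queued jobs; shrinking lets idle ones time out.
            pool.setCorePoolSize(target);
        }
        return target;
    }

    /** Queues 10 blocking jobs on a 1..4 pool, rescales once, returns the pool size. */
    static int demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4,
                100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 10; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a slow job
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            rescale(pool, 1, 2); // 9 queued jobs -> wants 5 threads, capped at 4
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 4
    }
}
```

Calling rescale again once the queue has drained lowers the core size back to the minimum, after which surplus idle threads exit when their keep-alive time expires.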
1

Set maximumPoolSize to Integer.MAX_VALUE. If you ever have more than 2 billion threads...well, good luck with that.

Anyway, the Javadoc of ThreadPoolExecutor states:

By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).

With a similarly unbounded task queue like a LinkedBlockingQueue, this should have arbitrarily large capacity.

Louis Wasserman
  • Thanks. Also refer to http://stackoverflow.com/questions/28567238/threadpoolexecutor-does-not-shrink-properly/40384042#40384042: shrinking hit other issues, which are resolved in that question. – V H Nov 02 '16 at 15:44