
I have a list of work units and I want to process them in parallel. Each unit takes 8-15 seconds of purely computational time, with no blocking I/O. What I want is an ExecutorService that:

  • has zero threads instantiated when there is no work to do
  • can dynamically scale up to 20 threads if needed
  • allows me to add all work units at once (without blocking the submission)

Something like:

Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for (WorkUnit unit : list) {
    service.submit(() -> {
        // ... do some work ...
        queue.offer(result);
    });
}
while (queue.peek() != null) {
    // ... process results as they arrive ...
}

What I tried, without success:

  • Using newCachedThreadPool() creates too many threads.
  • Then I used its internal call, new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>()), but then I noticed that submit() cannot queue work because of the SynchronousQueue: once all 20 threads are busy, further submissions are rejected.
  • So I switched to new LinkedBlockingQueue<>(), only to find that the ThreadPoolExecutor spawns only one thread.

I'm sure there is an official implementation for this very basic concurrency use case. Can someone advise?

dreamcrash
Jack
  • You can use ExecutorService executor = Executors.newFixedThreadPool(n); where n is the number of threads you want to create. – Przemek Dec 15 '20 at 11:33
  • Why insist on your first bullet item, zero threads instantiated when there is no work to do? Idle threads in Java have nearly no cost in CPU time. – Basil Bourque Dec 20 '20 at 06:40
  • @BasilBourque it always depends on what kind of application you're writing. In my case the application is designed to run for months, and having maximum control over resources is the best way to avoid unexpected problems: in general, you can never know when you are going to need the resources you choose to waste. – Jack Dec 20 '20 at 13:09

2 Answers


Create the ThreadPoolExecutor using a LinkedBlockingQueue and 20 as corePoolSize (first argument in the constructor):

new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());


If you use the LinkedBlockingQueue without a predefined capacity, the pool:

  • Won't ever check maxPoolSize.
  • Won't create more threads than the number specified by corePoolSize.
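A small sketch to check these two points (the 10 ms sleeping tasks are an assumption standing in for the real work units): it submits 40 tasks to pools that use an unbounded LinkedBlockingQueue and maximumPoolSize 20, then reports getLargestPoolSize() once everything has finished.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {
    // Runs `tasks` short jobs on a pool backed by an unbounded queue and
    // returns the largest number of threads the pool ever had.
    static int largestPoolSize(int core, int max, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // stand-in for a real work unit
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) throws InterruptedException {
        // maximumPoolSize is 20 in both cases, yet it is never reached.
        System.out.println("core=0, max=20 -> " + largestPoolSize(0, 20, 40)); // 1
        System.out.println("core=4, max=20 -> " + largestPoolSize(4, 20, 40)); // 4
    }
}
```

With corePoolSize 0 (the configuration from the question) only the single fallback thread is started, and with corePoolSize 4 the pool stops at 4: the queue never refuses a task, so the code path that would consult maximumPoolSize is never taken.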

In your case, only one thread will be created. And you're lucky to get one: you set corePoolSize to 0, and older versions of Java (before 1.6) wouldn't create any thread at all in that case (how dare they?).

Later versions do create a new thread even if corePoolSize is 0, which seems like... a fix that is... a bug that... changes... a logical behaviour?

ThreadPoolExecutor

Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.)


About scaling down

To remove all threads when there's no work to do, you have to let the core threads terminate explicitly (by default they never time out). To do so, call allowCoreThreadTimeOut(true) before you start using the pool.

Take care to set a sensible keep-alive timeout: for example, if a new task arrives on average every 6 seconds, a keep-alive time of 5 seconds could lead to unnecessary destroy+create cycles (oh dear thread, you only had to wait one more second!). Choose this timeout based on the rate at which tasks arrive.

allowCoreThreadTimeOut

Sets the policy governing whether core threads may time out and terminate if no tasks arrive within the keep-alive time, being replaced if needed when new tasks arrive. When false, core threads are never terminated due to lack of incoming tasks. When true, the same keep-alive policy applying to non-core threads applies also to core threads. To avoid continual thread replacement, the keep-alive time must be greater than zero when setting true. This method should in general be called before the pool is actively used.
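The scale-down and restart behaviour (also asked about in the comments below) can be checked with a sketch like this one. The 200 ms keep-alive and the 20 ms sleeping tasks are illustrative assumptions chosen so the demo finishes quickly; a real pool would use a keep-alive tuned to how often tasks arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScaleDownDemo {
    static void work() {
        try {
            Thread.sleep(20); // stand-in for a real work unit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {peak threads, threads after idling, threads right after a new submit}.
    static int[] run() throws InterruptedException, ExecutionException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                20, 20, 200L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Core threads may now die after 200 ms without work (keep-alive must be > 0).
        pool.allowCoreThreadTimeOut(true);

        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            futures.add(pool.submit(ScaleDownDemo::work));
        }
        for (Future<?> f : futures) {
            f.get(); // wait until every task has finished
        }
        int peak = pool.getLargestPoolSize();

        Thread.sleep(1500); // idle well past the 200 ms keep-alive
        int afterIdle = pool.getPoolSize();

        pool.submit(ScaleDownDemo::work); // an empty pool starts a fresh thread
        int afterNewTask = pool.getPoolSize();

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return new int[] {peak, afterIdle, afterNewTask};
    }

    public static void main(String[] args) throws Exception {
        int[] r = run();
        System.out.println("peak=" + r[0] + ", idle=" + r[1] + ", after new task=" + r[2]);
    }
}
```

On a normal run it should report a peak of 20 threads, 0 threads after the idle period and 1 thread right after the new submission; the idle count depends on timing, hence the generous 1.5 s sleep.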


TL;DR

  • Unbounded LinkedBlockingQueue as the task queue.
  • corePoolSize taking over the meaning of maxPoolSize.
  • allowCoreThreadTimeOut(true) so the pool can scale down through a timeout-based mechanism that also affects core threads.
  • keep-alive value set to something sensible based on how often tasks arrive.
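The four settings can be wrapped in a small factory. This is a minimal sketch: newElasticPool and the squaring workload are hypothetical names and stand-ins, not part of the JDK, and the results are collected the same way as in the question, through a shared queue.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ElasticPool {
    // Hypothetical helper combining the TL;DR settings.
    static ExecutorService newElasticPool(int maxThreads, long keepAlive, TimeUnit unit) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads,                   // corePoolSize acts as the real maximum
                maxThreads,                   // never consulted with an unbounded queue
                keepAlive, unit,
                new LinkedBlockingQueue<>()); // unbounded task queue
        pool.allowCoreThreadTimeOut(true);    // idle core threads die, down to 0
        return pool;
    }

    // Illustrative workload: squares each number on the pool and sums the
    // results, consuming them in whatever order they complete.
    static long process(List<Integer> units) throws InterruptedException {
        ExecutorService service = newElasticPool(20, 60L, TimeUnit.SECONDS);
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();
        for (int unit : units) {
            // Returns immediately; tasks beyond the 20th wait in the queue.
            service.submit(() -> results.add(unit * unit));
        }
        long sum = 0;
        for (int i = 0; i < units.size(); i++) {
            sum += results.take(); // blocks until the next result arrives
        }
        service.shutdown();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(Arrays.asList(1, 2, 3, 4, 5))); // prints 55
    }
}
```

Counting results with take() keeps the consumer waiting for work still in flight, whereas a loop that stops at the first empty peek() can exit before the first task has even finished.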

This fresh mix leads to an ExecutorService that 99.99999% of the time won't turn the submitter away (for that to happen, 2,147,483,647 tasks, i.e. Integer.MAX_VALUE, would have to be queued; past that point submissions are rejected rather than blocked), and that efficiently scales the number of threads based on the workload, fluctuating in both directions between 0 and corePoolSize concurrent threads.

As a suggestion, monitor the queue's size, since the non-blocking behaviour has a price: the risk of an OutOfMemoryError if the queue keeps growing unchecked until Integer.MAX_VALUE is reached (e.g. if the threads are deadlocked for an entire day while the submitters keep inserting tasks). Even if each task is small in memory, 2,147,483,647 objects plus their linked-list nodes, etc., are a lot of extra load.
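One way to act on that monitoring is a guard around submission. This is a sketch under assumptions: trySubmit is a hypothetical helper built on ThreadPoolExecutor.getQueue(), and the demo simulates a stuck worker with a CountDownLatch. The size check and execute() are not atomic, so the limit is approximate when several threads submit at once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BacklogGuard {
    // Hypothetical helper: refuses new work once maxBacklog tasks are waiting.
    static boolean trySubmit(ThreadPoolExecutor pool, Runnable task, int maxBacklog) {
        if (pool.getQueue().size() >= maxBacklog) {
            return false; // caller decides: retry later, drop, log an alert, ...
        }
        pool.execute(task);
        return true;
    }

    // Returns how many of `attempts` submissions are accepted while the
    // pool's only worker is stuck.
    static int demo(int attempts, int maxBacklog) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable stuck = () -> {
            try {
                release.await(); // simulates a blocked (e.g. deadlocked) worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (trySubmit(pool, stuck, maxBacklog)) {
                accepted++;
            }
        }
        release.countDown(); // unblock everything so the pool can finish
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(10, 3)); // prints 4
    }
}
```

With a backlog limit of 3, four submissions are accepted: the first one runs on the worker thread and the next three wait in the queue; everything after that is refused instead of growing the queue.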

aran
  • Can you confirm that with this solution the pool reaches 0 threads even if corePoolSize is set to 20? Once a thread has timed out (due to allowCoreThreadTimeOut(true)), will the pool start a new thread when needed? – Jack Dec 15 '20 at 14:17
  • If no tasks are submitted within a certain period of time, the threads will, one-by-one, start terminating, even if they are coreThreads. The result would be an empty pool. "When true, the same keep-alive policy applying to non-core threads applies also to core threads" – aran Dec 15 '20 at 14:18
  • And yes, if a new task is submitted, the pool will create a new thread again (up to corePoolSize threads). This achieves the dynamic scaling behaviour. – aran Dec 15 '20 at 14:25
  • It works, so thank you. Nevertheless, I have to say this looks like a trick; I can't understand why they created such an ugly interface. What's the point of having a core/max pair if in reality max is only used to block the submission? This really isn't what most scenarios need.

The simplest way would be to use the method

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

of the Executors class. This gives you a simple out-of-the-box solution. The pool you get will expand and shrink as needed. You can further configure it with the methods dealing with core-thread timeout, etc.
ScheduledExecutorService is an interface that extends ExecutorService, and it is the only one that can dynamically expand and shrink out of the box.
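A sketch of that configuration, assuming corePoolSize 20 and an illustrative 200 ms keep-alive. Executors.newScheduledThreadPool returns a ScheduledThreadPoolExecutor, which inherits setKeepAliveTime and allowCoreThreadTimeOut from ThreadPoolExecutor. Its documentation describes it as a fixed-size pool of corePoolSize threads with an unbounded queue, so it grows up to corePoolSize and shrinks only if core threads are allowed to time out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledPoolDemo {
    // Returns {peak threads, threads left after an idle period}.
    static int[] run() throws InterruptedException {
        // newScheduledThreadPool(n) returns a ScheduledThreadPoolExecutor.
        ScheduledThreadPoolExecutor pool =
                (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(20);
        // Set a non-zero keep-alive first, then let core threads time out.
        pool.setKeepAliveTime(200, TimeUnit.MILLISECONDS);
        pool.allowCoreThreadTimeOut(true);

        CountDownLatch done = new CountDownLatch(40);
        for (int i = 0; i < 40; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20); // stand-in for a real work unit
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        done.await();
        int peak = pool.getLargestPoolSize(); // grows to corePoolSize, no further

        Thread.sleep(1500); // idle well past the keep-alive
        int afterIdle = pool.getPoolSize();

        pool.shutdown();
        return new int[] {peak, afterIdle};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("peak=" + r[0] + ", after idle=" + r[1]);
    }
}
```

On a normal run this should report a peak of 20 threads and 0 threads after the idle period; the number of threads started depends on the corePoolSize passed to newScheduledThreadPool, not on setMaximumPoolSize.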

Michael Gantman
  • Cast your ScheduledExecutorService to ThreadPoolExecutor and use the method setMaximumPoolSize – Michael Gantman Dec 15 '20 at 13:30
  • Just tested: only one thread was created. I believe ScheduledExecutorService exposes an interface for scheduling tasks in the future, but its scaling mechanism is probably shared with newCachedThreadPool(). – Jack Dec 15 '20 at 13:48
  • Try to use prestartAllCoreThreads() method of ThreadPoolExecutor – Michael Gantman Dec 15 '20 at 15:09
  • @BasilBourque - The scheduled version is an extension of ExecutorService and is the only one that can dynamically expand and shrink out of the box. So, while the scheduling capabilities are not needed in this question, the dynamic change of thread pool size is. – Michael Gantman Dec 20 '20 at 08:57
  • @BasilBourque - Thanks for the suggestion. I just edited the answer and added that info – Michael Gantman Dec 20 '20 at 09:28