25

I tried creating and executing a ThreadPoolExecutor with

int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
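
(For completeness, a minimal construction of such an executor could look like the line below; the 10-second keep-alive is an assumption for illustration, not taken from the original code.)

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        poolSize, maxPoolSize, 10, TimeUnit.SECONDS, queue);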

If I keep submitting the 7th, 8th, ... task with

  threadPool.execute(task);  

after the queue has reached its maximum size, it starts throwing "RejectedExecutionException", which means those tasks are lost.

So what is the role of the BlockingQueue if it is dropping tasks? In other words, why doesn't it wait?

From the definition of BlockingQueue

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.


Why can't we go for a LinkedList (a normal queue implementation) instead of a BlockingQueue?

Kanagavelu Sugumar

6 Answers

20

The problem occurs because your task queue is too small, and this is indicated by the documentation of the execute method:

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

So the first problem is that you're setting your queue size to a very small number:

int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

And then you state that if you try the 7th, 8th... task, you will get a RejectedExecutionException because you're past the capacity of the queue. There are two ways to resolve your problem (I would recommend doing both):

  1. Increase the size of the queue.
  2. Catch the exception and re-try adding the task.

You should have something along the lines of this:

public void executeTask(MyRunnableTask task) {
    boolean taskAdded = false;
    while (!taskAdded) {
        try {
            executor.execute(task);
            taskAdded = true;
        } catch (RejectedExecutionException ex) {
            // the queue is still full; keep retrying until the task is accepted
            taskAdded = false;
        }
    }
}

Now, to address your other questions...

Here then what is the role of BlockingQueue if it is missing the tasks?

The role of the BlockingQueue is to complete the Producer/Consumer pattern and if it's large enough, then you shouldn't see the issues you're encountering. As I mentioned above, you need to increase the queue size and catch the exception then retry executing the task.

Why can't we go for a LinkedList?

A linked list is neither thread safe, nor is it blocking. The Producer/Consumer pattern tends to work best with a blocking queue.

Update

Please don't be offended by the following statements; I'm intentionally using more stringent language in order to emphasize that your first assumption should never be that there is something wrong with the library you're using (unless you wrote the library yourself and you know there is a specific problem in it)!

So let's put this concern to rest right now: neither the ThreadPoolExecutor nor the Java library is the problem here. It's entirely your (mis)use of the library that's causing the problem. Javamex has a great tutorial explaining the exact situation you're seeing.

There could be several reasons why you're filling up the queue faster than you're emptying it:

  1. The thread that's adding tasks for execution is adding them too fast.
  2. The tasks are taking too long to execute.
  3. Your queue is too small.
  4. Any combination of the above 3.

There are a bunch of other reasons too, but I think the above would be the most common.

I would give you a simple solution with an unbounded queue, but it would NOT resolve your (mis)use of the library. So before we go blaming the Java library, let's see a concise example that demonstrates the exact problem you're encountering.

Update 2.0

Here are a couple of other questions addressing the specific problem:

  1. ThreadPoolExecutor Block When Queue Is Full?
  2. How to make ThreadPoolExecutor's submit() method block if it is saturated?
Kiril
  • 8
    I believe producer consumer pattern has to be taken care by ThreadPoolExecutor API, Since it has the responsible of adding the task into Blocking queue. If the Queue is full It should wait till queue is emptied then it should add the task. But Instead of waiting it is throwing exception that task can't be added. So ThreadPoolExecutor is violating the producer/consumer design while using the Blocking queue. Please clarrify me. – Kanagavelu Sugumar Sep 27 '11 at 13:56
  • Even if I increase the queue size to a large number, when more tasks than that size are submitted and the queue is full, I believe it will lose the tasks. So how can I avoid this? Do I need to go for an unbounded blocking queue? – Kanagavelu Sugumar Sep 27 '11 at 13:59
  • @Kanagavelu, one solution is to catch the exception and re-add the task. However, without seeing a code example that would demonstrate the issue, it would be hard to comment any further. Additionally, how long does each task run for? How many threads are adding tasks for execution? How often are these tasks added for execution? – Kiril Sep 27 '11 at 14:29
  • 1
    The problem is just to understand why they chose a blocking queue instead of a plain queue. I just ran a simple example to check whether the API waits to add additional tasks when the queue is full, and found that it does not. I think this is the http://en.wikipedia.org/wiki/Producer-consumer_problem `Producer-consumer problem` happening here. – Kanagavelu Sugumar Sep 28 '11 at 11:45
  • 2
    @Kanagavelu, I'm not 100% sure why they don't just block on the `put` when the queue is full, but I'm pretty sure they block on the `take`. Blocking on `take` ensures that threads don't have to use any sort of polling to check if the queue has data (which is considered an inefficient approach), instead the worker thread waits/blocks until it's signaled that there is something in the queue so it can dequeue it. The other reason why they use a blocking queue is because it's thread safe, thus requires no synchronization on the user's part when invoking `put`/`take`. – Kiril Sep 28 '11 at 13:53
  • I've updated my answer (Update 2.0) with a couple of links that might shed some more light on the problem. – Kiril Sep 28 '11 at 14:01
16

The blocking queue is mainly for the consumers (the threads in the pool). The threads can wait for new tasks to become available on the queue and will be woken up automatically when one arrives. A plain linked list would not serve that purpose.

On the producer side, the default behavior is to throw an exception in case the queue is full. This can be customized by implementing your own RejectedExecutionHandler: in your handler you can get hold of the queue and call its put method, which will block until space becomes available.
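
A minimal sketch of such a handler might look like this (the class name below is made up for illustration):

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative handler: block the submitting thread until queue space frees up.
class BlockWhenQueueFullPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        try {
            executor.getQueue().put(task); // blocks until the queue has room
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while enqueuing task", e);
        }
    }
}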

But this is not a good thing to do - the reason is that if there is a problem in this executor (deadlock, slow processing) it would cause a ripple effect on the rest of the system. For example, if you are calling the execute method from a servlet and execute blocks, then all the container's threads will be held up and your application will come to a halt. That is probably why the default behavior is to throw an exception rather than wait. Also, there is no built-in implementation of RejectedExecutionHandler that does this - to discourage people from using it.

There is an option (CallerRunsPolicy) to execute the task in the calling thread, which can be another choice if you want the processing to happen anyway.
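
For example (a sketch reusing the sizes from the question):

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2, 3, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(2),
        new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, run the task in the submitting thread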

The general rule is - it is better to fail processing of one request, rather than bring the whole system down. You might want to read about the circuit-breaker pattern.

gkamal
  • That was helpful, especially your example with the servlet. As far as I understand, this implementation of the consumer/producer pattern was made like this to add flexibility, which makes these executors a universal tool, but in the common situation where you need to block both the consumer and the producer you have to write some code yourself (classic Java). – mulya Apr 20 '17 at 11:40
  • It's worth noting that a BlockingQueue's put blocks when the queue is full, but when used inside a ThreadPoolExecutor this blocking behaviour is not used by default; the executor fails fast by throwing `RejectedExecutionException` instead. – Siddhartha Jun 19 '20 at 05:38
2

You're not using BlockingQueue in the way it's intended to be used.

A BlockingQueue is used to implement the producer consumer pattern. Producer thread(s) put items on the queue via the blocking put() method, while consumer thread(s) take items from the queue via the blocking take() method.

put() blocks - meaning if the queue is full, it waits until a consumer has taken an item from the queue before adding it, then returns.

take() blocks - meaning if the queue is empty, it waits until a producer has put an item on the queue before taking it and returning it.

This pattern completely disconnects the producers from consumers, except that they share the queue.

Try using the queue like that: Have an executor run some threads that act as producers and some that act as consumers.
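
For example, a minimal sketch of that kind of setup (the class name and output messages are just for illustration):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Producer: put() blocks whenever the queue already holds 2 items.
        pool.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i);
                    System.out.println("produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer: take() blocks whenever the queue is empty.
        pool.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("consumed " + queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown();
    }
}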

Bohemian
  • 4
    I'm not sure why you think he's not using the `BlockingQueue` in the way it's intended to be used, unless you think that the designers of Java's concurrency library are not using the queue in the way it's intended to be used. I could be horribly misunderstanding your answer, but I'm pretty sure that the blocking queue is intended to be used like that with the `ThreadPoolExecutor`. – Kiril Sep 26 '11 at 15:39
  • Sure, but he's not implementing the consumer side or putting work on the queue - he's simply submitting the task. Of course the queue will fill up - nothing is emptying it. – Bohemian Sep 26 '11 at 21:30
  • 3
    Calling `execute` (i.e. submitting the task) on the `ThreadPoolExecutor` will place the task in the blocking queue, and once a thread from the executor is available, it will dequeue the task from the blocking queue and execute it. If the tasks are not too long and there are enough threads in the thread pool, then the queue will be emptied eventually. The producer/consumer design pattern is implemented inside the `ThreadPoolExecutor` itself. – Kiril Sep 26 '11 at 21:59
  • Agree with Lirik. I believe the producer/consumer pattern has to be taken care of by the ThreadPoolExecutor API, since it has the responsibility of adding the task to the blocking queue. If the queue is full it should wait till the queue is emptied and then add the task, but instead of waiting it throws an exception saying the task can't be added. So the ThreadPoolExecutor is violating the producer/consumer design while using the blocking queue. Please clarify. – Kanagavelu Sugumar Sep 27 '11 at 13:53
  • @Kanagavelu, you can rest assured that **the `ThreadPoolExecutor` is NOT violating the producer/consumer design pattern!** It should not wait for the queue to be empty before adding more tasks; it just needs to wait until more room is available for adding tasks (if it needs to wait at all). The problem you're encountering is not at all the fault of the `ThreadPoolExecutor` library, but entirely in the way you're using it. See this for more details: http://www.javamex.com/tutorials/threads/thread_pools_queues.shtml – Kiril Sep 27 '11 at 14:52
  • @Lirik Thanks for the links and explanations. But I am still not able to understand why a blocking queue is needed here, since as per the producer/consumer pattern the queue should use the blocking put()/take() methods to ensure no data is lost, rather than throwing IllegalStateException (as add()/remove() on a plain queue do). If we get RejectedExecutionException anyway, that is the same behaviour as using a simple queue with add()/remove() and IllegalStateException; the blocking functionality plays no role. Thanks a lot for your time. – Kanagavelu Sugumar Sep 28 '11 at 11:39
  • The problem is that thread pool executors use the offer method of Queue, as opposed to the put method of BlockingQueue, losing any benefit of using a blocking queue in the first place. This appears to be a fundamental flaw in the API. – Perception Dec 18 '12 at 03:33
  • 1
    @Bohemian Your point of view is how people expect Executor + BlockingQueue to work. But in practice it doesn't work like this, which is why this question was asked. – mulya Apr 20 '17 at 11:54
1

The problem occurs because your task queue is too small

IMHO this doesn't answer the OP's question (although Kiril gives more detail after that), because whether the size of a queue is "too small" is entirely subjective. For example, he may be protecting an external resource that can't handle more than two concurrent requests (besides, I think 2 was chosen just for a quick test). Also, what size could we say is not too small? 1000? What if the application ends up submitting 5000 tasks? The scenario remains, because the real question is: why is the producer (caller) thread not blocked when the ThreadPoolExecutor uses a LinkedBlockingQueue?

The role of the BlockingQueue is to complete the Producer/Consumer pattern and if it's large enough, then you shouldn't see the issues you're encountering. As I mentioned above, you need to increase the queue size and catch the exception then retry executing the task.

This is true if you know the maximum number of tasks that could be queued at runtime and you can allocate enough heap for that, but that isn't always the case. The benefit of using bounded queues is that you can protect yourself against OutOfMemoryError.

The correct answer should be the one from @gkamal.

The blocking queue is mainly for the consumers (the threads in the pool). The threads can wait for new tasks to become available on the queue and will be woken up automatically when one arrives. A plain linked list would not serve that purpose.

My two cents:

The developers could also have decided that the consumers won't block either, in which case a BlockingQueue would no longer be needed (although you would still need a concurrent collection or manual synchronization). But in that case, when a worker tries to poll a task from an empty collection, you must either

  1. Kill the thread (and handle new tasks by creating fresh threads), or
  2. Busy-wait, wasting CPU.

Since creating new threads is expensive (that is why we use thread pools in the first place) and busy waiting is wasteful too, the best option is to block the consumer and notify it later when a task becomes available.

Actually, there is a special case in which workers won't block (at least not indefinitely): when there are more threads than the configured corePoolSize.

 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.

We can see these two scenarios in ThreadPoolExecutor (Java 1.8):

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // <---- wait at most keepAliveTime
                    workQueue.take(); // <---- if there are no tasks, it awaits on notEmpty.await();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
gabrielgiussi
0

From your settings, a request may be rejected once more than 5 tasks are outstanding concurrently, because the order in which capacity is used is: pool size -> queue -> max pool size.

  1. Initially, two threads are created.
  2. After that, if more than 2 requests come in, the subsequent requests are put into the queue.
  3. If the queue is full (you set its capacity to 2), new threads are created, but never more than the max pool size (which you set to 3).
  4. If yet more requests come in while all threads/workers are busy and the queue is full, the requests are rejected (subject to your rejection policy configuration), as the sketch below demonstrates.
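
A minimal sketch reproducing that sequence with the sizes from the question (the class name and the 5-second sleep are just for illustration):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));

        for (int i = 1; i <= 7; i++) {
            try {
                pool.execute(() -> sleepQuietly(5000)); // long-running dummy task
                System.out.println("task " + i + " accepted");
            } catch (RejectedExecutionException e) {
                // 3 threads busy + 2 tasks queued = 5; from task 6 on, execute() rejects
                System.out.println("task " + i + " rejected");
            }
        }
        pool.shutdown();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}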

More detail can be read here: https://dzone.com/articles/scalable-java-thread-pool-executor

Steve Chew
0

Your question is perfectly legitimate, and in this case you should ignore the "go read the docs" and other condescending comments you received.

Here then what is the role of BlockingQueue if it is missing the tasks? Means why it is not waiting?

As the second most-voted answer (not the accepted one) tells you, the use of a BlockingQueue is consistent with the consumers' need to block until work (items in the queue) becomes available. The blocking nature of the queue is seemingly not used on the enqueue side, and you are right: this is unintuitive and, AFAIK, undocumented.

A legitimate use-case for the Executor is to have a fixed number of threads (e.g. 1 or 2, or as many as the number of cores on your machine), and yet never drop incoming work items, AND don't accumulate them in the queue. This use-case would be solved with a fixed number of core threads and a bounded blocking queue that blocks when full, thereby having the system put backpressure onto upstream systems. This is a perfectly reasonable design decision.
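
One way to realize this (an illustrative sketch, not an existing JDK class; the `BoundedExecutor` name and `submitBlocking` method are made up) is to bound submissions with a Semaphore, so the producer blocks whenever the pool and queue are saturated:

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

class BoundedExecutor {
    private final Executor delegate;
    private final Semaphore permits;

    BoundedExecutor(Executor delegate, int bound) {
        this.delegate = delegate;
        this.permits = new Semaphore(bound);
    }

    void submitBlocking(Runnable task) throws InterruptedException {
        permits.acquire();                 // blocks the producer when saturated
        try {
            delegate.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release();     // free a slot once the task finishes
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release();             // undo the reservation if submission failed
            throw e;
        }
    }
}

Wrapped around, say, Executors.newFixedThreadPool(2) with a modest bound, submitters slow down instead of losing tasks.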

haelix
  • You wrote, "don't accumulate them in the queue...bounded blocking queue that blocks when full." It seems like you _do_ want to accumulate items in the queue, so perhaps the first statement should be clarified or removed? – Luke Dec 28 '22 at 23:17
  • @Luke, no, I wrote what I meant. I am talking about the situation where the user wishes a blocking enqueue of tasks into the blocking queue, i.e. for the queue to block on the inbound side. It therefore does accumulate items, but once full it no longer does, as there is no space; it starts to block. – haelix Jan 07 '23 at 16:30