36

I am using an ExecutorService to simplify a concurrent, multithreaded program. Take the following code:

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
    ...  
    Future<..> ... = exService.submit(..);
    ...
}

In my case the problem is that submit() does not block when all NUMBER_THREADS threads are busy. As a result, the task queue gets flooded with tasks, and shutting down the executor with ExecutorService.shutdown() takes ages (ExecutorService.isTerminated() stays false for a long time) because the task queue is still quite full.

For now my workaround is to use a semaphore to prevent too many entries from accumulating in the ExecutorService's task queue:

...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
    ...
    semaphore.acquire();
    // internally the task calls a finish callback, which invokes semaphore.release()
    // -> now another task is added to queue
    Future<..> ... = exService.submit(..); 
    ...
}

Surely there is a better, more encapsulated solution?

HugoTeixeira
manuel aldana

6 Answers

35

The trick is to use a fixed queue size and:

new ThreadPoolExecutor.CallerRunsPolicy()

I also recommend using Guava's ListeningExecutorService. Here is an example with producer and consumer executors:

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  5000L, TimeUnit.MILLISECONDS,
                                  new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
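
To see what CallerRunsPolicy does when the pool is saturated, here is a minimal plain-JDK sketch (no Guava). The class and method names are made up for illustration, and the pool is deliberately tiny (one thread, one queue slot) so that the third task is guaranteed to overflow:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class CallerRunsDemo {
    // Returns the name of the thread that ran the task submitted while the pool was full.
    static String threadOfOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker);   // occupies the single worker thread
            pool.execute(blocker);   // fills the single queue slot
            // Pool and queue are full, so the policy runs this task on the submitting thread.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();     // let the blocked tasks finish
            pool.shutdown();
        }
        return ranOn[0];
    }
}
```

The producer is throttled because it is busy running the overflow task itself. Keep in mind that this also means tasks can run on the producer thread, which may not suit every design.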

If you need anything more sophisticated, consider a message queue such as RabbitMQ or ActiveMQ, as they have QoS technology.

emanciperingsivraren
Adam Gent
6

You can call ThreadPoolExecutor.getQueue().size() to find out the size of the waiting queue and take action if the queue is too long. I suggest running the task in the current thread when the queue is too long, to slow down the producer (if that is appropriate).
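
A rough sketch of that idea, assuming a helper named submitOrRunInline and a threshold parameter maxQueued (both invented here for illustration):

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper class, not an existing JDK API.
class QueueSizeThrottle {
    // Runs the task on the calling thread when the backlog has reached maxQueued,
    // otherwise hands it to the pool. Returns true if the caller ran the task.
    static boolean submitOrRunInline(ThreadPoolExecutor pool, Runnable task, int maxQueued) {
        if (pool.getQueue().size() >= maxQueued) {
            task.run();          // the producer does the work itself and is slowed down
            return true;
        }
        pool.execute(task);
        return false;
    }
}
```

The size check is only a snapshot, so with several producers the queue can briefly exceed maxQueued. Treat it as a soft limit rather than a hard bound.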

HugoTeixeira
Peter Lawrey
6

A true blocking ThreadPoolExecutor has been on the wishlist of many; there is even a JDC bug opened on it. I'm facing the same problem and came across this: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

It's an implementation of a BlockingThreadPoolExecutor, built on a rejection policy (a RejectedExecutionHandler) that uses offer to add the task to the queue, waiting for the queue to have room. It looks good.
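
This is not the code from the linked article, just a minimal sketch of the same general idea. It assumes a handler that calls put() on the executor's queue (rather than a timed offer), and the names BlockWhenFullPolicy and newBlockingFixedPool are invented for illustration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical handler: makes the submitting thread wait for queue space.
class BlockWhenFullPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        try {
            // put() waits for room, unlike the non-blocking offer() that was just refused.
            executor.getQueue().put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
    }

    // Hypothetical factory: fixed-size pool, bounded queue, blocking rejection policy.
    static ThreadPoolExecutor newBlockingFixedPool(int threads, int queueSize) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize), new BlockWhenFullPolicy());
    }
}
```

One caveat: if the executor is shut down while a producer is waiting inside put(), the task may be queued after shutdown and never run, so this sketch is best suited to setups where producers stop before shutdown() is called.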

mdma
  • [This answer](http://stackoverflow.com/a/4522411/394431) to another question suggests using a custom `BlockingQueue` subclass which blocks on `offer()` by delegating to `put()`. I think that ends up working more or less the same as this `RejectedExecutionHandler`. – Robert Tupelo-Schneck Mar 29 '14 at 20:55
4

You're better off creating the ThreadPoolExecutor yourself (which is what Executors.newXXX() does anyway).

In the constructor, you can pass in a BlockingQueue for the executor to use as its task queue. If you pass in a size-constrained BlockingQueue (such as a LinkedBlockingQueue created with a capacity), it should achieve the effect you want.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));
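
One caveat worth checking before relying on this: with the default rejection handler (AbortPolicy), a full bounded queue makes submit() throw a RejectedExecutionException instead of blocking. A small sketch that shows this, using an invented demo class and a deliberately tiny pool (one thread, queue capacity 1):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration only.
class BoundedQueueRejects {
    // Returns true if submitting to a saturated pool threw RejectedExecutionException.
    static boolean thirdSubmitIsRejected() {
        ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1));
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.submit(blocker);  // taken by the only worker thread
            pool.submit(blocker);  // waits in the queue (capacity 1)
            pool.submit(blocker);  // nowhere to go, so the default AbortPolicy rejects it
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } finally {
            gate.countDown();      // release the blocked tasks
            pool.shutdown();
        }
    }
}
```

So a bounded queue on its own caps the backlog, but the producer has to handle the rejection or install a different RejectedExecutionHandler to get blocking behaviour.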
Kevin
  • i see. i overlooked a detail at http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29. there it is mentioned, that as standard an unbounded queue is used. – manuel aldana Feb 11 '10 at 21:37
  • 8
    i tried it out. unfortunately your solution does not block (as I want) but throws a RejectedExecutionException. also found: http://www.velocityreviews.com/forums/t389526-threadpoolexecutor-with-blocking-execute.html. the presented workarounds seem to be more complicated than my semaphore example, damn! – manuel aldana Feb 11 '10 at 23:53
  • 4
    This doesn't work because a RejectedExecutionException is thrown if the queue is full – terentev May 21 '12 at 12:51
  • Downvoted because it does not address the point in the question. – acorello May 01 '14 at 15:25
  • Downvoted too, it doesn't block but throws a RejectedExecutionException – zeraDev Jun 10 '20 at 16:17
2

You can add another blocking queue with a limited size to control the size of the internal queue in the ExecutorService, something like the semaphore but very easy. Before handing a task to the executor you call put(), and when the task finishes it calls take(). The take() must be inside the task code.
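
A minimal sketch of this token-queue idea, with an invented wrapper class TokenQueueThrottle. One deliberate deviation: the task removes its token with poll() instead of take(), which is equivalent here because a token is always present when the task finishes, and it avoids handling InterruptedException inside the Runnable:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

// Hypothetical wrapper: limits how many tasks are queued or running at once.
class TokenQueueThrottle {
    private final BlockingQueue<Object> tokens;
    private final ExecutorService executor;

    TokenQueueThrottle(ExecutorService executor, int maxInFlight) {
        this.executor = executor;
        this.tokens = new ArrayBlockingQueue<>(maxInFlight);
    }

    // Blocks while maxInFlight tasks are already queued or running.
    void submit(Runnable task) throws InterruptedException {
        tokens.put(Boolean.TRUE);          // waits when the token queue is full
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    tokens.poll();         // frees one slot once the task is done
                }
            });
        } catch (RuntimeException e) {
            tokens.poll();                 // task was not accepted, give the slot back
            throw e;
        }
    }
}
```

Releasing the token in a finally block matters: if a task throws and the token is never removed, the producer eventually blocks forever.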

bilal
1

I know this question is old, but this might be useful for other developers, so I am submitting one possible solution.

You asked for a better encapsulated solution. This can be done by extending ThreadPoolExecutor and overriding the submit method.

The BoundedThreadpoolExecutor is implemented using a Semaphore. With a bounded queue, the Java executor service throws a RejectedExecutionException when the task queue becomes full, whereas an unbounded queue may result in an out-of-memory error. Both can be avoided by controlling the number of tasks submitted to the executor service, either with a semaphore or by implementing a RejectedExecutionHandler.
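
The answer does not include the class itself, so the following is only a sketch of how such an executor might look, not the author's code. It assumes a constructor taking threads and maxPending (names invented here), and it overrides execute() rather than submit(), since submit() delegates to execute() and this way both entry points are throttled:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a semaphore-bounded executor.
class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final Semaphore permits;

    // maxPending bounds the number of tasks that are queued or running at any time.
    BoundedThreadPoolExecutor(int threads, int maxPending) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        this.permits = new Semaphore(maxPending);
    }

    @Override
    public void execute(Runnable task) {
        try {
            permits.acquire();               // blocks the producer when the bound is reached
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for a permit", e);
        }
        try {
            super.execute(task);
        } catch (RuntimeException | Error e) {
            permits.release();               // task never started, so return its permit
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable task, Throwable thrown) {
        super.afterExecute(task, thrown);
        permits.release();                   // one task finished, let the next one in
    }
}
```

Compared with the semaphore in the question, the acquire and release calls live inside the executor, so tasks no longer need a finish callback. Setting maxPending somewhat above the thread count keeps a small backlog ready for the workers.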

PrasadB