43

Why, oh why doesn't java.util.concurrent provide a queue length indicators for its ExecutorServices? Recently I found myself doing something like this:

ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...

public void addTaskToQueue(Runnable runnable) {
    if (queueLength.get() < MAX_QUEUE_LENGTH) {
        queueLength.incrementAndGet(); // Increment queue when submitting task.
        queue.submit(new Runnable() {
            public void run() {
                runnable.run();
                queueLength.decrementAndGet(); // Decrement queue when task done.
            }
        });
    } else {
        // Trigger error: too long queue
    }
}

Which works ok, but... I think this really should be implemented as a part of the ExecutorService. It's dumb and error prone to carry around a counter separated from the actual queue, whose length the counter is supposed to indicate (reminds me of C arrays). But, ExecutorServices are obtained via static factory methods, so there's no way to simply extend the otherwise excellent single thread executor and add a queue counter. So what should I do:

  1. Reinvent stuff already implemented in JDK?
  2. Other clever solution?
Joonas Pulakka
  • 36,252
  • 29
  • 106
  • 169

2 Answers2

69

There is a more direct way:

ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
// add jobs
// ...
int size = executor.getQueue().size();

This is directly copied from Executors.newSingleThreadExecutor in JDK 1.6. The LinkedBlockingQueue that is passed to the constructor is actually the very object that you will get back from getQueue.

Matthew
  • 10,361
  • 5
  • 42
  • 54
x4u
  • 13,877
  • 6
  • 48
  • 58
  • Ha! So you just need to re-implement very tiny part (one line) of the JDK. Quite acceptable :-) Thanks for this. – Joonas Pulakka Feb 15 '10 at 14:20
  • 2
    Actually I'd go with the cast myself because ThreadPoolExecutor is a public class and thus in some way also part of the public API of newSingleThreadExecutor. I think it is less likely that the object created by newSingleThreadExecutor will not be a ThreadPoolExecutor anymore than that the arguments of the constructor invocation in newSingleThreadExecutor might change some day, i.e. to use some more efficent or more precise implementation or whatever. – x4u Feb 15 '10 at 14:29
  • 15
    Ironically, on one machine I tried this I got an `FinalizableDelegatedExecutorService` instead of an `ThreadPoolExecutor`. – Simon Forsberg Dec 10 '12 at 14:16
  • 3
    Looking at [the source](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/Executors.java#Executors.newSingleThreadExecutor%28%29), I don't see how you can cast an `ExecutorService` directly to a `ThreadPoolExecutor`. Also I keep getting a `ClassCastException`, like always. – dbm Dec 21 '15 at 09:42
4

While you can check the queue size directly. Another way of dealing with a queue that's getting too long is making the internal queue bounded.

public static
ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                              5000L, TimeUnit.MILLISECONDS,
                              new ArrayBlockingQueue<Runnable>(queueSize, true));
}

This will cause RejectedExecutionExceptions (see this) when you exceed the limit.

If you want to avoid the exception the calling thread can be hijacked to execute the function. See this SO question for a solution.

References

Community
  • 1
  • 1
Alexander Oh
  • 24,223
  • 14
  • 73
  • 76