
The Sun Java (1.6) ScheduledThreadPoolExecutor, which is an extension of ThreadPoolExecutor, internally uses an implementation of DelayQueue, which is an unbounded queue. What I need is a ScheduledThreadPoolExecutor with a bounded queue, i.e. one with a limit on the tasks accumulating in the queue, so that when the tasks in the queue exceed the limit, it starts rejecting further submitted tasks and prevents the JVM from running out of memory.

Surprisingly, neither Google nor Stack Overflow pointed me to any results discussing this problem. Is there any such thing already available that I am missing? If not, how can I best implement a ScheduledThreadPoolExecutor that provides the functionality I expect?

Gopi
  • Alas, you may have to reimplement `ScheduledThreadPoolExecutor` yourself, as I do not see a way for you to customise your work queue, nor a way to replace it once the executor is created. :-( – C. K. Young Sep 13 '11 at 14:51
  • Thanks Chris for your input! That is exactly what I was trying to avoid :( – Gopi Sep 14 '11 at 04:54
  • You can also try using `Semaphores` to control the number of jobs submitted. Take a look at my updated post for more details. – Sanjay T. Sharma Sep 14 '11 at 06:51

5 Answers


As others have already pointed out, there isn't a ready-made way of doing this. Just make sure you use "composition" instead of "inheritance": create a new class which implements the necessary interface and delegates to the underlying ScheduledThreadPoolExecutor, performing the required checks in the relevant methods.

You can also use the technique specified in this thread with a simple modification: instead of using Semaphore#acquire, use Semaphore#tryAcquire and, depending on the boolean outcome, decide whether to call the rejection handler. Come to think of it, I personally feel it was an oversight on the part of the library authors to directly subclass a specific executor rather than rely on composition to create a "schedulable" wrapper over a normal executor.
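A minimal sketch of that composition-plus-tryAcquire idea; the class and method names here are illustrative, not from any library:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative wrapper: delegates to a ScheduledThreadPoolExecutor and uses
// Semaphore#tryAcquire to cap the number of pending tasks instead of blocking.
public class BoundedScheduler {
    private final ScheduledThreadPoolExecutor delegate;
    private final Semaphore slots;

    public BoundedScheduler(int poolSize, int maxPending) {
        this.delegate = new ScheduledThreadPoolExecutor(poolSize);
        this.slots = new Semaphore(maxPending);
    }

    public ScheduledFuture<?> schedule(final Runnable task, long delay, TimeUnit unit) {
        // Try to claim a slot; reject rather than block when the limit is hit.
        if (!slots.tryAcquire()) {
            throw new RejectedExecutionException("pending-task limit reached");
        }
        return delegate.schedule(new Runnable() {
            public void run() {
                try {
                    task.run();
                } finally {
                    slots.release(); // free the slot once the task has run
                }
            }
        }, delay, unit);
    }

    public void shutdown() {
        delegate.shutdown();
    }
}
```

The wrapper never blocks the submitting thread; a full queue surfaces as a `RejectedExecutionException` at schedule time, which is exactly the behavior the question asks for.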

Sanjay T. Sharma

How about handling it differently, i.e. delaying task submission depending upon the queue size? The executor service exposes the queue via getQueue(). You can invoke size() on it and, depending upon the limit you plan for the queue size, either start rejecting tasks or start delaying task execution (increase the scheduled time, keeping the size of the queue as one of the factors).

All said, this is again not the best solution; just FYI, Java provides the delay queue to support work stealing.
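A rough sketch of that queue-size check, assuming a plain ScheduledThreadPoolExecutor; the helper name and signature are illustrative:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueSizeGuard {
    // getQueue() exposes the executor's internal work queue; checking its size
    // before scheduling is racy under concurrent submitters, but it works as a
    // soft bound to keep the queue from growing without limit.
    public static boolean trySchedule(ScheduledThreadPoolExecutor executor,
                                      int maxQueueSize,
                                      Runnable task, long delay, TimeUnit unit) {
        if (executor.getQueue().size() >= maxQueueSize) {
            return false; // caller decides whether to reject or delay further
        }
        executor.schedule(task, delay, unit);
        return true;
    }
}
```

As the comments below note, the drawback is that a `false` return has to be handled by the caller rather than flowing through the executor's `RejectedExecutionHandler`.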

Scorpion
  • Yes, this is one approach to handle it, but as you yourself pointed out, it's not the best one. The biggest problem is that tasks rejected this way have to be handled separately rather than being seamlessly taken care of by the RejectedExecutionHandler. – Gopi Sep 14 '11 at 05:52
  • You can replicate that sort of behavior. It looks like the Java folks, with all the work stealing, multiple cores, and cheap memory, thought it would be really difficult to crash the JVM (OOME) because of the work queue. How do you handle an OOME situation for the rest of the application? FYI, you can use JMX listeners and observe your memory within the application. – Scorpion Sep 14 '11 at 05:58

ScheduledThreadPoolExecutor does not keep the queue in a field of its own; instead it calls getQueue(), which delegates to super.getQueue(), i.e. the workQueue from ThreadPoolExecutor. You can use reflection to replace that queue like this:

import java.lang.reflect.Field;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
    public BoundedScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, int queueCapacity) {
        super(corePoolSize, handler);
        setMaximumPoolSize(corePoolSize);
        setKeepAliveTime(0, TimeUnit.MILLISECONDS);
        // ScheduledThreadPoolExecutor submits tasks via getQueue().add(...),
        // so overriding add() lets us reject once the capacity is reached.
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity) {
            @Override
            public boolean add(Runnable r) {
                boolean added = offer(r);
                if (added) {
                    return true;
                }
                getRejectedExecutionHandler().rejectedExecution(r, BoundedScheduledThreadPoolExecutor.this);
                return false;
            }
        };

        try {
            // Swap the private workQueue field in ThreadPoolExecutor via reflection.
            Field workQueueField = ThreadPoolExecutor.class.getDeclaredField("workQueue");
            workQueueField.setAccessible(true);
            workQueueField.set(this, queue);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
logcat
  • While this is a nice hack to enforce a bound on the work queue of a `ScheduledThreadPoolExecutor`, it has a certain caveat that is sorta show-stopper for the approach: the `workQueue` field is expected to deliver a certain order for the scheduled tasks. (See the `DelayedWorkQueue` inner class javadoc in `ScheduledThreadPoolExecutor`.) Hence, replacing it blindly with another `BlockingQueue` renders its entire *time-sensitive scheduling* logic meaningless. – Volkan Yazıcı Dec 16 '18 at 11:54

The simplest workaround is to use the scheduled executor only to schedule tasks, not to actually execute them. The scheduler has to explicitly check the executor's queue size and discard the task if the queue is above a threshold.

Another option is to check the ScheduledThreadPoolExecutor queue size right inside the scheduled task. If the queue is above the threshold, just return immediately. In that case the task is executed instantly and removed from the queue, so overflow won't happen.
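The first option can be sketched as a small scheduling front end; the class name, pool sizes, and limits below are illustrative, and here the bound is enforced by giving the worker pool a bounded queue rather than by an explicit size check:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: the scheduler only decides *when* a task runs; the
// actual work goes to a ThreadPoolExecutor with a bounded queue, so a full
// queue triggers normal rejection instead of unbounded growth.
public class SchedulingFrontEnd {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final ThreadPoolExecutor workers;

    public SchedulingFrontEnd(int poolSize, int queueCapacity) {
        workers = new ThreadPoolExecutor(poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity), // bounded queue
                new ThreadPoolExecutor.AbortPolicy());            // reject on overflow
    }

    public void schedule(final Runnable task, long delay, TimeUnit unit) {
        scheduler.schedule(new Runnable() {
            public void run() {
                try {
                    workers.execute(task); // may reject if the bounded queue is full
                } catch (RejectedExecutionException e) {
                    // task discarded; log or count it here
                }
            }
        }, delay, unit);
    }

    public void shutdown() {
        scheduler.shutdown();
        workers.shutdown();
    }
}
```

The scheduler's own DelayQueue still holds one entry per scheduled trigger, but the memory-hungry backlog of runnable work is capped by the worker pool's bounded queue.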


If you really, really don't want to re-implement ScheduledThreadPoolExecutor then you could extend it and override all the schedule* methods and implement your own bounding of tasks. It would be fairly nasty though:

private final Object scheduleMonitor = new Object();

@Override
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit)
{
    if (command == null || unit == null)
        throw new NullPointerException();

    synchronized (scheduleMonitor)
    {
        while (getQueue().size() >= MAX_QUEUE_SIZE)
        {
            try
            {
                scheduleMonitor.wait();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
            }
        }
        return super.schedule(command, delay, unit);
    }
}

// Note: getTask() is actually private in ThreadPoolExecutor, so this part is
// illustrative only; it cannot really be overridden.
@Override
Runnable getTask()
{
    final Runnable r = super.getTask();
    synchronized (scheduleMonitor)
    {
        scheduleMonitor.notify();
    }
    return r;
}

And repeat for:

  • public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

Note, this won't stop repeating tasks from pushing the queue over the limit; it will only block newly scheduled tasks.

Another caveat is that I haven't checked for any deadlock issues by calling super.schedule while holding the lock on scheduleMonitor...

SimonC
  • Thanks for your input, but as you pointed out, this will just hold the task back from being queued. That does not solve the problem of preventing OOM. – Gopi Sep 14 '11 at 05:55
  • Well, using a bounded queue wouldn't prevent your OOM either as it would block while full as well. If you want to avoid even scheduling a task if the queue is full couldn't you just use the standard `ScheduledThreadPoolExecutor` and check getQueue().size() before calling schedule? – SimonC Sep 14 '11 at 06:00
  • Actually, I take that back, I hadn't realised that `ThreadPoolExecutor` calls `offer` and rejects the task if the work queue is full. – SimonC Sep 14 '11 at 06:03