5

First of all: I already read the following two questions and their possible solutions:

The dilemma I'm having is that I want to use a custom BlockingQueue or rather a different but specific queue, namely PriorityBlockingQueue with a custom Comparator which sorts the queue by priority.

The ThreadPoolExecutor does support custom queues in its constructors, but it does not implement the methods from the ScheduledExecutorService interface. So I went and found the subclass ScheduledThreadPoolExecutor, but it does not support custom queues and uses a DelayedWorkQueue instead.

Problems:

  • I cannot extend from ScheduledThreadPoolExecutor because creating constructors for my own class won't do anything since the constructors of the ScheduledThreadPoolExecutor don't accept custom queues as a parameter.
  • I cannot copy the contents of the ThreadPoolExecutor class and the implementations of the ScheduledThreadPoolExecutor because it uses a lot of methods which are declared with no modifiers (e. g. canRunInCurrentState(boolean periodic) and all method being invoked by this call) which does not allow me to access the method since even though its a subclass of ThreadPoolExecutor, it is not in the same package.

My current implementation looks like this:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.croemheld.tasks.PriorityTaskComparator;

public class ScheduledPriorityThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {

    private static final int INITIAL_QUEUE_SIZE = 10;

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()));
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), handler);
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory);
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory, handler);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

}

As you can see, the constructors problem is solved, but it still leaves the implementations of the scheduling methods from ScheduledExecutorService.

So I'm asking you, is there any way to maybe pass a Comparator to the queue or a simple and not too exhaustive way to create an own executor class which implements the methods from ScheduledExecutorService and offers the methods from the ThreadPoolExecutor class as well as uses a PriorityBlockingQueue?

xingbin
  • 27,410
  • 9
  • 53
  • 103
CRoemheld
  • 889
  • 7
  • 26
  • I'm not sure I understand what you're after. You're focused on the methods that `ScheduledExecutorService` adds, but it's not clear to me how you expect those to interact with a queue or `Comparator`. Perhaps it would help if you described how you hope to *use* the thing you're looking for. – John Bollinger Jan 14 '18 at 00:36
  • @JohnBollinger long story short: I want a class which provides all methods from `ScheduledThreadPoolExecutor` but instead of using the internal `DelayedWorkQueue` I need a queue which sorts the tasks inside the queue by priority. Since the `Comparator` used in the `DelayedWorkQueue` sorts `ScheduledFutureTask` objects by their time and/or sequence number (see `ScheduledFutureTask#compareTo(Delayed other)`). I have a custom task class and it works with a `ThreadPoolExecutor`, but when e. g. I want to run a task periodically, I need the `ScheduledExecutorService`s methods. – CRoemheld Jan 14 '18 at 00:58
  • Aside from the impl details, what's your goal here? Implement your own task priority? Can you not achieve that using separate executors? – Abhijit Sarkar Jan 14 '18 at 01:52
  • @AbhijitSarkar No the tasks are already implemented. All of the tasks I put into the `ThreadPoolExecutor` are extending `FutureTask` and have an additional field called `priority`. I also have a `PriorityTaskComparator` which extends `Comparator` just as in the first link (https://stackoverflow.com/a/16577568/3741284). The only problem is that the answer uses a `ThreadPoolExecutor` which does not implement functionality for periodic task scheduling. Hence this question about either implementing `ScheduledExecutorService` or whatever would help me to achieve my goal. – CRoemheld Jan 14 '18 at 02:15
  • @AbhijitSarkar Part 2 of the comment above: Could I achieve this by using two executors? I don't want to initialize multiple instances of `ExecutorService ` or `ThreadPoolExecutor` just because I can't initialize an executor which provides support for scheduling and executing tasks immediately as well as executing a task periodically. `ThreadPoolExecutor` supports immediately executing and a queue which sorts by priority, but not periodically executing. `ScheduledThreadPoolExecutor` supports executing periodically but not sorting the queue by priority. – CRoemheld Jan 14 '18 at 02:22
  • So if I understand you, you're trying to jam these two very different approaches to task scheduling together in one class simply because you don't want to maintain different `ExecutorService` instances to serve these different scenarios. I don't find that reasoning very persuasive, and I really don't think you've thought this through from a *functional* perspective. The Java standard library does not provide a pre-built way to do this, nor pieces that you can easily put together to make it happen, because it does not make sense. – John Bollinger Jan 14 '18 at 03:32

3 Answers3

2

If I understood your question, you want to execute some tasks periodically, but according to some custom priority. Short of inventing your own ExecutorService, I suggest taking a step back and looking at your design. You may want to separate scheduling from prioritization and task execution:

  1. Since ThreadPoolExecutor accepts a custom BlockingQueue, you can easily implement your own prioritization. Then simply periodically submit the tasks from elsewhere in the code.
  2. If you insist on using ScheduledThreadPoolExecutor, then you get scheduling, but you'll have to implement prioritization yourself. You can get very creative with that, but one option might be to have an orchestration task that picks up tasks from a custom BlockingQueue and submits to the pool.
Abhijit Sarkar
  • 21,927
  • 20
  • 110
  • 219
  • Your first point sounds like a good idea, but how can I be notified when a task inside the `ThreadPoolExecutor` is finished? (Other than using a `CompletableFuture` with `thenAccept`). Your second point I don't quite get: What exactly would this task do? As this would be an orchestration task, did you mean I should let run this task repeatedly and `poll` the other tasks from a custom queue to insert them into the `ScheduledThreadPoolExecutor`? – CRoemheld Jan 14 '18 at 10:06
  • @CRoemheld there was nothing in your question about notification, so that’s new. How do you wanna be notified and why? Your understanding about #2 is correct. – Abhijit Sarkar Jan 14 '18 at 10:14
  • I thought you would use a kind of notification to know when to re-insert a finished task. Or how would you periodically insert a task? Note that the `ScheduledThreadPoolExecutor` also supports delayed periodic tasks, such as `scheduleAtFixedRate` and `scheduleWithFixedRate` (from the `ScheduledExecutorService` interface). – CRoemheld Jan 14 '18 at 10:18
  • @CRoemheld You’d submit a new task that runs the same `Runnable/Callable`. There’s nothing in `ScheduledExecutorService` that waits for previous jobs to finish. It simply helps you run something periodically. Any kind of coordination would have to be done by you. – Abhijit Sarkar Jan 14 '18 at 10:40
1

I wrote a simple, clean, working solution for ThreadPoolExecutor with PriorityBlockingQueue.

public class PriorityQueueThreadPoolExecutor {
    private static final int DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY = 100;
    private static final long THREAD_TIMEOUT_IN_SECS = 60L;
    public static final int DEFAULT_PRIORITY = 0;

    private static final AtomicInteger InstanceCounter = new AtomicInteger(0);

    private final ThreadPoolExecutor internalExecutor;

    public PriorityQueueThreadPoolExecutor(int threadPoolSize, String threadNamePrefix) {
        internalExecutor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, THREAD_TIMEOUT_IN_SECS,
                TimeUnit.SECONDS, createPriorityQueue(), createThreadFactory(threadNamePrefix));
        internalExecutor.allowCoreThreadTimeOut(true);
    }

    public void submit(Runnable runnable, int priority) {
        internalExecutor.execute(new RunnableWithPriority(runnable, priority));
    }

    public void submit(Runnable runnable) {
        submit(runnable, DEFAULT_PRIORITY);
    }

    public ThreadPoolExecutor getInternalThreadPoolExecutor() {
        return internalExecutor;
    }

    private static BlockingQueue<Runnable> createPriorityQueue() {
        return new PriorityBlockingQueue<>(DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY,
                new ComparatorForPriorityRunnable());
    }

    private static ThreadFactory createThreadFactory(String threadNamePrefix) {
        return new ThreadFactoryBuilder().setThreadFactory(Executors.defaultThreadFactory())
                .setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build();
    }

    private static class RunnableWithPriority implements Runnable {
        final int creationOrder;
        final int priority;
        final Runnable runnable;

        public RunnableWithPriority(Runnable runnable, int priority) {
            this.runnable = runnable;
            this.priority = priority;
            this.creationOrder = InstanceCounter.incrementAndGet();
        }

        @Override
        public void run() {
            runnable.run();
        }
    }

    private static class ComparatorForPriorityRunnable implements Comparator<Runnable> {
        @Override
        public int compare(Runnable r1, Runnable r2) {
            RunnableWithPriority pr1 = (RunnableWithPriority) r1;
            RunnableWithPriority pr2 = (RunnableWithPriority) r2;
            // higher value means higher priority
            int priorityResult = pr2.priority - pr1.priority;
            return priorityResult != 0 ? priorityResult : (pr1.creationOrder - pr2.creationOrder);
        }
    }
}
SUBHAS ROY
  • 11
  • 1
0

I looked for other possible solutions and I came to the following result:

Since a ThreadPoolExecutor manages a pool of multiple Threads (i. e., if you set two or more threads in the Executors.newFixedThreadPool(int nThreads) method), and if you really want to mix a priority based BlockingQueue with it, then I would suggest do the following:

  • Create an own ThreadPoolExecutor class similar to the one above, using a PriorityBlockingQueue with a custom comparator.
  • Create your own Task class (or FutureTask extension, whatever you think is the best for you)
  • These task classes are for one-shot tasks, meaning they just run once.

For loop-tasks which should run in the background periodically, I came up with a simple class for this purpose:

public abstract class AbstractThread extends Thread {

    protected Runnable runnable;

    protected AbstractThread(String name, Runnable runnable) {
        super(runnable, name);

        this.runnable = runnable;
    }

    /**
     * This method provides a way to perform some action before the thread is actually starting.
     */
    protected abstract void beforeExecution();

    /**
     * This method provides a way to perform some action after the thread finished.
     */
    protected abstract void afterExecution();

    @Override
    public void run() {
        try {
            doRun();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Run the given runnable here.
     * 
     * @throws InterruptedException 
     */
    protected abstract void doRun() throws InterruptedException;

}

While simple one-shot threads just run the runnable once:

@Override
protected void doRun() {
    beforeExecution();

    runnable.run();

    afterExecution();
}

Periodic tasks in a thread would just do something like:

@Override
protected void doRun() throws InterruptedException {
    beforeExecution();

    while(!isInterrupted()) {
        runnable.run();
        Thread.sleep(millis);
    }

    afterExecution();
}

If you want to support periodic tasks which run once in a while, you could either pass a delay parameter to the Thread instance or you just write something like Thread.sleep(delay) in your runnable.

This is no actual code, just a suggestion as I'm trying to work with this now.

CRoemheld
  • 889
  • 7
  • 26