1

For my use case, I need an executor which can execute tasks based on priority. Simple way to achieve this is by using a thread pool with PriorityBlockingQueue and override newTaskFor() to return custom future tasks that are comparable based on priority of the task.

//Define priorities
public enum Priority {
    HIGH, MEDIUM, LOW, VERYLOW;
}

A priority task

//A Callable tasks that has priority. Concrete implementation will implement 
//call() to do actual work and getPriority() to return priority
public abstract class PriorityTask<V> implements Callable<V> {
    public abstract Priority getPriority ();
}

Actual executor implementation

public class PriorityTaskThreadPoolExecutor <V> {
    int _poolSize;
    private PriorityBlockingQueue<Runnable> _poolQueue = 
                                       new PriorityBlockingQueue<Runnable>(500); 
    private ThreadPoolExecutor _pool;

    public PriorityTaskThreadPoolExecutor (int poolSize) {
        _poolSize = poolSize;

        _pool = new ThreadPoolExecutor(_poolSize, _poolSize, 5, TimeUnit.MINUTES, 
                                      _poolQueue) {
                        //Override newTaskFor() to return wrap PriorityTask 
                        //with a PriorityFutureTaskWrapper.
                        @Override
                        protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
                            return new PriorityFutureTaskWrapper<V>((PriorityTask<V>) c);
                        }
                };

        _pool.allowCoreThreadTimeOut(true);
    }

    public Future<V> submit (PriorityTask<V> task) {
        return _pool.submit(task);
    }

}

//A future task that wraps around the priority task to be used in the queue
class PriorityFutureTaskWrapper<V> extends FutureTask<V> 
                             implements Comparable <PriorityFutureTaskWrapper<V>> {
    PriorityTask<V> _priorityTask;

    public PriorityFutureTaskWrapper (PriorityTask<V> priorityTask) {
        super(priorityTask);
        _priorityTask = priorityTask;
    }

    public PriorityTask<V> getPriorityTask () {
        return _priorityTask;
    }

    @Override
    public int compareTo(PriorityFutureTaskWrapper<V> o) {
        return _priorityTask.getPriority().ordinal() - 
               o.getPriorityTask().getPriority().ordinal();
    }
}

Problem with this is, in my usecase, there is a potential that low priority tasks may starve forever. I want to avoid this. I could not find a clean way to do this using the executors/pools available in java. So I am thinking of writing my own executor. I have two different approaches.

1) A custom thread pool with PriorityBlockingQueue. There will be a separate thread, checking the tasks age in the queue. Older tasks will be removed and re-added with escalated priority.

2) My use case will have only a limited number of priorities say 1-4. I will have 4 different queues for each priority. Now the threads in the custom pool, instead of blocking on the queues, will scan the queues in the following order when it has to pick up the next task.

40% threads - Q1, Q2, Q3, Q4

30% threads - Q2, Q1, Q3, Q4

20% threads - Q3, Q1, Q2, Q4

10% threads - Q4, Q1, Q2, Q3

Scanning will be done by a thread when it is notified about a new addition in the queue or when the current task executed by that thread is completed. Other times, threads will be waiting. But still, scanning will be little more inefficient compared to blocking on the queue.

Apprach 2 is more suitable for my usecase.

Has any one tried any of these approaches or a different approach for similar usecase? Any thought/suggestions?

Rajesh Jose
  • 314
  • 2
  • 12
  • 1
    Couldn't you just alter your existing implementation of `PriorityTask` to add escalation? OR: In your "compareTo" add a means to take into account the age of the task. So your "effective" Priority is not only the value of priority field but also the age of the task. – Fildor Aug 15 '16 at 10:38
  • To escalate, PriorityTask has to get CPU cycles...So one of the thread or a separate thread has to check and escalate it - Periodically or basd on some trigger. Also remove it from queue and re-add it after the escalation so that it is inserted at a position based on new priority. That is the 1st approach I mentioned in my post. But I am not very convinced about that approach... – Rajesh Jose Aug 15 '16 at 10:51
  • Another method I have once used was that we implemented a "planned drain". That is: Once a day when the probability of low load was high, we took the whole Queue and drained it to another queue which was handled on a separate SingleThreadPoolExecutor. – Fildor Aug 15 '16 at 10:51
  • Side note on code quality: what is the point of "_" on all your variable names? And: dont put more information into comments; please update your question instead. – GhostCat Aug 15 '16 at 10:54
  • @Rajesh Ah, right. It would have to be reinserted to change Prio ... did not think of that. – Fildor Aug 15 '16 at 10:54
  • @GhostCat I just have that habit of using "_" that for member variables to differentiate that from the ones passed as args or local with similar names. Ignore that. I copy pasted code from my source code, so it came as is. – Rajesh Jose Aug 15 '16 at 10:59
  • @Fildor This is what I though of doing with approach-1. Either one thread periodically iterate the queue and escalate for old tasks. Or PriorityTask with low priorities can register with a timer for call back after elapsing a specific time say callback-time=current-time+timeout-interval. Timer has one sorted array sorted based on "callback-time". One timer thread pickup up first one in the list and do object.wait(callback-time - current-time). Once that comes out of wait, it will do a call back on that PriorityTask and task itself can escalate, remove itself from queue and re-add.... – Rajesh Jose Aug 15 '16 at 11:10
  • @Fildor And every time a new registration for call back happens thread will be notified so it again go back and wait for first one's time out. Some how did not like either of this. Let me see what feed backs i get. Worst case I will go with approach-2 mentioned in the post...Lot of changes for that, but that suites my use case better. – Rajesh Jose Aug 15 '16 at 11:12
  • @Fildor, the time-sensitive comparator sounds like a smart solution to the problem, but I would not trust any 3rd-party sorted container to do the right thing if the total order can change while the container is not empty. I'm saying, I would write my own priority queue if I decided to go that route. – Solomon Slow Aug 15 '16 at 13:06
  • What do you really mean when you say, "priority"? (you can't eliminate the risk of starvation with _true_ priorities.) Why do you think you need four distinct priority levels? What is the real problem that you think you can solve by prioritizing the tasks? – Solomon Slow Aug 15 '16 at 13:14
  • @jameslarge Yes, you are right. I also totally forgot, that entries won't be rearranged ... so a custom implementation would most probably be necessary. In that case, I'd probably look for another solution, first. As I said: In one instance it was enough to drain the queue once a day. That was easy, readable, predictable and very stable. – Fildor Aug 15 '16 at 13:50

1 Answers1

1

There is no easy way to change priority of an already inserted element in the PriorityQueue , It is already discussed here

Your second option should be simple to implement , like process more tasks from high priority queue than from low priority queue

You can also consider having different ThreadPools for each priority and number of threads in each pool depends on priority of the tasks.

Community
  • 1
  • 1
ravthiru
  • 8,878
  • 2
  • 43
  • 52