5

I implemented my ThreadPoolExecutor with a PriorityBlockingQueue, as in this example: https://stackoverflow.com/a/12722648/2206775

and wrote a test:

PriorityExecutor executorService = (PriorityExecutor)  PriorityExecutor.newFixedThreadPool(16);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 1);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 3);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 2);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("5");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 5);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("4");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 4);

    executorService.shutdown();
    try {
        executorService.awaitTermination(30, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

But in the end I don't get 1 2 3 4 5; I get those numbers in a random order. Is there a problem with the test, or with something else? If it is the test, how can this be tested correctly?

Dmitry Sobetsky

2 Answers

12

The priority is only taken into account if the pool is fully busy and you submit several new tasks. If you define your pool with only one thread, you should get the expected output. In your example, all tasks get executed concurrently and which one finishes first is somewhat random.

By the way, the linked implementation has a problem: it throws an exception when all the pool's threads are busy and you submit new tasks, which then have to go into the queue.
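A minimal sketch of that failure mode, assuming the task is comparable but is passed to the inherited `submit`, which wraps it in a plain `FutureTask` before queuing. `SubmitWrappingDemo`, `PrioritizedRunnable` and `secondSubmitFails` are hypothetical names used only for illustration, not part of the linked answer.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitWrappingDemo {

    // Hypothetical task type: a Runnable that can compare itself by priority.
    static final class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
        final int priority;

        PrioritizedRunnable(int priority) {
            this.priority = priority;
        }

        @Override
        public void run() {
            // The body is irrelevant for this demonstration.
        }

        @Override
        public int compareTo(PrioritizedRunnable other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Returns true if queuing the second task throws ClassCastException.
    static boolean secondSubmitFails() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        try {
            // No worker exists yet, so this task is handed straight to a new
            // worker thread and never touches the queue.
            pool.submit(new PrioritizedRunnable(1));
            // The core pool is now at its limit, so this task is offered to the
            // queue. submit() has wrapped it in a FutureTask, which is not
            // Comparable, and the priority queue rejects it.
            pool.submit(new PrioritizedRunnable(2));
            return false;
        } catch (ClassCastException e) {
            return true;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("second submit failed: " + secondSubmitFails());
    }
}
```

The same wrapping happens if the comparable object is itself a `FutureTask` subclass, which is why the example further down overrides both `newTaskFor` methods.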

Below is a working example of what you are trying to achieve (I have overridden newTaskFor in a simplistic way, just to make it work; you might want to improve that part).

It prints: 1 2 3 4 5.

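import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {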

    public static void main(String[] args) {
        PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(1);
        executorService.submit(getRunnable("1"), 1);
        executorService.submit(getRunnable("3"), 3);
        executorService.submit(getRunnable("2"), 2);
        executorService.submit(getRunnable("5"), 5);
        executorService.submit(getRunnable("4"), 4);

        executorService.shutdown();
        try {
            executorService.awaitTermination(30, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static Runnable getRunnable(final String id) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(id);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
    }

    static class PriorityExecutor extends ThreadPoolExecutor {

        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                                long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        // Utility method to create the thread pool easily
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new PriorityExecutor(nThreads, nThreads, 0L,
                                        TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        }

        // Submit the task wrapped in a comparable future
        public Future<?> submit(Runnable task, int priority) {
            return super.submit(new ComparableFutureTask<Object>(task, null, priority));
        }

        // Execute the task wrapped in a comparable future
        public void execute(Runnable command, int priority) {
            super.execute(new ComparableFutureTask<Object>(command, null, priority));
        }

        // Tasks passed in by submit(task, priority) are already ComparableFutureTasks,
        // so hand them back unwrapped. A plain Runnable or Callable submitted through
        // the inherited submit(...) methods would fail these casts.
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return (RunnableFuture<T>) callable;
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return (RunnableFuture<T>) runnable;
        }
    }

    static class ComparableFutureTask<T> extends FutureTask<T> implements Comparable<ComparableFutureTask<T>> {

        volatile int priority = 0;

        public ComparableFutureTask(Runnable runnable, T result, int priority) {
            super(runnable, result);
            this.priority = priority;
        }

        public ComparableFutureTask(Callable<T> callable, int priority) {
            super(callable);
            this.priority = priority;
        }

        @Override
        public int compareTo(ComparableFutureTask<T> o) {
            return Integer.compare(priority, o.priority);
        }
    }
}
assylias
  • Can you please explain why the linked implementation throws this exception if we don't override both 'newTaskFor' methods? – Dmitry Sobetsky May 31 '13 at 07:51
  • Because the default implementation wraps your ComparableFutureTask in a RunnableFuture, and when your PriorityQueue tries to cast the RunnableFuture back to the ComparableFutureTask, it gets an exception. The JDK comes with the source code, so you can debug this step by step in your IDE to see it happen live. – assylias May 31 '13 at 09:41
  • I spent two hours analyzing this today... If you are 100% sure you will not call `ThreadPoolExecutor#submit` (only `ThreadPoolExecutor#execute`), you can actually skip the whole `PriorityExecutor`. `ThreadPoolExecutor#execute` will not wrap the `Runnable` parameter in anything, so it stays comparable in the `PriorityBlockingQueue`. Depending on the scope in which your `ThreadPoolExecutor` will be used, this could be a viable solution. I'd recommend adding a comment about this, though... Last, but not least, the `AbstractExecutorService` is terribly designed for this! – Ztyx Apr 25 '15 at 17:13
  • I tried this code. It always runs the first task without considering the priority. For example, if you submit the task with priority 4 first, it will always run that task at the beginning. Also, if you increase the thread pool size to 2, even though there are enough threads waiting, it will run them in insertion order. – Arash Nov 24 '15 at 05:17
  • @Arash That is expected: when the first task is submitted, the task queue is empty and that task is executed immediately because it is the only task known to the pool. – assylias Nov 24 '15 at 09:43
  • If you use the same priority, the initial order of submissions is not taken into account! – Daniel Hári Mar 15 '17 at 11:15
  • I made an improvement to preserve task order for same priorities: http://stackoverflow.com/a/42831172/1386911 – Daniel Hári Mar 16 '17 at 10:23
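
Two of the comments above can be combined into one rough sketch: tasks that are themselves `Comparable` and are only ever passed to `execute`, with a submission counter as a tie-breaker so that equal priorities keep their submission order. This is not the code from the linked improvement; `FifoPriorityTasks`, `PriorityTask`, `newPool` and `drainOrder` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class FifoPriorityTasks {

    // Global submission counter used to break ties between equal priorities.
    private static final AtomicLong SEQUENCE = new AtomicLong();

    static final class PriorityTask implements Runnable, Comparable<PriorityTask> {
        final String name;
        final int priority;
        final long sequence = SEQUENCE.getAndIncrement();
        final Runnable body;

        PriorityTask(String name, int priority, Runnable body) {
            this.name = name;
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PriorityTask other) {
            // Lower priority value first; earlier submission first on a tie.
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    // A plain ThreadPoolExecutor is enough as long as only execute() is used.
    static ThreadPoolExecutor newPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
    }

    // Shows the order in which the queue would hand tasks to a worker.
    static List<String> drainOrder() {
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        Runnable noop = () -> { };
        queue.add(new PriorityTask("a", 2, noop));
        queue.add(new PriorityTask("b", 1, noop));
        queue.add(new PriorityTask("c", 2, noop));
        queue.add(new PriorityTask("d", 1, noop));

        List<String> order = new ArrayList<>();
        Runnable next;
        while ((next = queue.poll()) != null) {
            order.add(((PriorityTask) next).name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // [b, d, a, c]
    }
}
```

With `newPool(n).execute(new PriorityTask(...))` the task reaches the queue unwrapped. Passing the same task to `submit` would wrap it again and lose the ordering, which is the caveat raised in the comment about `execute`.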
1

You have 16 threads and only 5 tasks, meaning all of them are being executed concurrently and the priority is actually irrelevant.

The priority only matters when there are tasks waiting to be executed.

To show this, if you set your example to only use 1 thread, you will get your expected output.
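
As for testing this correctly, one possible approach is to keep the single worker thread busy until every task is queued, and only then let the queue drain. The sketch below does that with a latch instead of sleeps. It uses a plain `ThreadPoolExecutor` with `execute` and a hypothetical `RecordingTask` class rather than the `PriorityExecutor` from the question, so treat it as an illustration of the idea, not a drop-in test.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityOrderCheck {

    // Hypothetical helper: records its own priority when it runs.
    static final class RecordingTask implements Runnable, Comparable<RecordingTask> {
        final int priority;
        final List<Integer> log;

        RecordingTask(int priority, List<Integer> log) {
            this.priority = priority;
            this.log = log;
        }

        @Override
        public void run() {
            log.add(priority);
        }

        @Override
        public int compareTo(RecordingTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Submits the given priorities in order and returns the order they ran in.
    static List<Integer> runOrder(int... priorities) {
        List<Integer> log = Collections.synchronizedList(new ArrayList<Integer>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        CountDownLatch gate = new CountDownLatch(1);

        // The first task goes straight to the only worker (it is never queued,
        // so it does not need to be Comparable) and blocks it until released.
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // The worker is busy, so all of these wait in the priority queue.
        for (int p : priorities) {
            pool.execute(new RecordingTask(p, log));
        }

        gate.countDown(); // release the worker; the queue now drains by priority
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runOrder(1, 3, 2, 5, 4)); // [1, 2, 3, 4, 5]
    }
}
```

Because nothing runs until all tasks are queued, the result does not depend on timing, and the submission order can be shuffled freely.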

Supericy