I developed a piece of code which is multi-threaded. This code is called within a web application, so potentially by several Threads (requests) in parallel. To keep under control the number of Threads this code is going to create (by being called by several parallel requests), I use a static shared ThreadPoolExecutor (Executors.newFixedThreadPool(nbOfThreads)
). So I am sure that this code is never going to create more than nbOfThreads
threads. To follow the tasks involved in a given request and wait that they are finished, I use a CompletionService for each request.
Now, I would like to have a bit of “fairness” (not sure it’s the good word) in the way threads of the pool are given to requests.
With the default fixed ThreadPoolExecutor, the waiting queue is a LinkedBlockingQueue
. It gives tasks to the Executor according to their arriving order (FIFO). Imagine that the pool core size is 100 threads. A 1st request is big and involves the creation of 150 tasks. So it is going to make the pool full and put 50 tasks in the waiting queue. If a 2nd tiny request arrives 1 second later, even if it needs only 2 threads from the pool, it will have to wait that all the 150 tasks created by the first big request are finished before being processed.
How to make the pool fairly and evenly giving threads to each request ? How to make the 2 tasks of the 2nd request not waiting after all the 50 waiting tasks of the 1st query ?
My idea was to develop a personal implementation of BlockingQueue to give to the ThreadPoolExecutor. This BlockingQueue would store the waiting tasks classified by the requests which created them (in a backing Map with the id of the request in key and a LinkedBlockingQueue storing the tasks of the request in value). Then when the ThreadPoolExecutor take
or poll
a new task from the queue, the queue will give a task from a different request each time … Is it the correct approach ? The use-case seems quite common to me. I am surprised that I have to implement such custom and tedious stuff myself. That’s why I am thinking that I may be wrong and there exists a well-know best practice to do that.
Here is the code I did. It works but still wondering if this is the right approach.
public class TestThreadPoolExecutorWithTurningQueue {
private final static Logger logger = LogManager.getLogger();
private static ThreadPoolExecutor executorService;
int nbRequest = 4;
int nbThreadPerRequest = 8;
int threadPoolSize = 5;
private void init() {
executorService = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0L, TimeUnit.MILLISECONDS,
new CategoryBlockingQueue<Runnable>()// my custom blocking queue storing waiting tasks per request
//new LinkedBlockingQueue<Runnable>()
);
}
@Test
public void test() throws Exception {
init();
// Parallel requests arriving
ExecutorService tomcat = Executors.newFixedThreadPool(nbRequest);
for (int i = 0; i < nbRequest; i++) {
Thread.sleep(10);
final int finalI = i;
tomcat.execute(new Runnable() {
@Override
public void run() {
request(finalI);
}
});
}
tomcat.shutdown();
tomcat.awaitTermination(1, TimeUnit.DAYS);
}
// Code executed by each request
// Code multi-threaded using a single shared ThreadPoolExecutor to keep the
// number of threads under control
public void request(final int requestId) {
final List<Future<Object>> futures = new ArrayList<>();
CustomCompletionService<Object> completionService = new CustomCompletionService<>(executorService);
for (int j = 0; j < nbThreadPerRequest; j++) {
final int finalJ = j;
futures.add(completionService.submit(new CategoryRunnable(requestId) {
@Override
public void run() {
logger.debug("thread " + finalJ + " of request " + requestId);
try {
// here should come the useful things to be done
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, null));
}
// Wait fot completion of all the tasks of the request
// If a task threw an exception, cancel the other tasks of the request
for (int j = 0; j < nbThreadPerRequest; j++) {
try {
completionService.take().get();
} catch (Exception e) {
// Cancel the remaining tasks
for (Future<Object> future : futures) {
future.cancel(true);
}
// Get the underlying exception
Exception toThrow = e;
if (e instanceof ExecutionException) {
ExecutionException ex = (ExecutionException) e;
toThrow = (Exception) ex.getCause();
}
throw new RuntimeException(toThrow);
}
}
}
public class CustomCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final BlockingQueue<Future<V>> completionQueue;
public CustomCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
return new FutureTask<V>(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
return new FutureTask<V>(task, result);
}
public Future<V> submit(CategoryCallable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new CategorizedQueueingFuture(f, task.getCategory()));
return f;
}
public Future<V> submit(CategoryRunnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new CategorizedQueueingFuture(f, task.getCategory()));
return f;
}
public Future<V> submit(CategoryRunnable task) {
return submit(task, null);
}
@Override
public Future<V> submit(Callable<V> task) {
throw new IllegalArgumentException("Must use a 'CategoryCallable'");
}
@Override
public Future<V> submit(Runnable task, V result) {
throw new IllegalArgumentException("Must use a 'CategoryRunnable'");
}
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
/**
* FutureTask extension to enqueue upon completion + Category
*/
public class CategorizedQueueingFuture extends FutureTask<Void> {
private final Future<V> task;
private int category;
CategorizedQueueingFuture(RunnableFuture<V> task, int category) {
super(task, null);
this.task = task;
this.category = category;
}
protected void done() {
completionQueue.add(task);
}
public int getCategory() {
return category;
}
}
}
public abstract class CategoryRunnable implements Runnable {
private int category;
public CategoryRunnable(int category) {
this.category = category;
}
public int getCategory() {
return category;
}
}
public abstract class CategoryCallable<V> implements Callable<V> {
private int category;
public CategoryCallable(int category) {
this.category = category;
}
public int getCategory() {
return category;
}
}
public class CategoryBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
private Map<Integer, LinkedBlockingQueue<E>> map = new HashMap<>();
private AtomicInteger count = new AtomicInteger(0);
private ReentrantLock lock = new ReentrantLock();
private LinkedBlockingQueue<Integer> nextCategories = new LinkedBlockingQueue<>();
@Override
public boolean offer(E e) {
CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) e;
lock.lock();
try {
int category = item.getCategory();
if (!map.containsKey(category)) {
map.put(category, new LinkedBlockingQueue<E>());
nextCategories.offer(category);
}
boolean b = map.get(category).offer(e);
if (b) {
count.incrementAndGet();
}
return b;
} finally {
lock.unlock();
}
}
@Override
public E poll() {
return null;
}
@Override
public E peek() {
return null;
}
@Override
public void put(E e) throws InterruptedException {
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
Integer nextCategory = nextCategories.take();
LinkedBlockingQueue<E> categoryElements = map.get(nextCategory);
E e = categoryElements.take();
count.decrementAndGet();
if (categoryElements.isEmpty()) {
map.remove(nextCategory);
} else {
nextCategories.offer(nextCategory);
}
return e;
} finally {
lock.unlock();
}
}
@Override
public boolean remove(Object o) {
CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) o;
lock.lock();
try {
int category = item.getCategory();
LinkedBlockingQueue<E> categoryElements = map.get(category);
boolean b = categoryElements.remove(item);
if (categoryElements.isEmpty()) {
map.remove(category);
}
if (b) {
count.decrementAndGet();
}
return b;
} finally {
lock.unlock();
}
}
@Override
public int drainTo(Collection<? super E> c) {
return 0;
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
return 0;
}
@Override
public Iterator<E> iterator() {
return null;
}
@Override
public int size() {
return count.get();
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// TODO
return null;
}
@Override
public int remainingCapacity() {
return 0;
}
}
}
Output with the traditional LinkedBlockingQueue
2017-01-09 14:56:13,061 [pool-2-thread-1] DEBUG - thread 0 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-4] DEBUG - thread 3 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-2] DEBUG - thread 1 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-3] DEBUG - thread 2 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-5] DEBUG - thread 4 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-2] DEBUG - thread 5 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-1] DEBUG - thread 6 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-4] DEBUG - thread 7 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-3] DEBUG - thread 0 of request 1
2017-01-09 14:56:15,063 [pool-2-thread-5] DEBUG - thread 1 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-2] DEBUG - thread 2 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-4] DEBUG - thread 3 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-1] DEBUG - thread 5 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-3] DEBUG - thread 4 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-5] DEBUG - thread 6 of request 1
2017-01-09 14:56:19,064 [pool-2-thread-4] DEBUG - thread 7 of request 1
2017-01-09 14:56:19,064 [pool-2-thread-1] DEBUG - thread 0 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-3] DEBUG - thread 1 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-5] DEBUG - thread 2 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-2] DEBUG - thread 3 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-4] DEBUG - thread 4 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-3] DEBUG - thread 5 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-5] DEBUG - thread 6 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-2] DEBUG - thread 7 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-1] DEBUG - thread 0 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-4] DEBUG - thread 2 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-3] DEBUG - thread 1 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-2] DEBUG - thread 3 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-1] DEBUG - thread 4 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-5] DEBUG - thread 5 of request 3
2017-01-09 14:56:25,064 [pool-2-thread-2] DEBUG - thread 7 of request 3
2017-01-09 14:56:25,064 [pool-2-thread-1] DEBUG - thread 6 of request 3
Output with my custom CategoryBlockingQueue
2017-01-09 14:54:54,765 [pool-2-thread-3] DEBUG - thread 2 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-2] DEBUG - thread 1 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-5] DEBUG - thread 4 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-1] DEBUG - thread 0 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-4] DEBUG - thread 3 of request 0
2017-01-09 14:54:56,767 [pool-2-thread-1] DEBUG - thread 0 of request 1
2017-01-09 14:54:56,767 [pool-2-thread-4] DEBUG - thread 0 of request 3
2017-01-09 14:54:56,767 [pool-2-thread-3] DEBUG - thread 5 of request 0
2017-01-09 14:54:56,767 [pool-2-thread-5] DEBUG - thread 0 of request 2
2017-01-09 14:54:56,767 [pool-2-thread-2] DEBUG - thread 6 of request 0
2017-01-09 14:54:58,767 [pool-2-thread-1] DEBUG - thread 1 of request 1
2017-01-09 14:54:58,767 [pool-2-thread-5] DEBUG - thread 1 of request 2
2017-01-09 14:54:58,767 [pool-2-thread-2] DEBUG - thread 7 of request 0
2017-01-09 14:54:58,767 [pool-2-thread-4] DEBUG - thread 1 of request 3
2017-01-09 14:54:58,767 [pool-2-thread-3] DEBUG - thread 2 of request 1
2017-01-09 14:55:00,767 [pool-2-thread-1] DEBUG - thread 2 of request 2
2017-01-09 14:55:00,767 [pool-2-thread-5] DEBUG - thread 2 of request 3
2017-01-09 14:55:00,767 [pool-2-thread-2] DEBUG - thread 3 of request 1
2017-01-09 14:55:00,767 [pool-2-thread-4] DEBUG - thread 3 of request 2
2017-01-09 14:55:00,767 [pool-2-thread-3] DEBUG - thread 3 of request 3
2017-01-09 14:55:02,767 [pool-2-thread-5] DEBUG - thread 4 of request 1
2017-01-09 14:55:02,767 [pool-2-thread-3] DEBUG - thread 4 of request 2
2017-01-09 14:55:02,767 [pool-2-thread-2] DEBUG - thread 4 of request 3
2017-01-09 14:55:02,767 [pool-2-thread-1] DEBUG - thread 5 of request 1
2017-01-09 14:55:02,767 [pool-2-thread-4] DEBUG - thread 5 of request 2
2017-01-09 14:55:04,767 [pool-2-thread-2] DEBUG - thread 5 of request 3
2017-01-09 14:55:04,767 [pool-2-thread-1] DEBUG - thread 6 of request 1
2017-01-09 14:55:04,767 [pool-2-thread-5] DEBUG - thread 6 of request 2
2017-01-09 14:55:04,767 [pool-2-thread-3] DEBUG - thread 6 of request 3
2017-01-09 14:55:04,768 [pool-2-thread-4] DEBUG - thread 7 of request 1
2017-01-09 14:55:06,768 [pool-2-thread-2] DEBUG - thread 7 of request 3
2017-01-09 14:55:06,768 [pool-2-thread-1] DEBUG - thread 7 of request 2