I recently answered a question about a "serial task queue" with a basic implementation as a demonstration (linked in the code comment below). I imagine you have been using a similar solution. It is relatively easy to adapt that implementation to use a map of task lists and still share one (fixed-size) executor.
The Striped Executor Service you mention is the better solution, but I show the adapted implementation here to demonstrate decoupling the task queue(s) from the executor. The implementation uses a callback and therefore has no need for polling or signalling. Since a "critical (stop the world) section" is used, the map with task queues can clean itself: no tasks queued means an empty map. The downside of the "critical section" is that throughput is limited: only so many tasks can be added and removed per second.
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
// Copied and updated from https://stackoverflow.com/a/32916943/3080094
/**
 * Runs tasks one at a time per group ("serial task queue" per group id) on a shared
 * {@link ExecutorService}.
 *
 * <p>Tasks of the same group are executed in the order they were added and never
 * concurrently with each other; tasks of different groups may run in parallel, limited
 * only by the executor. A group entry exists in the internal map only while it has
 * queued or running tasks, so idle groups do not leak memory.
 *
 * <p>All access to the internal map and the per-group task lists is guarded by a single
 * fair {@link ReentrantLock}.
 */
public class SerialTaskQueues {

    /**
     * Small demonstration of serial execution per group using a pool of two threads.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // test the serial task execution using different groups
        ExecutorService executor = Executors.newFixedThreadPool(2);
        SerialTaskQueues tq = new SerialTaskQueues(executor);
        try {
            // test running the tasks one by one
            tq.add(new SleepSome("1", 30L));
            Thread.sleep(5L);
            tq.add(new SleepSome("2", 20L));
            tq.add(new SleepSome("1", 10L));
            Thread.sleep(100L);
            // all queues should be empty
            System.out.println("Queue size 1: " + tq.size("1")); // should be empty
            System.out.println("Queue size 2: " + tq.size("2")); // should be empty
            tq.add(new SleepSome("1", 10L));
            tq.add(new SleepSome("2", 20L));
            // with executor pool size set to 2, task 3 will have to wait for task 1 to complete
            tq.add(new SleepSome("3", 30L));
            tq.add(new SleepSome("1", 20L));
            tq.add(new SleepSome("2", 10L));
            Thread.sleep(100L);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdownNow();
        }
    }

    // All lookups and modifications of this map, and of the task lists stored in it,
    // must be done while holding 'lock'.
    private final Map<String, GroupTasks> taskGroups = new HashMap<>();
    // make lock fair so that adding and removing tasks is balanced.
    private final ReentrantLock lock = new ReentrantLock(true);
    private final ExecutorService executor;

    /**
     * Creates the task queues on top of the given executor.
     *
     * @param executor the executor that runs the tasks; it is not shut down by this class
     */
    public SerialTaskQueues(ExecutorService executor) {
        this.executor = executor;
    }

    /**
     * Queues a task for the given group and starts it when no other task of that group
     * is currently running.
     *
     * @param groupId the group the task belongs to
     * @param task the task to run
     * @return always {@code true}
     * @throws RejectedExecutionException if the executor refuses the task that is started
     *         by this call; the refused task stays queued in that case
     */
    public boolean add(String groupId, Runnable task) {
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt == null) {
                gt = new GroupTasks(groupId);
                taskGroups.put(groupId, gt);
            }
            gt.tasks.add(task);
        } finally {
            lock.unlock();
        }
        runNextTask(groupId);
        return true;
    }

    /* Utility method for testing. */
    public void add(SleepSome sleepTask) {
        add(sleepTask.groupId, sleepTask);
    }

    /**
     * Starts the next queued task of the group if none is running, or removes the group
     * from the map when it has nothing queued and nothing running.
     */
    private void runNextTask(String groupId) {
        // critical section that ensures one task is executed.
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt == null) {
                // The group was already cleaned up (by another callback or by remove())
                // between the caller releasing the lock and this call acquiring it.
                return;
            }
            if (gt.tasks.isEmpty()) {
                // only cleanup when last task has executed, prevent memory leak
                if (!gt.taskRunning.get()) {
                    taskGroups.remove(groupId);
                }
            } else if (!executor.isShutdown() && gt.taskRunning.compareAndSet(false, true)) {
                Runnable next = gt.tasks.remove(0);
                try {
                    executor.execute(wrapTask(groupId, gt.taskRunning, next));
                } catch (RejectedExecutionException e) {
                    // Undo the state change: without this the task would be lost and the
                    // group would stay marked as running (and in the map) forever.
                    gt.tasks.add(0, next);
                    gt.taskRunning.set(false);
                    throw e;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps a task so that, after it has run, the group's running flag is cleared and the
     * next task of the group is started.
     */
    private CallbackTask wrapTask(final String groupId, final AtomicBoolean taskRunning, Runnable task) {
        return new CallbackTask(task, new Runnable() {
            @Override
            public void run() {
                if (!taskRunning.compareAndSet(true, false)) {
                    System.out.println("ERROR: programming error, the callback should always run in execute state.");
                }
                runNextTask(groupId);
            }
        });
    }

    /**
     * Amount of (active) task groups.
     *
     * @return the number of groups currently present in the map
     */
    public int size() {
        lock.lock();
        try {
            return taskGroups.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Number of tasks waiting in the queue of the given group; a task that is currently
     * running is no longer in the queue and is not counted.
     *
     * @param groupId the group to inspect
     * @return the number of queued tasks, {@code 0} when the group is unknown
     */
    public int size(String groupId) {
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            return (gt == null ? 0 : gt.tasks.size());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a queued task without removing it.
     *
     * @param groupId the group to inspect
     * @param index position in the group's queue
     * @return the task at that position, or {@code null} when the group is unknown
     * @throws IndexOutOfBoundsException if the group exists but the index is out of range
     */
    public Runnable get(String groupId, int index) {
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            return (gt == null ? null : gt.tasks.get(index));
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a queued task so that it will not be executed.
     *
     * @param groupId the group to modify
     * @param index position in the group's queue
     * @return the removed task, or {@code null} when the group is unknown
     * @throws IndexOutOfBoundsException if the group exists but the index is out of range
     */
    public Runnable remove(String groupId, int index) {
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt == null) {
                // consistent with get(): an unknown group has nothing to remove
                return null;
            }
            Runnable r = gt.tasks.remove(index);
            // similar to runNextTask - cleanup if there are no tasks (running) for the group
            if (gt.tasks.isEmpty() && !gt.taskRunning.get()) {
                taskGroups.remove(groupId);
            }
            return r;
        } finally {
            lock.unlock();
        }
    }

    /* Helper class for the task-group map. */
    static class GroupTasks {
        // LinkedList: tasks are appended at the tail and taken from the head.
        final List<Runnable> tasks = new LinkedList<>();
        // atomic boolean used to ensure only 1 task is executed at any given time
        final AtomicBoolean taskRunning = new AtomicBoolean(false);
        final String groupId;

        GroupTasks(String groupId) {
            this.groupId = groupId;
        }
    }

    // general callback-task, see https://stackoverflow.com/a/826283/3080094
    /**
     * Runs a task and afterwards always runs a callback, also when the task throws.
     * Exceptions from the task and from the callback are printed and not propagated.
     */
    static class CallbackTask implements Runnable {
        private final Runnable task, callback;

        public CallbackTask(Runnable task, Runnable callback) {
            this.task = task;
            this.callback = callback;
        }

        @Override
        public void run() {
            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    callback.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // task that just sleeps for a while
    static class SleepSome implements Runnable {
        static long startTime = System.currentTimeMillis();
        private final String groupId;
        private final long sleepTimeMs;

        public SleepSome(String groupId, long sleepTimeMs) {
            this.groupId = groupId;
            this.sleepTimeMs = sleepTimeMs;
        }

        @Override
        public void run() {
            try {
                System.out.println(tdelta(groupId) + "Sleeping for " + sleepTimeMs + " ms.");
                Thread.sleep(sleepTimeMs);
                System.out.println(tdelta(groupId) + "Slept for " + sleepTimeMs + " ms.");
            } catch (InterruptedException e) {
                // restore the interrupt flag so the thread's owner can still observe it
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }

        // prefix with the elapsed milliseconds since class load and the group id
        private String tdelta(String groupId) {
            return String.format("% 4d [%s] ", (System.currentTimeMillis() - startTime), groupId);
        }
    }
}