8

Is there a Java class such that:

  1. Executable tasks can be added via an id, where all tasks with the same id are guaranteed to never run concurrently
  2. The number of threads can be limited to a fixed amount

A naive Map-based solution would easily solve (1), but it would make (2) difficult to manage. Similarly, all thread-pooling classes that I know of pull from a single queue, meaning (1) is not guaranteed.

Solutions involving external libraries are welcome.

assylias
00500005
  • @Makky, I had been specifically looking at Spring TaskExecutor – 00500005 Jul 31 '13 at 14:22
  • @downvoters, why is this a bad question? This is not a "plz give me teh code" question, and seems to me that it would be a fairly common use case for multi-threading, and googling doesn't yield any obvious results – 00500005 Jul 31 '13 at 14:24

4 Answers

4

For each id, you need a SerialExecutor, as described in the documentation of java.util.concurrent.Executor. All serial executors delegate their work to a single ThreadPoolExecutor with a given corePoolSize.
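As a rough illustration of that idea, the sketch below keeps one serial executor per id and hands every task to a shared pool. The inner class follows the SerialExecutor example from the Executor Javadoc; the wrapper name `KeyedSerialExecutor`, the use of `String` ids, and the `ConcurrentHashMap` lookup are assumptions made for this example, not part of the JDK or of the answer.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical wrapper: one SerialExecutor per id, all sharing one pool.
class KeyedSerialExecutor {
    private final Executor pool; // shared pool; its size bounds the thread count
    private final Map<String, SerialExecutor> byId = new ConcurrentHashMap<>();

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    // Tasks submitted under the same id run one at a time, in submission order.
    void execute(String id, Runnable task) {
        byId.computeIfAbsent(id, k -> new SerialExecutor(pool)).execute(task);
    }

    // Modeled on the SerialExecutor example in the Executor Javadoc.
    private static final class SerialExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final Executor executor;
        private Runnable active;

        SerialExecutor(Executor executor) {
            this.executor = executor;
        }

        @Override
        public synchronized void execute(Runnable r) {
            // Wrap the task so that finishing it schedules the next one.
            tasks.add(() -> {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        private synchronized void scheduleNext() {
            // Hand at most one task per id to the shared pool at a time.
            if ((active = tasks.poll()) != null) {
                executor.execute(active);
            }
        }
    }
}
```

With something like `new KeyedSerialExecutor(Executors.newFixedThreadPool(4))`, at most four tasks run at once and no two tasks with the same id overlap. Note that this sketch never removes idle entries from the map, so it suits a bounded set of ids.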

An optimized version of SerialExecutor can be found in my code samples.

Alexei Kaigorodov
  • This is a hidden gem in the Java documentation! SerialExecutor effectively implements a multiqueue thread pool, but they don't really make that explicit. – blueimpb Aug 24 '16 at 17:31
  • @Alexei Kaigorodov, after you've re-arranged your github repo, some of your links in your older answers end up with 404 - I've run into 2 different cases since yesterday. I'm guessing the one referred here is [this file](https://github.com/akaigoro/CodeSamples/blob/master/src/main/java/actor/serialexec/SerialExecutor.java), but please try to fix them when you have the chance. Thanks! – OzgurH May 24 '19 at 11:28
3

If you don't find something that does this out of the box, it shouldn't be hard to roll your own. One thing you could do is wrap each task in a simple class that reads from a queue unique to its id, e.g.:

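import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

// Wrapper submitted to the pool: runs the next task queued for one id.
public static class SerialCaller<T> implements Callable<T> {
    // Queue of pending tasks, unique per id
    private final BlockingQueue<Callable<T>> delegates;

    public SerialCaller(BlockingQueue<Callable<T>> delegates) {
        this.delegates = delegates;
    }

    @Override
    public T call() throws Exception {
        // Blocks until a task is available for this id, then runs it
        return delegates.take().call();
    }
}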

It should be easy to maintain a map of ids to queues for submitting tasks. That satisfies condition (1), and then you can look for simple solutions to condition (2), such as Executors.newFixedThreadPool.

yshavit
  • Your initial proposal seemed lighter and easier to implement - any reason for the change? – assylias Jul 31 '13 at 14:38
  • @assylias I realized it could be a big bottleneck. Let's say your thread pool is also limited to N threads, and you've got N tasks under id A, as well as some number under id B and C. By an unfortunate coincidence, all of A's tasks get scheduled. That means N-1 threads are blocked -- and not executing tasks for B or C. – yshavit Jul 31 '13 at 14:57
  • Could you please complete this answer by adding source code for mapping ids to queues and for submitting and polling tasks? – Mojtaba Yeganeh Jan 22 '17 at 12:10
2

I think that the simplest solution is to just have a separate queue for each index and a separate executor (with one thread) for each queue.

The only thing you could achieve with a more complex solution would be to use fewer threads, but if the number of indexes is small and bounded that's probably not worth the effort.
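A minimal sketch of this approach, assuming `String` ids and a hypothetical class name `PerIdExecutors`: each id lazily gets its own single-threaded executor, which brings its own queue. The thread count equals the number of distinct ids seen, so it only stays bounded when the set of ids does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: one single-threaded executor (and queue) per id.
class PerIdExecutors {
    private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

    // Tasks with the same id share one thread, so they never run concurrently.
    void submit(String id, Runnable task) {
        executors
            .computeIfAbsent(id, k -> Executors.newSingleThreadExecutor())
            .execute(task);
    }

    // Lets already queued tasks finish, then releases every per-id thread.
    void shutdown() {
        executors.values().forEach(ExecutorService::shutdown);
    }
}
```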

Stephen C
1

Yes, there is such a library now: https://github.com/jano7/executor

int maxTasks = 10;
ExecutorService underlyingExecutor = Executors.newFixedThreadPool(maxTasks);
KeySequentialBoundedExecutor executor = new KeySequentialBoundedExecutor(maxTasks, underlyingExecutor);

Runnable task = new Runnable() {
    @Override
    public void run() {
        // do something
    }
};

executor.execute(new KeyRunnable<>("ID-1", task)); // execute the task by the underlying executor
executor.execute(new KeyRunnable<>("ID-2", task)); // execution is not blocked by the task for ID-1
executor.execute(new KeyRunnable<>("ID-1", task)); // execution starts when the previous task for ID-1 completes
Estok