
I am trying to build an implementation of ExecutorService, let's call it SequentialPooledExecutor, with the following properties:

  1. All instances of SequentialPooledExecutor share the same thread pool

  2. Calls on the same instance of SequentialPooledExecutor are executed sequentially.

In other words, an instance waits for the currently executing task to terminate before it starts processing the next task in its queue.

I am currently in the process of implementing SequentialPooledExecutor myself, but I am wondering if I am reinventing the wheel. I looked into different implementations of ExecutorService, for example those provided by the Executors class, but I did not find one that meets my requirements.

Do you know whether there are existing implementations that I am missing, or should I continue implementing the interface myself?

EDIT:

I think my requirements are not very clear, so let me try to explain them in other words.

Suppose I have a series of sessions, say 1000 of them (the things I was calling instances of the executor before). I can submit tasks to a session, and I want the guarantee that all tasks submitted to the same session are executed sequentially. However, tasks that belong to different sessions should have no dependency on each other.

I want to define an ExecutorService that executes these tasks using a bounded number of threads, say 200, while ensuring that a task does not start before the previous one in the same session has finished.

I don't know if there is anything existing that already does that, or if I should implement such an ExecutorService myself.

Ravindra babu
  • Why would you need a thread pool? If you are going to invoke them sequentially, you only need one. What am I missing? – Erik Oct 07 '16 at 08:23
  • @Erik see the comment below –  Oct 07 '16 at 08:33
  • You'll definitely need some custom implementation. But your problem is quite interesting, you might write another question with the details of your design and any problems you have. – Marko Topolnik Oct 07 '16 at 08:47
  • Nowadays there's [`MoreExecutors.newSequentialExecutor()`](http://google.github.io/guava/releases/snapshot/api/docs/com/google/common/util/concurrent/MoreExecutors.html#newSequentialExecutor-java.util.concurrent.Executor-). – Petr Janeček Nov 23 '17 at 13:51

6 Answers


If you have thousands of keys that must each be processed sequentially, but you don't have thousands of cores, you can use a hashing strategy to distribute the work like this:

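private final ExecutorService[] es = new ExecutorService[2 * Runtime.getRuntime().availableProcessors()]; // many single-threaded executors
{
    for (int i = 0; i < es.length; i++) es[i] = Executors.newSingleThreadExecutor();
}

public <T> Future<T> submit(String key, Callable<T> call) {
    // same key -> same single-threaded executor; floorMod keeps the index non-negative
    int h = Math.floorMod(key.hashCode(), es.length);
    return es[h].submit(call);
}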

In general, if your tasks are CPU-bound, you only need about 2 * N threads to keep N cores busy; more than that just adds overhead.
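To see the striping approach end to end, here is a self-contained sketch. The class name StripedExecutor, the stripe count of 4 and the demo() helper are illustrative assumptions, not part of the answer. demo() submits 100 numbered tasks spread over 10 keys and records, per key, the order in which they actually ran.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class StripedExecutor {
    private final ExecutorService[] stripes;

    public StripedExecutor(int nStripes) {
        stripes = new ExecutorService[nStripes];
        for (int i = 0; i < nStripes; i++) {
            stripes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // The same key always hashes to the same single-threaded stripe,
    // so tasks for one key run one at a time, in submission order.
    public <T> Future<T> submit(String key, Callable<T> task) {
        int index = Math.floorMod(key.hashCode(), stripes.length);
        return stripes[index].submit(task);
    }

    public void shutdown() {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
        }
    }

    // Hypothetical demo: 100 tasks over 10 keys, recording the actual run order per key.
    public static Map<String, List<Integer>> demo() {
        StripedExecutor executor = new StripedExecutor(4);
        Map<String, List<Integer>> runOrder = new ConcurrentHashMap<>();
        List<Future<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            String key = "session-" + (i % 10);
            int seq = i;
            futures.add(executor.submit(key, () -> {
                // each key's list is only ever mutated by that key's stripe thread
                runOrder.computeIfAbsent(key, k -> new ArrayList<>()).add(seq);
                return null;
            }));
        }
        try {
            for (Future<Void> f : futures) {
                f.get(); // also makes the stripes' writes visible to this thread
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return runOrder;
    }

    public static void main(String[] args) {
        System.out.println(demo().get("session-3")); // prints [3, 13, 23, 33, 43, 53, 63, 73, 83, 93]
    }
}
```

Distinct keys that hash to the same stripe also wait for each other, so one slow task delays every key on its stripe; that is the trade-off raised in the comments on these answers.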

Peter Lawrey

If you want to execute your tasks sequentially, simply create an ExecutorService with only one thread using Executors.newSingleThreadExecutor().

If you have different types of tasks and you want to execute only the tasks of the same type sequentially, you can use the same single-threaded ExecutorService for each type of task; no need to reinvent the wheel.

So let's say that you have 1,000 different types of tasks: you could use 200 single-threaded ExecutorServices. The only thing you need to implement yourself is making sure that a given type of task always goes to the same single-threaded ExecutorService.

Nicolas Filotto
  • The problem with newSingleThreadExecutor is that I cannot bound the number of threads that are created. If I create 1000 instances of the executor, I will have 1000 threads executing in the background. What I would like instead is to have 1000 instances of the executor, but only, say, 200 threads executing the tasks. However, I want the guarantee that tasks submitted to the same instance of the executor are executed sequentially. –  Oct 07 '16 at 08:28
  • If you want them to execute the tasks sequentially, there is no need to have 200 threads; it would be a waste of resources. Use several threads only to launch tasks in parallel. – Nicolas Filotto Oct 07 '16 at 08:30
  • I don't want to execute *all* tasks sequentially. Only those tasks that belong to the same instance of the executor should be executed sequentially. If two tasks belong to different instances of the executor, then they can be executed in parallel. –  Oct 07 '16 at 08:32
  • So what you want is a number of single-threaded executors, each managing its own thread? – JimmyB Oct 07 '16 at 08:34
  • Don't make it more complicated than it should be: if you want 200 threads, create 200 instances; no need to reinvent the wheel. – Nicolas Filotto Oct 07 '16 at 08:37
  • The problem with always using the same 'ExecutorService' for a given type of task is that if that executor becomes unresponsive, then all the tasks assigned to it are stuck as well (see also the answer and comments below). –  Oct 07 '16 at 11:54
  • Sorry, but to me this is a completely different problem. Even if you decide to implement your `SequentialPooledExecutor`, you will have the same issue to manage no matter how many threads you have, because you want to execute tasks sequentially: if a task takes too long, you will have to wait until it ends before executing other tasks. – Nicolas Filotto Oct 07 '16 at 12:02
private final Map<Integer, CompletableFuture<Void>> sessionTasks = new ConcurrentHashMap<>();
private final ExecutorService pool = Executors.newFixedThreadPool(200);

public void submit(int sessionId, Runnable task) {
    // compute() is atomic per key on a ConcurrentHashMap, so concurrent submits are chained safely
    sessionTasks.compute(sessionId, (id, previous) -> previous == null
            ? CompletableFuture.runAsync(task, pool)
            // exceptionally(...) stops one failed task from skipping all later tasks of the session
            : previous.exceptionally(ex -> null).thenRunAsync(task, pool));
}

If a session has no pending task, a new task is created and run in the provided pool. If a session already has a task when a new one is added, the new task is chained (with thenRunAsync) to the previous one, ensuring order.
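In this answer's approach, every session that has ever submitted a task keeps an entry in the map forever. One possible refinement is sketched below, assuming a session's entry may be dropped once its last task has finished: it uses a ConcurrentHashMap, chains tasks atomically with compute(), and evicts finished sessions with remove(key, value). The class name SessionChains and the pendingSessions() helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SessionChains {
    // Tail of each session's chain; an entry exists only while the session has pending work.
    private final Map<Integer, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    public SessionChains(ExecutorService pool) {
        this.pool = pool;
    }

    public CompletableFuture<Void> submit(int sessionId, Runnable task) {
        CompletableFuture<Void> tail = tails.compute(sessionId, (id, previous) -> previous == null
                ? CompletableFuture.runAsync(task, pool)
                // exceptionally(...) lets the next task run even if the previous one failed
                : previous.exceptionally(ex -> null).thenRunAsync(task, pool));
        // remove(key, value) only succeeds if no newer task has replaced this tail,
        // so finished sessions are evicted without dropping queued work.
        return tail.whenComplete((result, error) -> tails.remove(sessionId, tail));
    }

    public int pendingSessions() {
        return tails.size();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(200);
        SessionChains chains = new SessionChains(pool);
        List<Integer> order = new ArrayList<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            int seq = i;
            futures.add(chains.submit(42, () -> order.add(seq)));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        System.out.println(order + " pending=" + chains.pendingSessions()); // prints [0, 1, 2, 3, 4] pending=0
    }
}
```

Because remove(key, value) only removes the entry if it still maps to that exact future, a task that arrives while the previous one is finishing is still chained rather than lost. Calling exceptionally(...) before thenRunAsync is a design choice: without it, a task that throws would cause every later task of the same session to be skipped.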

Spotted
  • The non-concurrent map implementation may lead to unpredictable behavior if sessions register from multiple threads (which surely is the intended use case...). – GPI Oct 11 '16 at 21:26

@Nicolas's answer is probably your best bet as it is simple, well tested, and efficient.

If, however, it cannot meet your requirements, I would do it like so:

  1. Do not make "SequentialPooledExecutor" an executor service, make it a facade for a "pool" of single thread executor services
  2. Make your "SequentialPooledExecutor" implement a submit method that takes a Runnable / Callable and a String representing a "queue name", and returns a Future, like an executor service
  3. When this method is called, make your "SequentialPooledExecutor" dispatch the task to one of its internal single-thread executor services, chosen by hashing the queue name.

The hashing part that takes place at step 3 allows you to have the tasks for each "queue name" always go to the same (single thread) executor service inside of your "SequentialPooledExecutor".

Another possible route is to use CompletionStage and CompletableFuture. These are, in effect, listenable futures (futures to which you can attach completion handlers). With these, the first time you see a "session", you create a CompletableFuture with its first task and hold on to it. For each new task, you combine the previous future with the new task by calling thenAcceptAsync (or a similar method). What you get is a linear chain of execution tasks.

GPI
  • The problem with your suggestion is that if I bind a session to an executor deterministically, for example according to its hash, and that session becomes unresponsive or very slow, then all sessions that are bound to the same executor are blocked. –  Oct 07 '16 at 08:56
  • There is no solution for that: if you do not want a thread per session, then sessions will eventually be able to block other sessions. You have independence, or you don't... I guess. You could try some sort of "validation query" (just as a database connection pool does) before inserting tasks, and drop executors as they fail, but doing that while maintaining the sequential execution requirement and not dropping tasks will prove tricky, to say the least. – GPI Oct 07 '16 at 09:00
  • My idea would be that a session gets a new executor from the pool if the pool is not full and waits if the pool is full. In this way sessions are locked only if all the executors in the pool are slow or unresponsive. This scenario would not be likely, though. –  Oct 07 '16 at 09:08
  • The waiting you suggest at this level is conceptually no different than the one you did not want in your first comment. – GPI Oct 07 '16 at 10:31
  • Not exactly. A session that waits for *any* executor to become available is not the same as a session waiting for *the* statically assigned executor to become available. –  Oct 07 '16 at 11:40
  • Well, you do not want *any* pool, because you want sequential execution... But I get what you mean. I've edited with another potential solution, using completable futures. Be careful, though: no matter what you do, if your tasks can become slow or unresponsive, you will end up in a bad situation. You should design with that in mind. Whether you have 200 or 2000 executors, saturation, if it occurs, *will* pile up and fill them all. – GPI Oct 07 '16 at 11:53
  • I think I know how to implement a solution, but your idea of using completable futures is interesting. My original question, though, was whether Java already provides an implementation of such an executor :) –  Oct 07 '16 at 11:58
  • If you are willing to go down the completable future route, abandoning your "thread per session" model, then the answer to that question is: any ExecutorService shipped with the JDK is capable of that. – GPI Oct 07 '16 at 12:05

If you want to configure a bounded queue, use ThreadPoolExecutor:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)

For your use case, use ThreadPoolExecutor as follows:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(1000));

The code above caps the size of the ThreadPoolExecutor's queue at 1000. If you want a custom policy for rejected tasks, you can pass your own RejectedExecutionHandler.
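To make the effect of the bounded queue concrete, the sketch below uses the same single-thread configuration but with a queue capacity of 2 instead of 1000, so the limit is reached quickly. BoundedQueueDemo and countRejections are illustrative names. With the default handler (ThreadPoolExecutor.AbortPolicy), a submission that finds the only worker busy and the queue full throws RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
    // Submits `attempts` blocking tasks to a 1-thread executor whose queue holds
    // `capacity` tasks, and returns how many submissions were rejected.
    public static int countRejections(int capacity, int attempts) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(capacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    // Each task blocks until released, so the single worker stays busy
                    // and the following tasks can only wait in the queue.
                    executor.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // default AbortPolicy: queue full and no spare thread
                }
            }
        } finally {
            release.countDown(); // let the running and queued tasks finish
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 1 task running + 2 queued; the remaining 2 submissions are rejected.
        System.out.println(countRejections(2, 5)); // prints 2
    }
}
```

If you would rather slow producers down than fail, passing new ThreadPoolExecutor.CallerRunsPolicy() as the handler makes the submitting thread run the rejected task itself.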

Related SE question:

How to properly use Java Executor?

Ravindra babu

Recently I encountered the same problem. There is no built-in class for this, but a queue gets you close enough. My simple implementation looks like this (maybe it's helpful for others looking for examples of the same issue):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class SerializedAsyncRunnerSimple implements Runnable {
    private final ExecutorService pool;
    protected final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // thread-safe queue
    protected final AtomicBoolean active = new AtomicBoolean(false);

    public SerializedAsyncRunnerSimple(ExecutorService threadPool) {
        this.pool = threadPool;
    }

    public void addWork(Runnable r) {
        queue.add(r);
        startExecutionIfInactive();
    }

    private void startExecutionIfInactive() {
        // only one pool thread at a time may drain this runner's queue
        if (active.compareAndSet(false, true)) {
            pool.execute(this);
        }
    }

    @Override
    public synchronized void run() {
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            // runs even if a task throws, so the runner never stays marked active forever
            active.set(false); // all future adds will not be executed on this thread anymore
            if (!queue.isEmpty()) { // if some items were added to the queue after the last queue.poll
                startExecutionIfInactive(); // trigger an execution on the next thread
            }
        }
    }
}
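The same drain-loop idea can be packaged as a plain java.util.concurrent.Executor, which maps directly onto the question's setup of many sessions sharing one bounded pool. This is a sketch under assumptions, not a tested library class: SessionExecutor and allSessionsInOrder are hypothetical names, ConcurrentLinkedQueue replaces LinkedBlockingQueue, and the drain loop is a private method instead of the instance itself being a Runnable. Each session gets its own SessionExecutor, and all of them forward work to the shared pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SessionExecutor implements Executor {
    private final Executor pool;
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    public SessionExecutor(Executor pool) {
        this.pool = pool;
    }

    @Override
    public void execute(Runnable task) {
        queue.add(task);
        scheduleIfIdle();
    }

    private void scheduleIfIdle() {
        // At most one drain loop per session is queued or running at any time.
        if (!queue.isEmpty() && scheduled.compareAndSet(false, true)) {
            try {
                pool.execute(this::drain);
            } catch (RejectedExecutionException e) {
                scheduled.set(false); // pool refused the work; let the caller see it
                throw e;
            }
        }
    }

    private void drain() {
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            scheduled.set(false);
            scheduleIfIdle(); // picks up tasks added after the last poll() returned null
        }
    }

    // Hypothetical check: numbered tasks per session on a shared pool must run as 0, 1, 2, ...
    public static boolean allSessionsInOrder(int sessions, int tasksPerSession, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<List<Integer>> orders = new ArrayList<>();
        List<SessionExecutor> executors = new ArrayList<>();
        for (int s = 0; s < sessions; s++) {
            orders.add(new ArrayList<>());
            executors.add(new SessionExecutor(pool));
        }
        CountDownLatch done = new CountDownLatch(sessions * tasksPerSession);
        for (int t = 0; t < tasksPerSession; t++) {
            for (int s = 0; s < sessions; s++) {
                List<Integer> order = orders.get(s);
                int seq = t;
                executors.get(s).execute(() -> {
                    order.add(seq); // plain ArrayList is fine: a session's tasks never overlap
                    done.countDown();
                });
            }
        }
        try {
            if (!done.await(30, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
        for (List<Integer> order : orders) {
            for (int i = 0; i < tasksPerSession; i++) {
                if (order.size() != tasksPerSession || order.get(i) != i) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allSessionsInOrder(1000, 5, 200)); // prints true
    }
}
```

Because it implements Executor, a session executor can also be handed to APIs that accept one, for example CompletableFuture.runAsync(task, sessionExecutor). An idle session costs only a queue and a flag, so 1,000 sessions do not need 1,000 threads.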
George