27

Among all my tasks, I have some that must be processed serially (they can never run concurrently and they must be processed in order).

I achieved that by creating a separate single-threaded thread pool for each group of tasks that must be executed serially. It works, but I don't have the resources for that. I don't control the number of groups, so I might end up with a ridiculous number of threads running simultaneously.

Is there any way I can accomplish that with a single thread pool? Is there a thread pool with multiple blocking queues where I could ensure serial execution for each queue?

EDIT:

Just emphasizing what I've said in my second paragraph: I've solved this with a single-threaded thread pool for each group of tasks that must be executed serially. I can't go on with this solution, though. There are way too many groups and I can't have all these threads.

I've found this related question, but since it is not very recent, I still created mine. All I'm doing is trying to avoid reinventing the wheel, but it seems I don't have a choice.

Does Java have an indexable multi-queue thread pool?

Fred Porciúncula

8 Answers

5

If you maintain a queue for each group, you can pull items off each queue and feed them into a thread pool. The code below doesn't prioritize any one group; it just pulls from them in round-robin fashion. If you need prioritization, it should be easy to add. The following code round-robins 4 groups using two threads (plus the thread managing the queues). You can use another queue mechanism. I typically use LinkedBlockingQueue when I want to wait for items to be placed on the queue by another thread, which is probably not what you want here, so I'm polling instead of calling take(). take() is the call that waits.

private Future group1Future = null;
private Future group2Future = null;
private Future group3Future = null;
private Future group4Future = null;
private LinkedBlockingQueue<Callable> group1Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group2Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group3Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group4Queue
        = new LinkedBlockingQueue<>();

private ExecutorService executor = Executors.newFixedThreadPool(2);


public void startProcessing() {
    // Busy-polls every group in turn; meant to run on its own manager thread.
    while (true) {
        // A group is ready when it has never run a task (null Future)
        // or when its previous task has finished.
        if (group1Future == null || group1Future.isDone()) {
            if (group1Queue.peek() != null) {
                group1Future = executor.submit(group1Queue.poll());
            }
        }
        if (group2Future == null || group2Future.isDone()) {
            if (group2Queue.peek() != null) {
                group2Future = executor.submit(group2Queue.poll());
            }
        }
        if (group3Future == null || group3Future.isDone()) {
            if (group3Queue.peek() != null) {
                group3Future = executor.submit(group3Queue.poll());
            }
        }

        if (group4Future == null || group4Future.isDone()) {
            if (group4Queue.peek() != null) {
                group4Future = executor.submit(group4Queue.poll());
            }
        }
    }
}

If a group's current task is not complete, the loop skips to the next group. No more than two groups are processed at a time, and no single group ever runs more than one task at a time. The queues enforce ordered execution.

Dodd10x
4

Akka, as suggested by @SotiriosDelimanolis and @AlexeiKaigorodov, seems promising, as does @Dodd10x's second answer, which certainly solves the problem. The only downside is that I'd have to code my own polling strategy to make sure my tasks are eventually added to the executor (like the infinite loop in his example).

On the other hand, the Striped Executor Service suggested by @OldCurmudgeon exactly matches my problem and works out of the box simply as a custom ExecutorService.

This magical thread pool would ensure that all Runnables with the same stripeClass would be executed in the order they were submitted, but StripedRunners with different stripedClasses could still execute independently. He wanted to use a relatively small thread pool to service a large number of Java NIO clients, but in such a way that the runnables would still be executed in-order.

There is even a comment about using a single threaded thread pool for each group (stripe), as it was suggested here:

Several suggestions were made, such as having a SingleThreadExecutor for each stripeClass. However, that would not satisfy the requirement that we could share the threads between connections.

I see this as the best solution for its simplicity and ease of use.
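For readers who want to see the ordering guarantee in isolation, here is a minimal sketch of the same idea: tasks that share a key run one at a time and in submission order, while different keys share one pool. It is not the Striped Executor Service's code or API. The class name `KeyedSerialExecutor` and its methods are hypothetical, loosely modelled on the `SerialExecutor` example in the `java.util.concurrent.Executor` Javadoc.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical helper: serial, ordered execution per key on a shared pool. */
final class KeyedSerialExecutor {
    private final Executor pool;
    // key -> tasks waiting behind the one in flight; an entry exists only while the key is active
    private final Map<String, Queue<Runnable>> pending = new HashMap<>();

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    synchronized void execute(String key, Runnable task) {
        Queue<Runnable> queue = pending.get(key);
        if (queue != null) {
            queue.add(task);                       // a task for this key is in flight: wait behind it
        } else {
            pending.put(key, new ArrayDeque<>());  // mark the key as active
            dispatch(key, task);
        }
    }

    private void dispatch(String key, Runnable task) {
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                scheduleNext(key);                 // runs even if the task threw
            }
        });
    }

    private synchronized void scheduleNext(String key) {
        Runnable next = pending.get(key).poll();
        if (next == null) {
            pending.remove(key);                   // idle key: drop its queue so the map cannot leak
        } else {
            dispatch(key, next);
        }
    }
}
```

A pool of four threads could then serve any number of groups, for example `new KeyedSerialExecutor(Executors.newFixedThreadPool(4)).execute("group-1", task)`. The sketch does not handle rejected submissions: if the pool refuses a task, that key's queue stalls.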

Fred Porciúncula
2

I recently answered a question about a "serial task queue" with a basic implementation as demonstration here. I imagine you have been using a similar solution. It is relatively easy to adapt the implementation to use a map of task lists and still share one (fixed size) executor.
The Striped Executor Service you mention is the better solution, but I show the adapted implementation here to demonstrate decoupling the task queue(s) from the executor. The implementation uses a callback and therefore has no need for polling or signalling. Since a "critical (stop the world) section" is used, the map with task queues can clean itself: no tasks queued means an empty map. The downside of the "critical section" is that throughput is limited: only so many tasks can be added and removed per second.

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Copied and updated from https://stackoverflow.com/a/32916943/3080094
public class SerialTaskQueues {

    public static void main(String[] args) {

        // test the serial task execution using different groups
        ExecutorService executor = Executors.newFixedThreadPool(2);
        SerialTaskQueues tq = new SerialTaskQueues(executor);
        try {
            // test running the tasks one by one
            tq.add(new SleepSome("1", 30L));
            Thread.sleep(5L);
            tq.add(new SleepSome("2", 20L));
            tq.add(new SleepSome("1", 10L));

            Thread.sleep(100L);
            // all queues should be empty
            System.out.println("Queue size 1: " + tq.size("1")); // should be empty
            System.out.println("Queue size 2: " + tq.size("2")); // should be empty
            tq.add(new SleepSome("1", 10L));
            tq.add(new SleepSome("2", 20L));
            // with executor pool size set to 2, task 3 will have to wait for task 1 to complete
            tq.add(new SleepSome("3", 30L));
            tq.add(new SleepSome("1", 20L));
            tq.add(new SleepSome("2", 10L));

            Thread.sleep(100L);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdownNow();
        }
    }

    // all lookups and modifications to the map must happen while holding the lock.
    private final Map<String, GroupTasks> taskGroups = new HashMap<>();
    // make lock fair so that adding and removing tasks is balanced.
    private final ReentrantLock lock = new ReentrantLock(true);
    private final ExecutorService executor;

    public SerialTaskQueues(ExecutorService executor) {
        this.executor = executor;
    }

    public boolean add(String groupId, Runnable task) {

        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt == null) {
                gt = new GroupTasks(groupId);
                taskGroups.put(groupId, gt);
            }
            gt.tasks.add(task); 
        } finally {
            lock.unlock();
        }
        runNextTask(groupId);
        return true;
    }

    /* Utility method for testing. */
    public void add(SleepSome sleepTask) {
        add(sleepTask.groupId, sleepTask);
    }

    private void runNextTask(String groupId) {

        // critical section that ensures one task is executed.
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt == null) {
                // another thread already drained and removed this group
                return;
            }
            if (gt.tasks.isEmpty()) {
                // only cleanup when last task has executed, prevent memory leak
                if (!gt.taskRunning.get()) {
                    taskGroups.remove(groupId);
                }
            } else if (!executor.isShutdown() && gt.taskRunning.compareAndSet(false, true)) {
                executor.execute(wrapTask(groupId, gt.taskRunning, gt.tasks.remove(0)));
            }
        } finally {
            lock.unlock();
        }
    }

    private CallbackTask wrapTask(final String groupId, final AtomicBoolean taskRunning, Runnable task) {

        return new CallbackTask(task, new Runnable() {
            @Override 
            public void run() {
                if (!taskRunning.compareAndSet(true, false)) {
                    System.out.println("ERROR: programming error, the callback should always run in execute state.");
                }
                runNextTask(groupId);
            }
        });
    }

    /** Amount of (active) task groups. */
    public int size() {

        int size = 0;
        lock.lock();
        try {
            size = taskGroups.size();
        } finally {
            lock.unlock();
        }
        return size;
    }

    public int size(String groupId) {

        int size = 0;
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            size = (gt == null ? 0 : gt.tasks.size());
        } finally {
            lock.unlock();
        }
        return size;
    }

    public Runnable get(String groupId, int index) {

        Runnable r = null;
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            r =  (gt == null ? null : gt.tasks.get(index));
        } finally {
            lock.unlock();
        }
        return r;
    }

    public Runnable remove(String groupId, int index) {

        Runnable r = null;
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt == null) {
                // unknown or already cleaned-up group: nothing to remove
                return null;
            }
            r = gt.tasks.remove(index);
            // similar to runNextTask - cleanup if there are no tasks (running) for the group 
            if (gt.tasks.isEmpty() && !gt.taskRunning.get()) {
                taskGroups.remove(groupId);
            }
        } finally {
            lock.unlock();
        }
        return r;
    }

    /* Helper class for the task-group map. */
    class GroupTasks {

        final List<Runnable> tasks = new LinkedList<Runnable>();
        // atomic boolean used to ensure only 1 task is executed at any given time
        final AtomicBoolean taskRunning = new AtomicBoolean(false);
        final String groupId;

        GroupTasks(String groupId) {
            this.groupId = groupId;
        }
    }

    // general callback-task, see https://stackoverflow.com/a/826283/3080094
    static class CallbackTask implements Runnable {

        private final Runnable task, callback;

        public CallbackTask(Runnable task, Runnable callback) {
            this.task = task;
            this.callback = callback;
        }

        @Override 
        public void run() {

            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    callback.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // task that just sleeps for a while
    static class SleepSome implements Runnable {

        static long startTime = System.currentTimeMillis();

        private final String groupId;
        private final long sleepTimeMs;
        public SleepSome(String groupId, long sleepTimeMs) {
            this.groupId = groupId;
            this.sleepTimeMs = sleepTimeMs;
        }
        @Override public void run() {
            try { 
                System.out.println(tdelta(groupId) + "Sleeping for " + sleepTimeMs + " ms.");
                Thread.sleep(sleepTimeMs);
                System.out.println(tdelta(groupId) + "Slept for " + sleepTimeMs + " ms.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private String tdelta(String groupId) { return String.format("% 4d [%s] ", (System.currentTimeMillis() - startTime), groupId); }
    }
}
vanOekel
1

Look into Java's built-in thread executor service.

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html

There is a single thread executor that will process each task synchronously.

In response to the comments section:

Please read the API before you say this won't work.
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor()

Creates an Executor that uses a single worker thread operating off an unbounded queue. (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.

Note: it states they are guaranteed to execute sequentially.

EDIT:

Now that I understand your question better, I have an idea you could try. If you maintain a queue for each group, you can pull items off each queue and feed them into a thread pool. The code below doesn't prioritize any one group; it just pulls from them in round-robin fashion. If you need prioritization, it should be easy to add. The following code round-robins 4 groups using two threads (plus the thread managing the queues). You can use another queue mechanism. I typically use LinkedBlockingQueue when I want to wait for items to be placed on the queue by another thread, which is probably not what you want here, which is why I'm polling instead of calling take(). take() is the call that waits.

private Future group1Future = null;
private Future group2Future = null;
private Future group3Future = null;
private Future group4Future = null;
private LinkedBlockingQueue<Callable> group1Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group2Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group3Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group4Queue
        = new LinkedBlockingQueue<>();

private ExecutorService executor = Executors.newFixedThreadPool(2);


public void startProcessing() {
    // Busy-polls every group in turn; meant to run on its own manager thread.
    while (true) {
        // A group is ready when it has never run a task (null Future)
        // or when its previous task has finished.
        if (group1Future == null || group1Future.isDone()) {
            if (group1Queue.peek() != null) {
                group1Future = executor.submit(group1Queue.poll());
            }
        }
        if (group2Future == null || group2Future.isDone()) {
            if (group2Queue.peek() != null) {
                group2Future = executor.submit(group2Queue.poll());
            }
        }
        if (group3Future == null || group3Future.isDone()) {
            if (group3Queue.peek() != null) {
                group3Future = executor.submit(group3Queue.poll());
            }
        }

        if (group4Future == null || group4Future.isDone()) {
            if (group4Queue.peek() != null) {
                group4Future = executor.submit(group4Queue.poll());
            }
        }
    }
}

If a group's current task is not complete, the loop skips to the next group. No more than two groups are processed at a time, and no single group ever runs more than one task at a time. The queues enforce ordered execution.

Dodd10x
  • I think it is wrong, read the following from the documentation - "*An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more **asynchronous tasks**.*" .. If he will push his tasks using `ExecutorService` then there is guarantee that tasks will be executed synchronously .. He has quoted - "*they can never run concurrently and **they must be processed in order***" – hagrawal7777 Oct 13 '15 at 13:57
  • You were wrong in your suggestion because you only suggested `ExecutorService`, `Executors.newCachedThreadPool()` will return a `ExecutorService` that execute many short-lived asynchronous tasks .. So your answer was incorrect initially .. `Executors.newSingleThreadExecutor();` is correct answer which was first proposed by @SleimanJneidi .. – hagrawal7777 Oct 13 '15 at 14:10
  • ExecutorService is an interface with various implementations. I left it as an exercise to the reader to read the API and determine which particular implementation fits their use case. It's a trivial exercise if you actually read the API and understand concurrency. – Dodd10x Oct 13 '15 at 14:16
  • @ThiagoPorciúncula - you need to expand on your question then. Explain why you can't have a single threaded executor for each group. – Dodd10x Oct 13 '15 at 14:19
  • @Dodd10x The explanation is as simple as "there are too many groups and I can't handle this number of threads". I've edited my question, though. But that's the whole point of me wondering about a supposed thread pool with multiple blocking queues. – Fred Porciúncula Oct 13 '15 at 14:20
  • Your answer was wrong because there are executor services for synchronous and asynchronous tasks .. By mentioning `ExecutorService` you kept it open to interpretation, which could have led the OP to choose the wrong implementation .. Reading the API for further details is one thing, and expecting the OP to read the API to get his answer is another, in which case your answer will not be correct and of no use .. If the OP had a solid understanding of "concurrency" he would not have asked the question, that's why you are **expected to answer to the point** .. – hagrawal7777 Oct 13 '15 at 14:23
  • hagrawal - "There is a single thread executor that will process each task synchronously." that points him to a thread pool that does not operate asynchronously. Not really sure why you don't see that. Anyway, enough of this, it's going nowhere. – Dodd10x Oct 13 '15 at 14:48
  • Well, having one Executor Pool with one Thread for each Queue is exactly what the OP wants to avoid. So I guess the answer is no (but it should be easy to manufacture, or you can have a look at things like the Disruptor Framework). – eckes Oct 13 '15 at 14:57
  • @Dodd10x It certainly looks like a feasible solution, I'll run some tests on it. I believe it would be better to create another answer for it, though. There is too much pollution already in this one. – Fred Porciúncula Oct 13 '15 at 15:44
1

A single thread executor will do

ExecutorService  executorService = Executors.newSingleThreadExecutor();

Which internally uses a ThreadPoolExecutor with a LinkedBlockingQueue

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>())

So you can use this for your sequential stuff, and probably use a multi-threaded executor service for concurrent tasks.

Sleiman Jneidi
  • But then I would have a `singleThreadExecutor` for each of my group of serial tasks, right? I've done that (not exactly that, but something like that), and it works great. The thing is: I can't have a thread for each of my groups, there are way too many groups. That's why I need a single thread pool, with N smart threads that understand my needs for serial execution for some particular tasks. – Fred Porciúncula Oct 13 '15 at 14:12
  • Well, that's a bit different. If you have too many groups then you probably need to create a list of runnables and submit them to a number n of single-threaded thread pools – Sleiman Jneidi Oct 13 '15 at 14:15
  • 1
    I can't have a single threaded thread pool for each of my tasks-that-must-be-serially-executed groups ): – Fred Porciúncula Oct 13 '15 at 14:17
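The suggestion in the comments above, a fixed number n of single-threaded pools instead of one per group, can be sketched by hashing each group id to one of n shards. This is only an illustration under assumptions: the class name `ShardedSerialExecutor` and its methods are hypothetical, and group ids are assumed to have a stable `hashCode()`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: a fixed number of single-threaded executors shared by all groups. */
final class ShardedSerialExecutor {
    private final ExecutorService[] shards;

    ShardedSerialExecutor(int shardCount) {
        shards = new ExecutorService[shardCount];
        for (int i = 0; i < shardCount; i++) {
            shards[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Tasks with the same group id always land on the same thread, so they run in order. */
    void execute(Object groupId, Runnable task) {
        // floorMod keeps the index non-negative even for negative hash codes
        int index = Math.floorMod(groupId.hashCode(), shards.length);
        shards[index].execute(task);
    }

    void shutdown() {
        for (ExecutorService shard : shards) {
            shard.shutdown();
        }
    }
}
```

The thread count stays fixed no matter how many groups exist. The trade-off is that unrelated groups hashed to the same shard wait on each other, so one slow group can delay its shard neighbours even while other threads are idle.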
0

What you need is not a special executor, but a means to express dependencies between tasks. Instead of a group of tasks which must be executed serially, think of a task which, at the end of its execution, sends a signal to the next task, thus starting its execution. So your task can be coded as an actor which waits for a signal allowing it to start execution. Consider Akka or any other actor library (e.g. mine, df4j).
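The "each task signals the next" idea can also be approximated without an actor library by chaining `CompletableFuture`s per group (Java 8 or later). This is a swapped-in technique, not Akka or df4j, and the class name `ChainedTasks` and its `submit` method are hypothetical. A minimal sketch, assuming the pool runs tasks on its own threads rather than on the submitting thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical helper: each task of a group starts when the previous one of that group ends. */
final class ChainedTasks {
    private final Executor pool;
    // group id -> future of the most recently submitted task of that group
    private final ConcurrentHashMap<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    ChainedTasks(Executor pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(String groupId, Runnable task) {
        // compute() is atomic per key, so concurrent submitters cannot reorder a group's chain
        return tails.compute(groupId, (id, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            // handleAsync fires when the previous task completes, whether it succeeded or failed
            return previous.<Void>handleAsync((ignoredResult, ignoredError) -> {
                task.run();
                return null;
            }, pool);
        });
    }
}
```

Each returned future completes when its own task finishes, so a caller can wait on it or attach further stages. The sketch keeps one map entry per group forever; a production version would need to evict the tails of idle groups.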

Alexei Kaigorodov
0

There is no standard implementation of a thread pool with these requirements.

The Striped Executor Service mentioned in the accepted answer is a good substitute.

The disadvantages I see are: multiple queues (no way to limit queue capacity or maintain submission order) and a thread per stripe (if you have a lot of stripes, your thread pool will grow).

I decided to create a similar implementation with a single queue:
GitHub - TaggedThreadPoolExecutor.java
It implements the standard ExecutorService interface, maintains a single queue, takes a maximum number of threads as a parameter, and supports different rejection policies (similar to the standard ThreadPoolExecutor). Unlike ThreadPoolExecutor, it starts a new thread not when the queue is full but when a new task is submitted.

AlexP
0

You could maintain a bunch of queues (a List or a Map of queues). Each queue holds the tasks for one specific class, and a background thread dequeues tasks from each queue serially and submits them to a separate thread pool executor, which can have a larger number of threads.

Sagar