
I have a thread pool of m threads. Let's say m is 10 and fixed. Then there are n queues, and n can become large (100'000 or more). Every queue holds tasks to be executed by those m threads. Now, very important: every queue must be worked off sequentially, task by task. This is a requirement to make sure that tasks are executed in the order they were added to the queue. Otherwise the data could become inconsistent (same as, say, with JMS queues).

So the question is how to make sure that the tasks in those n queues are processed by the available m threads in such a way that no two tasks from the same queue are ever executed "at the same time" by different threads.

I tried to solve this problem myself and found that it is quite demanding. Java's ThreadPoolExecutor is nice, but you would have to add quite a bit of functionality on top of it that is not easy to develop. So the question is whether anyone knows of a framework or library for Java that already solves this problem.

Update

Thanks to Adrian and Tanmay for their suggestions. The number of queues may be very large (100'000 or more), so one thread per queue is unfortunately not possible, although it would be simple and easy. I will look into the Fork/Join framework; it looks like an interesting path to pursue.

My current first-iteration solution is to have a global queue to which all tasks are added (using a JDK TransferQueue such as LinkedTransferQueue, which has very little locking overhead). Each task is wrapped together with a stub of its queue, which holds the queue's lock and its size. The queue itself does not exist physically, only its stub.

An idle thread first needs to obtain a token before it can access the global queue (the token would be a single element in a blocking queue, e.g. a TransferQueue). Then it does a blocking take on the global queue. When it has obtained a task, it checks whether the queue lock in the task's queue stub is free. Actually, I think just using an AtomicBoolean would be sufficient and would create less contention than a lock or a synchronized block.
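The queue stub and the AtomicBoolean check described above might look roughly like the following minimal sketch. The names QueueStub, StubbedTask, tryLock and unlock are hypothetical; the token and the 2nd-level queue are left out, and the stub by itself only prevents two tasks of one queue from running at the same time, it does not preserve their order.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stub standing in for one logical queue: only its lock and size exist. */
final class QueueStub {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final AtomicInteger size = new AtomicInteger();

    /** Non-blocking attempt to take the queue lock; true if the caller now owns it. */
    boolean tryLock() {
        return locked.compareAndSet(false, true);
    }

    /** Releases the queue lock; must only be called by the current owner. */
    void unlock() {
        locked.set(false);
    }

    int incrementSize() {
        return size.incrementAndGet();
    }

    int decrementSize() {
        return size.decrementAndGet();
    }
}

/** Hypothetical wrapper: a task together with the stub of the queue it belongs to. */
final class StubbedTask {
    final QueueStub stub;
    final Runnable task;

    StubbedTask(QueueStub stub, Runnable task) {
        this.stub = stub;
        this.task = task;
    }
}
```

A worker that has taken a StubbedTask st would run st.task only if st.stub.tryLock() returns true, and call st.stub.unlock() in a finally block afterwards.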

When the queue lock is obtained, the token is returned to its queue and the task is executed. If it is not obtained, the task is added to a 2nd-level queue and another blocking take on the global queue is done. Threads also need to check whether the 2nd-level queue is non-empty and, if so, take a task from it to be executed as well.

This solution seems to work. However, the token every thread needs to acquire before it may access the global queue and the 2nd-level queue looks like a bottleneck; I believe it will create high lock contention. So I'm not so happy with this. Maybe I'll start with this solution and refine it.

Update 2

All right, here is the "best" solution I have come up with so far. The following queues are defined:

Ready Queue (RQ): Contains all tasks that can be executed immediately by any thread in the thread pool

Entry Queue (EQ): Contains all tasks the user wants to be executed as well as internal admin tasks. The EQ is a priority queue. Admin tasks have highest priority.

Channel Queues (CQ): For every channel there is an internal channel queue that is used to preserve the ordering of the tasks, i.e. to make sure tasks are executed sequentially in the order they were added to the EQ.

Scheduler: A dedicated thread that takes tasks from the EQ. If the task is a user task, it is added to the CQ of the channel the task was submitted to. If the head of the CQ is the just-inserted user task, that task is also added to the RQ (but remains in the CQ) so that it is executed as soon as the next thread of the thread pool becomes available.

When a user task has finished execution, an internal TaskFinished task is added to the EQ. When the scheduler processes it, the head is removed from the associated CQ. If the CQ is not empty after that, the new head is peeked at (but not removed) and added to the RQ. TaskFinished tasks have higher priority than user tasks.
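The Update 2 design can be sketched as follows, under a few assumptions that are not part of the description: the EQ is a PriorityBlockingQueue (a TransferQueue does not order its elements by priority), the RQ is the internal work queue of a fixed thread pool, and a TaskFinished entry is represented by an entry without a user task. ChannelScheduler, Entry, submit and dispatch are hypothetical names; this is a sketch, not a tuned implementation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class ChannelScheduler {
    /** An EQ entry: a user task, or an internal TaskFinished notice (task == null). */
    private static final class Entry implements Comparable<Entry> {
        private static final AtomicLong SEQ = new AtomicLong();
        final Object channel;
        final Runnable task;
        final long seq = SEQ.getAndIncrement(); // FIFO among entries of equal priority

        Entry(Object channel, Runnable task) {
            this.channel = channel;
            this.task = task;
        }

        boolean isTaskFinished() { return task == null; }

        @Override
        public int compareTo(Entry other) {
            if (isTaskFinished() != other.isTaskFinished()) {
                return isTaskFinished() ? -1 : 1; // TaskFinished goes before user tasks
            }
            return Long.compare(seq, other.seq);
        }
    }

    private final PriorityBlockingQueue<Entry> eq = new PriorityBlockingQueue<>();
    // CQs are touched only by the scheduler thread, so plain collections suffice.
    private final Map<Object, ArrayDeque<Runnable>> cqs = new HashMap<>();
    private final ExecutorService pool; // its internal work queue acts as the RQ
    private final Thread scheduler = new Thread(this::loop, "scheduler");

    ChannelScheduler(int threads) {
        pool = Executors.newFixedThreadPool(threads);
        scheduler.setDaemon(true);
    }

    void start() { scheduler.start(); }

    void submit(Object channel, Runnable task) {
        eq.put(new Entry(channel, Objects.requireNonNull(task)));
    }

    void shutdown() {
        scheduler.interrupt();
        pool.shutdown();
    }

    private void loop() {
        try {
            while (true) {
                Entry e = eq.take();
                if (e.isTaskFinished()) onTaskFinished(e.channel);
                else onUserTask(e);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // shutdown() was called
        }
    }

    private void onUserTask(Entry e) {
        ArrayDeque<Runnable> cq = cqs.computeIfAbsent(e.channel, c -> new ArrayDeque<>());
        cq.addLast(e.task);
        if (cq.size() == 1) dispatch(e.channel, e.task); // new task is the head: ready now
    }

    private void onTaskFinished(Object channel) {
        ArrayDeque<Runnable> cq = cqs.get(channel);
        cq.pollFirst();                        // drop the task that just finished
        Runnable next = cq.peekFirst();        // look at, but do not remove, the next one
        if (next == null) cqs.remove(channel); // forget idle channels
        else dispatch(channel, next);
    }

    private void dispatch(Object channel, Runnable task) {
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                eq.put(new Entry(channel, null)); // TaskFinished goes back through the EQ
            }
        });
    }
}
```

Because only the scheduler thread touches the CQs, they can be a plain HashMap and ArrayDeque. The price is that every task completion makes a round trip through that single thread, which is the bottleneck concern raised in the next paragraph.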

In my opinion this approach contains no logical errors. Note that the EQ and the RQ need to be thread-safe. I prefer using a TransferQueue, which is very fast, and where checking whether it is empty and polling the head item are also very fast. The CQs need not be synchronized, as they are only ever accessed by the Scheduler.

So far I'm quite happy with this solution. What I'm unsure about is whether the Scheduler could turn into a bottleneck: if many more tasks arrive in the EQ than it can handle, the EQ might grow and build up a backlog. Any opinions about that would be appreciated :-)

OlliP
  • I remember Spring having some "sequential" keyword somewhere. Have you considered JMS? Also, this might be a duplicate of http://stackoverflow.com/questions/7192223/ensuring-task-execution-order-in-threadpool. EDIT: Executors.newSingleThreadExecutor() – Richard Feb 16 '15 at 16:07
  • Wait; if you want them to be executed sequentially, why use parallelism in the first place? – fge Feb 16 '15 at 16:08
  • JMS would not be an option, because it is all VM-local, i.e. there is no reason to leave the VM heap space and lose performance because of this. I have a processor with multiple cores and I want to make sure they are all busy. That's why parallelism comes into play ;-). – OlliP Feb 16 '15 at 16:13
  • You want tasks in one queue to be executed sequentially, but you have no requirements that tasks in different queues be executed in a specific order and/or sequentially, correct? – Adrian Leonhard Feb 16 '15 at 16:29
  • @Adrian: Yes, this is correct. – OlliP Feb 16 '15 at 16:31
  • There are a number of problems: you should post code instead of describing what your code does. Don't implement your own mutexes; use Java's features. This should not be necessary anyway, as the concurrent Queue implementations are thread-safe and the ThreadPoolExecutor should handle most of this (Executors.newFixedThreadPool returns a pool with its own queue, which it works off with a limited number of threads). Note that in my answer I suggest creating a Task for every queue, not a Thread for every queue; I will edit it to clear that up. – Adrian Leonhard Feb 17 '15 at 11:54
  • @Adrian: I'm afraid I would have to post tons of code, which wouldn't be practical. I think I found a solution that works. Let me know in case you are interested. – OlliP Feb 19 '15 at 08:08
  • Answering the question with what worked for you is [generally encouraged](http://stackoverflow.com/help/self-answer). Also, your description makes it seem like some improvement is possible, but that would need more details. – Adrian Leonhard Feb 19 '15 at 10:33
  • @Adrian: All right, just wrote another update :-) – OlliP Feb 26 '15 at 21:44

2 Answers


You can use the Fork/Join framework if you are working in Java 7 or Java 8.

For each queue, create a RecursiveTask from the first element popped from that queue.
Remember to give each RecursiveTask a reference to its queue.

Invoke all of them at once (in a loop or a stream).

Then, at the end of the compute method (after processing of the element is complete), create another RecursiveTask by popping the next element from the corresponding queue and call invoke on it.


Notes:

  • Each task is responsible for extracting the next element from its queue, so all tasks from a queue are executed sequentially.
  • A new RecursiveTask should be created and invoked separately for each element in the queues. This ensures that some queues do not hog the threads, and starvation is avoided.
  • Using an ExecutorService is also a viable option, but IMO the ForkJoin API is friendlier for your use case.
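Following these steps, a per-queue chain of Fork/Join tasks might look like this minimal sketch. QueueStep and startAll are hypothetical names. It uses RecursiveAction instead of RecursiveTask because no result is returned, and fork() instead of invoke() so that the next step is scheduled in the pool rather than nested on the current thread's stack (invoke() would add one stack frame chain per element). It assumes the queues are thread-safe (e.g. ConcurrentLinkedQueue) and already filled: a chain ends when its queue is empty, so elements added later would need a new chain.

```java
import java.util.Queue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.function.Consumer;

/** One step of a per-queue chain: process the head element, then fork the next step. */
final class QueueStep<T> extends RecursiveAction {
    private final Queue<T> queue;        // the queue this chain is responsible for
    private final Consumer<T> processor; // hypothetical per-element work

    QueueStep(Queue<T> queue, Consumer<T> processor) {
        this.queue = queue;
        this.processor = processor;
    }

    @Override
    protected void compute() {
        T element = queue.poll();
        if (element == null) {
            return; // queue worked off: this chain ends
        }
        processor.accept(element); // if this throws, the chain for this queue stops
        // The next step is created only after this element is done, so elements of
        // one queue never run concurrently; fork() schedules it asynchronously.
        new QueueStep<>(queue, processor).fork();
    }

    /** Starts one chain per queue and returns immediately. */
    static <T> void startAll(Iterable<? extends Queue<T>> queues, Consumer<T> processor,
                             ForkJoinPool pool) {
        for (Queue<T> q : queues) {
            pool.execute(new QueueStep<>(q, processor));
        }
    }
}
```

Since the forked continuation steps are never joined, the caller needs its own completion signal, for example a CountDownLatch that the processor counts down.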

Hope this helps.

Tanmay Patil
  • Going with the Fork/Join framework looks interesting. When using the ExecutorService I could implement some functionality so that, when all threads are busy, an exception is thrown or false is returned upon adding a new task to the global queue (see what I wrote in the update section of my initial question). Then the user knows that the machine is on its knees and that the task should be sent to some other machine for execution. – OlliP Feb 17 '15 at 09:49
  • If the user does not want to *wait* for the machine, then there is no point in a *queue*. Rethinking your requirements and then approaching the design would be a good idea IMO. Good luck. – Tanmay Patil Feb 17 '15 at 19:34

One simple solution would be to create a task whenever an element is added to an empty queue. This task would be responsible for only that queue and would end when the queue has been worked off. Ensure that the Queue implementations are thread-safe and that the task stops after removing the last element.

EDIT: These tasks should be submitted to a ThreadPoolExecutor with an internal queue, for example one created by Executors.newFixedThreadPool, which will work off the tasks in parallel with a limited number of threads.
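A minimal sketch of this "one task per non-empty queue" idea, assuming each queue is a ConcurrentLinkedQueue and the shared pool is any Executor (e.g. one from Executors.newFixedThreadPool). SerialQueue, add, drain and the pending counter are hypothetical names. The counter makes the two decisions "the queue was empty, start a task" and "that was the last element, stop" atomic, which avoids the race where an element is added just as the task is ending.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** One logical queue whose tasks run one at a time, in FIFO order, on a shared pool. */
final class SerialQueue {
    private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicInteger pending = new AtomicInteger(); // added but not yet finished
    private final Executor pool;

    SerialQueue(Executor pool) {
        this.pool = pool;
    }

    void add(Runnable task) {
        tasks.add(task); // enqueue first, so a running drain task can always find it
        // 0 -> 1 means the queue was empty and no drain task exists: start one.
        if (pending.getAndIncrement() == 0) {
            pool.execute(this::drain);
        }
    }

    /** The per-queue task: works the queue off and ends after the last element. */
    private void drain() {
        do {
            Runnable task = tasks.poll(); // never null: pending > 0 implies an enqueued task
            try {
                task.run();
            } catch (RuntimeException e) {
                // hypothetical policy: a failing task must not stall its queue (log it in real code)
            }
        } while (pending.decrementAndGet() > 0);
    }
}
```

Each of the 100'000+ queues then costs only a few small objects and no thread of its own, while the pool bounds the number of threads. Tasks added by one producer run in the order they were added.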


Alternatively, just divide the queues among a fixed number of threads:

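import java.util.List;
import java.util.Queue;

public class QueueWorker implements Runnable {
    private final List<Queue<Runnable>> queues; // shared, fixed list; each Queue must be thread-safe
    private final int numThreads;               // total number of QueueWorkers
    private final int threadId;                 // should be unique and < numThreads

    QueueWorker(List<Queue<Runnable>> queues, int numThreads, int threadId) {
        this.queues = queues;
        this.numThreads = numThreads;
        this.threadId = threadId;
    }

    @Override
    public void run() {
        int currentQueueIndex = threadId;
        // loops (busy-waiting when all its queues are empty) until interrupted
        while (!Thread.currentThread().isInterrupted()) {
            if (currentQueueIndex < queues.size()) {
                Queue<Runnable> currentQueue = queues.get(currentQueueIndex);
                // execute tasks until empty; only this worker consumes from this queue
                Runnable task;
                while ((task = currentQueue.poll()) != null) {
                    task.run();
                }
            }
            // move on to the next queue owned by this worker (every numThreads-th one)
            currentQueueIndex += numThreads;
            if (currentQueueIndex >= queues.size()) {
                currentQueueIndex = threadId;
            }
        }
    }
}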
Adrian Leonhard