8

There are a huge number of tasks. Each task belongs to a single group. The requirement is that each group of tasks should be executed serially, just as if executed in a single thread, and throughput should be maximized in a multi-core (or multi-CPU) environment. Note: there is also a huge number of groups, proportional to the number of tasks.

The naive solution is to use a ThreadPoolExecutor and synchronize (or lock). However, threads would block each other and throughput would not be maximized.
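For illustration, the naive approach I have in mind is roughly the following (the class and method names are just for the example):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NaiveGroupExecutor {
    private final ExecutorService pool = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors());
    // One lock object per group; tasks of the same group serialize on it.
    private final ConcurrentMap<String, Object> groupLocks =
            new ConcurrentHashMap<String, Object>();

    public void submit(String groupId, final Runnable task) {
        Object candidate = new Object();
        Object existing = groupLocks.putIfAbsent(groupId, candidate);
        final Object lock = (existing != null) ? existing : candidate;
        pool.execute(new Runnable() {
            public void run() {
                // A pool thread blocks here while another task of the
                // same group is running, wasting a worker.
                synchronized (lock) {
                    task.run();
                }
            }
        });
    }
}

A worker that picks up a task whose group lock is held just sits blocked, so with many tasks per group most of the pool can end up waiting.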

Any better ideas? Or is there an existing third-party library that satisfies the requirement?

James
    "However, threads would block each other and the throughput is not maximized.". Do you mean that the individual tasks are accessing a shared data structure or resource and this is the cause of the contention? – Adamski Jul 15 '10 at 07:59
  • Do you know all of a group's tasks in advance? This is important when choosing a solution (queues vs no queues) – Eyal Schneider Jul 15 '10 at 09:39

5 Answers

3

A simple approach would be to "concatenate" all of a group's tasks into one super task, thus making the sub-tasks run serially. But this will probably delay other groups, which will not start until some other group completely finishes and frees space in the thread pool.

As an alternative, consider chaining a group's tasks. The following code illustrates it:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiSerialExecutor {
    private final ExecutorService executor;

    public MultiSerialExecutor(int maxNumThreads) {
        executor = Executors.newFixedThreadPool(maxNumThreads);
    }

    public void addTaskSequence(List<Runnable> tasks) {
        executor.execute(new TaskChain(tasks));
    }

    public void shutdown() {
        executor.shutdown();
    }

    private class TaskChain implements Runnable {
        private final List<Runnable> seq;
        private int ind;

        public TaskChain(List<Runnable> seq) {
            this.seq = seq;
        }

        @Override
        public void run() {
            seq.get(ind++).run(); //NOTE: No special error handling
            if (ind < seq.size())
                executor.execute(this); // re-submit so the next task in the
                                        // chain runs after this one finishes
        }
    }
}

The advantage is that no extra resources (threads/queues) are used, and the task granularity is finer than in the naive approach. The disadvantage is that all of a group's tasks must be known in advance.
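For example, usage could look like this (the Runnables are just placeholders):

import java.util.Arrays;

public class Demo {
    public static void main(String[] args) {
        MultiSerialExecutor executor = new MultiSerialExecutor(4);
        // Group A: its two tasks always run in this order.
        executor.addTaskSequence(Arrays.<Runnable>asList(
                new Runnable() { public void run() { System.out.println("A1"); } },
                new Runnable() { public void run() { System.out.println("A2"); } }));
        // Group B: may run in parallel with group A on another pool thread.
        executor.addTaskSequence(Arrays.<Runnable>asList(
                new Runnable() { public void run() { System.out.println("B1"); } }));
    }
}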

--edit--

To make this solution generic and complete, you may want to decide on error handling (i.e. whether a chain continues even if an error occurs), and it would also be a good idea to implement ExecutorService and delegate all calls to the underlying executor.

Eyal Schneider
  • Maybe we should also add a map so that we can find the TaskChain for a given task and add the task to its TaskChain. – James Jul 16 '10 at 02:15
  • @James: you are right. With some simple synchronization, the chains will be able to receive new tasks on the fly (they will actually act as queues for the caller). I think I will make the solution more generic and useful, and write about it in my blog :) – Eyal Schneider Jul 16 '10 at 14:20
2

I would suggest using task queues:

  • For every group of tasks you have, create a queue and insert all tasks from that group into it.
  • Now all your queues can be processed in parallel, while the tasks inside each queue are executed serially.

A quick Google search suggests that the Java API has no per-group task queues by itself. However, there are many tutorials available on coding one. Feel free to list good tutorials / implementations if you know of any.
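As one possible sketch of the idea (names are illustrative, and it assumes each group's tasks are all enqueued before the workers start), each worker thread takes one group's queue and drains it completely before picking up the next group:

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueuePerGroupRunner {
    // Queue of per-group queues; a worker removes a group queue and owns it
    // until that group is finished.
    private final Queue<Queue<Runnable>> groupQueues =
            new ConcurrentLinkedQueue<Queue<Runnable>>();

    public void addGroup(List<Runnable> groupTasks) {
        groupQueues.add(new ConcurrentLinkedQueue<Runnable>(groupTasks));
    }

    public void runWith(int numThreads) throws InterruptedException {
        Thread[] workers = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    Queue<Runnable> group;
                    while ((group = groupQueues.poll()) != null) {
                        Runnable task;
                        while ((task = group.poll()) != null) {
                            task.run(); // no special error handling
                        }
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
    }
}

Note that the number of worker threads is independent of the number of groups.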

Dave O.
  • Thanks Dave. If there are a huge number of groups, then the number of threads will hit the limit. – James Jul 15 '10 at 08:44
  • @James Not necessarily. Just because you have n groups doesn't mean you need to create n threads to execute them. Create as many threads as you think are suitable and they will take care of the queues, either in a round-robin fashion or serially. – Dave O. Jul 15 '10 at 09:09
1

I mostly agree with Dave's answer, but if you need to slice CPU time across all "groups", i.e. all task groups should progress in parallel, you might find this kind of construct useful (it uses removal from a queue as the "lock"; this worked fine in my case, although I imagine it tends to use more memory):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class TaskAllocator {
    // One sub-queue per task group; childQueuePerTaskGroup() stands in for
    // whatever builds those sub-queues up front.
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
         = childQueuePerTaskGroup();

    // Removing a group's queue acts as the "lock": no other worker can
    // pick up that group until it is released.
    public Queue<Runnable> lockTaskGroup() {
        return entireWork.poll();
    }

    public void release(Queue<Runnable> taskGroup) {
        entireWork.offer(taskGroup);
    }
}

and

class DoWork implements Runnable {
    private final TaskAllocator allocator;

    public DoWork(TaskAllocator allocator) {
        this.allocator = allocator;
    }

    public void run() {
        for (;;) {
            Queue<Runnable> taskGroup = allocator.lockTaskGroup();
            if (taskGroup == null) {
                //No more work
                return;
            }
            Runnable work = taskGroup.poll();
            if (work == null) {
                //This group is done
                continue;
            }

            //Do work, but never forget to release the group to
            //the allocator.
            try {
                work.run();
            } finally {
                allocator.release(taskGroup);
            }
        }//for
    }
}

You can then use an optimal number of threads to run the DoWork tasks. It's a kind of round-robin load balancing.
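For instance, something like this (the pool size is just an example, and TaskAllocator is assumed to be filled with the group queues up front):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunDoWork {
    public static void main(String[] args) {
        int numThreads = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        TaskAllocator allocator = new TaskAllocator(); // from the code above
        for (int i = 0; i < numThreads; i++) {
            pool.execute(new DoWork(allocator));
        }
        pool.shutdown(); // each DoWork returns once no groups are left
    }
}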

You can even do something more sophisticated by using the following instead of a simple queue in TaskAllocator (task groups with more tasks remaining tend to get executed first):

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue =
    new ConcurrentSkipListSet<MyQueue<Runnable>>(new SophisticatedComparator());

where SophisticatedComparator is

import java.util.Comparator;

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2) {
        int diff = o2.size() - o1.size();
        if (diff == 0) {
            //This is crucial. You must assign unique ids to your
            //sub-queues and break the equality if they happen to have the same size.
            //Otherwise your queues will disappear from the set...
            return o1.id - o2.id;
        }
        return diff;
    }
}
Enno Shioji
  • +1 task queues allow you to use any scheduling algorithm that suits your needs. – Dave O. Jul 15 '10 at 09:14
  • It looks like you are re-implementing a thread pool. Why not use the standard ThreadPoolExecutor plus some extra functionality as in my solution? My solution requires no queues and no synchronization. – Eyal Schneider Jul 15 '10 at 09:32
  • @Eyal: If it is okay to consume task groups sequentially, I agree with you. However, if they have to be consumed in parallel, this is necessary. – Enno Shioji Jul 15 '10 at 09:42
  • In my solution groups are executed in parallel, and each group is executed serially, just as in your solution. The big difference between our solutions (if I understand correctly) is that your solution allows adding new tasks to an existing group on the fly, while my solution is much simpler because it assumes that whenever a group starts executing, all its tasks are known in advance. – Eyal Schneider Jul 15 '10 at 09:55
  • Oh okay, that's why you are re-submitting.. Clever :) I guess the fact TPE only allows `BlockingQueue` could potentially be limiting, but now I see your point.. – Enno Shioji Jul 15 '10 at 11:07
0

Actors are another solution to this type of problem. Scala has actors, and so does Java, as provided by Akka.
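For instance, with Akka's Java API you could give each group its own actor: an actor processes its mailbox one message at a time, so a group's tasks run serially, while the dispatcher runs different actors in parallel. A minimal sketch (the actor, system, and group names are made up for the example):

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class ActorGroupsExample {
    // One actor per group; the mailbox guarantees serial, in-order processing.
    public static class GroupActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(Runnable.class, task -> task.run())
                    .build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("tasks");
        ActorRef groupA = system.actorOf(Props.create(GroupActor.class), "group-a");
        ActorRef groupB = system.actorOf(Props.create(GroupActor.class), "group-b");

        // A1 always runs before A2; B1 may run concurrently with either.
        groupA.tell((Runnable) () -> System.out.println("A1"), ActorRef.noSender());
        groupA.tell((Runnable) () -> System.out.println("A2"), ActorRef.noSender());
        groupB.tell((Runnable) () -> System.out.println("B1"), ActorRef.noSender());
    }
}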

James
-2

I had a problem similar to yours, and I used an ExecutorCompletionService, which works with an Executor to complete collections of tasks. Here is an extract from the java.util.concurrent API docs (Java 7):

Suppose you have a set of solvers for a certain problem, each returning a value of some type Result, and would like to run them concurrently, processing the results of each of them that return a non-null value, in some method use(Result r). You could write this as:

void solve(Executor e, Collection<Callable<Result>> solvers)
        throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null)
            use(r);
    }
}

So, in your scenario, every task will be a single Callable<Result>, and tasks will be grouped in a Collection<Callable<Result>>.

Reference: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html

Olly