
I have an unbounded queue of jobs which can be processed asynchronously. The processing of each job may or may not trigger the creation of new jobs for this queue.

I would like a pool of several worker threads to take items from this queue and process them in parallel, until both the queue is empty and all worker threads are idle waiting for new jobs on the queue (as a busy worker could end up adding new jobs to the queue).

Is there a recipe for using the java.util.concurrent implementations which I can use to solve this particular problem, where workers are also producers? It is not clear that such a scenario is supported in a straightforward manner from the APIs.

In particular, I want to be able to detect the termination condition, namely, when no more jobs are available (empty job queue) and there will be no more jobs produced (all idle worker threads).

EDIT

Nam San's answer below appears to be the most elegant approach, which basically boiled down to tracking the number of submitted jobs vs. the number of completed jobs, and using the case where these numbers were equal as the termination condition.

I've implemented a full example using java.util.concurrent implementations which extends ThreadPoolExecutor to achieve this, plus specialises the job queue to accept Comparable instances which are sorted in a particular way.

  • TestExecutor.java: A custom executor which extends ThreadPoolExecutor but has additional methods to execute jobs which may create new jobs, and a new await method which waits until all submitted jobs are complete.
  • WorkUnit.java: An example of a comparable, runnable job which may create new jobs to submit to TestExecutor.
  • Test.java: Contains a main method to run an example using WorkUnit instances with a TestExecutor.
dreamcrash

5 Answers


I don't think it's relevant that consumers are also producers, since in the producer-consumer pattern they are totally separate concerns.

Your consumers already have a reference to the queue - just let them add to it as well like a producer.

You could use an AtomicInteger or similar to record how many workers are currently active, and/or use a CountDownLatch if you want to wait until they are all quiescent.
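A minimal sketch of that idea (the class and method names here are illustrative, not from the answer): an AtomicInteger counts outstanding jobs, incremented before each submission and decremented on completion; when it reaches zero, a CountDownLatch releases the waiting main thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class QuiescenceExample {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final AtomicInteger outstanding = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(1);

    // Workers may call this too; the count is bumped *before* the task is
    // handed to the pool, so it can never drop to zero while work remains.
    public void submit(Runnable task) {
        outstanding.incrementAndGet();
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                // The last task to finish (with nothing queued) trips the latch.
                if (outstanding.decrementAndGet() == 0) {
                    done.countDown();
                }
            }
        });
    }

    public void awaitQuiescence() throws InterruptedException {
        done.await();
        pool.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        QuiescenceExample q = new QuiescenceExample();
        AtomicInteger processed = new AtomicInteger();
        // Seed task spawns two children; the children spawn nothing further.
        q.submit(() -> {
            processed.incrementAndGet();
            q.submit(processed::incrementAndGet);
            q.submit(processed::incrementAndGet);
        });
        q.awaitQuiescence();
        System.out.println(processed.get()); // prints 3
    }
}
```

Because a child is counted before its parent decrements, the counter cannot reach zero while any job is still able to spawn new work.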

Bohemian
  • 412,405
  • 93
  • 575
  • 722

I have seen a few different solutions to this type of problem.

One is to still use poll as the blocking call in the main thread, as in your code, but to enqueue a "dummy" object from a worker to wake up the main thread in cases where it might otherwise wait forever. For example, any worker which finishes without adding more items to the queue should submit a dummy job, which the main thread recognizes and ignores (it serves only to wake up the main thread). You can create fewer dummy objects, and hence fewer "spurious wakeups" at the cost of some complexity by keeping track of the number of active jobs - only the last job needs to add a dummy object.
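A rough sketch of the dummy-job idea, with the counting optimization (all names and the single shared sentinel are mine, for illustration): the main thread blocks taking from the queue and dispatches real jobs to a pool; a job that turns out to be the last active one enqueues the sentinel, which the main thread recognizes by identity and uses only to re-check the termination condition.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DummyJobExample {
    static final Runnable DUMMY = () -> {}; // recognized by identity, never run

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();

        // Wrap each dispatched job so that the last one to finish enqueues
        // the dummy, waking the main thread to re-check for termination.
        class Tracked implements Runnable {
            final Runnable body;
            Tracked(Runnable body) { this.body = body; }
            public void run() {
                try {
                    body.run();
                } finally {
                    if (inFlight.decrementAndGet() == 0) queue.add(DUMMY);
                }
            }
        }

        // Seed: one job that spawns a single child job.
        queue.add(() -> {
            processed.incrementAndGet();
            queue.add(() -> processed.incrementAndGet());
        });

        while (true) {
            Runnable job = queue.take();       // main thread blocks here
            if (job == DUMMY) {
                if (inFlight.get() == 0 && queue.isEmpty()) break; // all done
                continue;                      // spurious wake-up, keep looping
            }
            inFlight.incrementAndGet();
            pool.execute(new Tracked(job));
        }
        pool.shutdown();
        System.out.println(processed.get());   // prints 2
    }
}
```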

Another method is to wait on a different object; any old Object will do. Have the main thread wait() on this object, and have jobs wake it up using Object.notify() whenever they complete. Again, with counting, you can reduce the number of required notifies.
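A sketch of the wait/notify variant with counting (the class and method names are illustrative): a counter of outstanding jobs is guarded by the monitor, and only the last job to finish calls notifyAll(); the main thread re-checks the counter in a loop, since wait() can wake spuriously.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitNotifyExample {
    private final Object lock = new Object();
    private int active;                           // outstanding jobs, guarded by lock

    void jobStarted()  { synchronized (lock) { active++; } }

    void jobFinished() {
        synchronized (lock) {
            if (--active == 0) lock.notifyAll();  // last finisher wakes the waiter
        }
    }

    void awaitAll() throws InterruptedException {
        synchronized (lock) {
            while (active > 0) lock.wait();       // loop guards against spurious wakeups
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotifyExample tracker = new WaitNotifyExample();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger processed = new AtomicInteger();

        tracker.jobStarted();                     // count the seed job before submitting
        pool.execute(() -> {
            processed.incrementAndGet();
            tracker.jobStarted();                 // count the child before spawning it,
            pool.execute(() -> {                  // so 'active' never hits 0 early
                processed.incrementAndGet();
                tracker.jobFinished();
            });
            tracker.jobFinished();
        });

        tracker.awaitAll();                       // blocks until both jobs are done
        pool.shutdown();
        System.out.println(processed.get());      // prints 2
    }
}
```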

The most elegant solution may be to use a Semaphore. Basically, the value of the semaphore will be the negative of the number of in-flight jobs plus queued items. This value doesn't change when a job picks an item up off the queue (in-flight jobs go up by one while queued items go down by one), but every job should call reducePermits() once for every job it adds (note that reducePermits() is protected, so you'll need a small Semaphore subclass to expose it), and make one release() call before it finishes.

Then the main thread can just block on acquire() for the duration of the work. When it wakes up, everything is done (because in-flight + queued work is zero). You'd start another thread to actually do the poll call and add jobs (currently done by the main thread), and this worker can just be shut down when the acquire on the main thread returns. However, it might be even simpler just to have the existing workers poll() themselves, rather than finishing. Then you don't need this transfer function at all.

In fact, with the Semaphore solution, why not just dispense with the queue entirely, and use the one built into the executor? That is, have the workers submit new work via executor.submit(newJob(nextJob))? Internally the executor threads are pulling work off of a blocking queue anyway, so there is some duplication in having an explicit external queue.
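A sketch combining the Semaphore idea with the executor's built-in queue (all names here are mine; the one-permit offset is an assumption that makes acquire() usable directly): the semaphore starts at 1, each submission reduces it by one permit, and each completion releases one, so it returns to 1 exactly when no work is in flight or queued. Since Semaphore.reducePermits() is protected, a small subclass exposes it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreExample {
    // Semaphore.reducePermits is protected, so expose it via a subclass.
    static class ReducibleSemaphore extends Semaphore {
        ReducibleSemaphore(int permits) { super(permits); }
        void reduce(int n) { reducePermits(n); }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Value is 1 - (submitted - completed): acquirable only at quiescence.
        ReducibleSemaphore pending = new ReducibleSemaphore(1);
        AtomicInteger processed = new AtomicInteger();

        // Workers submit new work straight to the executor; no external queue.
        class Submitter {
            void submit(Runnable body) {
                pending.reduce(1);                // claim a permit per pending job
                pool.execute(() -> {
                    try { body.run(); } finally { pending.release(); }
                });
            }
        }
        Submitter s = new Submitter();

        // Seed job spawns two children before it finishes, so the permit
        // count stays below 1 until all three jobs have completed.
        s.submit(() -> {
            processed.incrementAndGet();
            s.submit(processed::incrementAndGet);
            s.submit(processed::incrementAndGet);
        });

        pending.acquire();                        // blocks until all work is done
        pool.shutdown();
        System.out.println(processed.get());      // prints 3
    }
}
```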

BeeOnRope

Years ago I had to do something similar, but with a bounded stack. I'll share a possible solution:

idle_threads = MAX_THREADS;  // Shared counter; all threads start out idle
do
{
    // 'flag' is local to each thread; it ensures the thread is counted
    // as busy (and later as idle) only once per transition
    if(queue != empty)  // If the thread has work to do
    {
        synchronized(this)
        {
            if(flag == false)
            {
                idle_threads--;  // Count this thread as a worker
                flag = true;
            }
        }
        while(queue != empty)  // Until the queue runs out of work
        {
            synchronized(this)
            {
                // task = take_out_of_queue;
            }
            // process task (it may add new items to the queue)
        }
    }
    if(flag)  // The queue is empty again: count the thread as idle,
    {         // but only once per busy period
        synchronized(this)
        {
            idle_threads++;
            flag = false;
        }
    }
    if(idle_threads == MAX_THREADS) out = true;  // All threads idle: stop the work loop
} while(!out);
dreamcrash

See my post on Directory Scanner. It fulfills most of the requirements, but it's not implemented with Futures and Callables; I'd have to think that one out. Each task is not given importance, and there are no results or exceptions produced. It's just a parallel, recursive way of scanning for files.

clinton

The code below demonstrates how you could use a wrapper class around an Executor to count the number of submitted jobs and compare it to the number of completed jobs to achieve what you want. Note that your tasks must call the execute method of the wrapper class and never call the underlying Executor directly. It should be trivial to extend the wrapper below to wrap the 'submit' methods of an ExecutorService if needed.

import java.util.Collection;
import java.util.concurrent.Executor;

public class ExampleExecutor {

    private final Executor executor;
    private long submitCount = 0;
    private long doneCount = 0;

    public ExampleExecutor(Executor executor) {
        this.executor = executor;
    }

    public synchronized void execute(Collection<Runnable> commands) {
        for (Runnable command : commands) {
            execute(command);
        }
    }

    public synchronized void execute(final Runnable command) {
        submitCount ++;

        executor.execute(new Runnable() {
            public void run() {
                try {
                    command.run();
                } finally {
                    synchronized (ExampleExecutor.this) {
                        doneCount++;
                        if (doneCount == submitCount) {
                            ExampleExecutor.this.notifyAll();
                        }
                    }
                }
            }
        });
    }

    public synchronized void awaitCompletion() throws InterruptedException {
        while (doneCount != submitCount) {
            this.wait();
        }
    }
}

EDIT: Added test case below to demonstrate how the above code can be used

import java.util.concurrent.Executors;

public class Test {

    static class Task implements Runnable {
        private final String id;
        private final long repetitions;
        private final long respawnSize;
        private final ExampleExecutor executor;

        public Task(String id, long repetitions, long respawnSize, ExampleExecutor executor) {
            this.id = id;
            this.repetitions = repetitions;
            this.respawnSize = respawnSize;
            this.executor = executor;
        }

        public void run() {
            for (int i = 0; i < respawnSize; i ++) {
                // Spawning new sub tasks
                executor.execute(new Task(id + "-" + i, repetitions/2, 0, null));
            }

            double sum = 0;
            for (int i = 0; i < repetitions; i++) {
                sum += Math.sin(i);
            }

            System.err.println(id + " completed at " + System.currentTimeMillis() + " with sum=" + sum);
        }
    }

    public static void main(String argv[]) throws InterruptedException {
        ExampleExecutor executor = new ExampleExecutor(Executors.newFixedThreadPool(2));
        executor.execute(new Task("0", 2000000, 100, executor));

        System.err.println("main thread awaits completion");
        executor.awaitCompletion();
        System.err.println("main thread received completion event");
    }
}
Nam San
  • @sharky I believe you've misunderstood the code; it doesn't poll. I should have clarified how you would use it: you have a main thread that submits one or more tasks via execute(), and that thread then calls awaitCompletion(), which will block until all the tasks have completed and the queue is empty. Your tasks can submit additional work via calls to execute(), but the tasks must never call awaitCompletion(). Your main thread will then receive a notification and return from the awaitCompletion() call once all of the jobs have completed. I've edited the answer to add some demonstration code. – Nam San Oct 29 '12 at 14:19