
I'm trying to find a less clunky solution to a Java concurrency problem.

The gist of the problem is that I need a shutdown call to block while there are still worker threads active. The crucial aspect is that the worker tasks are each spawned and completed asynchronously, so the hold and the release must be done by different threads: the workers somehow need to send a signal to the shutdown thread once their work has completed. Just to make things more interesting, the worker threads must not block each other, so I'm unsure whether a Semaphore applies in this particular instance.

I have a solution which I think safely does the job, but my unfamiliarity with the Java concurrency utils leads me to think that there might be a much easier or more elegant pattern. Any help in this regard would be greatly appreciated.

Here's what I have so far, fairly sparse except for the comments:

final private ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
volatile private int activeWorkerThreads;
private boolean isShutdown;

private void workerTask()
{
   try
   {
      // Point A: Worker tasks mustn't block each other.
      shutdownLock.readLock().lock();

      // Point B: I only want worker tasks to continue if the shutdown signal
      // hasn't already been received.
      if (isShutdown)
         return;

      activeWorkerThreads ++;

      // Point C: This async method call returns immediately, soon after which
      // we release our lock. The shutdown thread may then acquire the write lock
      // but we want it to continue blocking until all of the asynchronous tasks
      // have completed.
      executeAsynchronously(new Runnable()
      {
         @Override
         final public void run()
         {
            try
            {
              // Do stuff.
            }
            finally
            {
               // Point D: Release of shutdown thread loop, if there are no other
               // active worker tasks.
               activeWorkerThreads --;
            }
         }
      });
   }
   finally
   {
      shutdownLock.readLock().unlock();
   }
}


final public void shutdown()
{
   try
   {
      // Point E: Shutdown thread must block while any worker threads
      // have breached Point A.
      shutdownLock.writeLock().lock();

      isShutdown = true;

      // Point F: Is there a better way to wait for this signal?
      while (activeWorkerThreads > 0)
         ;

      // Do shutdown operation.
   }
   finally
   {
      shutdownLock.writeLock().unlock();
   }
}

Thanks in advance for any help!

Russ

  • To follow up, I ended up reverting to an earlier implementation where I did a second read-lock and shutdown-flag check (identical to Point A and Point B) inside the async worker Runnable. I wanted to avoid that, but as the feedback here has indicated, there doesn't appear to be a valid solution without some locking cost in that Runnable, unless I'm prepared to do no locking at all and simply deal with the exceptions generated by executeAsynchronously() after shutdown, which seems messy. – Russ Aug 03 '11 at 09:42

5 Answers


Declaring activeWorkerThreads as volatile doesn't make activeWorkerThreads++ safe, as ++ is just shorthand for

activeWorkerThreads = activeWorkerThreads + 1;

which is a separate read and write rather than a single atomic operation. Use AtomicInteger instead.
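To see why that matters, here is a small self-contained sketch (the class and method names are made up for the demo): several threads bump one shared AtomicInteger, and because incrementAndGet() is a single atomic read-modify-write, no update is lost. With a volatile int and ++, two threads can read the same value and both write back value + 1, so the final count can come out short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {
    // Starts `threads` threads that each call incrementAndGet() `incrementsPerThread`
    // times on one shared counter, then returns the final value.
    static int countConcurrently(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet(); // atomic, unlike ++ on a volatile int
                }
            });
            worker.start();
            workers.add(worker);
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // join() also makes each worker's updates visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining workers", e);
        }
        return counter.get();
    }
}
```

With 4 threads doing 10,000 increments each, the result is always 40,000.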

Does executeAsynchronously() send jobs to an ExecutorService? If so, you can just use its awaitTermination method, so your shutdown hook becomes:

executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES); // throws InterruptedException; returns false on timeout
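If the jobs do go to an ExecutorService, a minimal self-contained sketch of that shutdown hook could look like this. The pool size, the number of jobs, and the 10 ms sleep are arbitrary stand-ins for real work. Since awaitTermination returns false on timeout rather than throwing, the sketch falls back to shutdownNow() in that case.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitTerminationDemo {
    // Submits `tasks` short jobs, then shuts the pool down and blocks until they finish.
    // Returns how many jobs had completed when the wait ended.
    static int runAndDrain(int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(10); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        executor.shutdown(); // no new tasks accepted; already-queued tasks still run
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow(); // timed out: interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }
}
```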
sbridges
  • Thanks heaps for your quick response. You're dead right: the volatile int was being hit concurrently with separate read and update operations, so I've changed it to an AtomicInteger as you suggested. Unfortunately executeAsynchronously() sends its jobs to a third-party service that's outside my control, so I can't add those hooks in this instance, but thanks for the suggestion. – Russ Aug 03 '11 at 06:19

As sbridges mentioned, an ExecutorService should be the preferred solution.

As an alternative, if the number of worker threads is fixed, you can use a CountDownLatch:

final CountDownLatch latch = new CountDownLatch(numberOfWorkers);

Pass the latch to every worker thread and have each one call latch.countDown() when its task is done.

Call latch.await() from the main thread to wait for all tasks to complete.
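Put together, a minimal runnable sketch of the latch approach might look like this (LatchDemo and runWorkers are hypothetical names, and incrementing a counter stands in for the real work). Calling countDown() in a finally block means a worker that throws still releases the waiting thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Starts a fixed number of workers and waits for all of them via a CountDownLatch.
    // Returns how many workers finished their work before await() returned.
    static int runWorkers(int numberOfWorkers) {
        final CountDownLatch latch = new CountDownLatch(numberOfWorkers);
        final AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < numberOfWorkers; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet(); // stand-in for "do stuff"
                } finally {
                    latch.countDown(); // signal completion even if the work throws
                }
            }).start();
        }
        try {
            latch.await(); // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return finished.get();
    }
}
```

The catch, as the question's follow-up notes, is that the count is fixed at construction time, so it only fits when the number of workers is known up front.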

denis.solonenko
  • Hi, thanks for your quick response. Unfortunately the number of worker threads will fluctuate in this app, so a CountDownLatch can't be used in this case, even though it is handy for releasing unowned locks, which is applicable here. – Russ Aug 03 '11 at 06:21
  • Nice. It seems quite similar to the Semaphore solution. – Ovidiu Lupas Aug 12 '11 at 12:52

You can use a semaphore in this scenario and avoid the busy wait in the shutdown() call. Think of it as a set of tickets that are handed out to workers to indicate that they are in flight. If the shutdown() method can acquire all of the tickets, then it knows that it has drained all workers and there is no activity. Because #acquire() is a blocking call, shutdown() won't spin. I've used this approach for a distributed master-worker library, and it's easy to extend it to handle timeouts and retries.

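import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

class Scheduler {
  private final Executor executor;
  private final int permits;
  private final Semaphore semaphore;

  Scheduler(Executor executor, int permits) {
    this.executor = executor;
    this.permits = permits;
    this.semaphore = new Semaphore(permits);
  }

  void schedule(final Runnable task) throws InterruptedException {
    semaphore.acquire(); // take a ticket: this task is now in flight
    try {
      executor.execute(new Runnable() {
        @Override public void run() {
          try {
            task.run();
          } finally {
            semaphore.release(); // return the ticket when the task finishes
          }
        }
      });
    } catch (RejectedExecutionException e) {
      semaphore.release(); // the task never ran, so return its ticket now
      throw e;
    }
  }

  void shutDown() {
    // Blocks until every ticket is back, i.e. no task is in flight.
    semaphore.acquireUninterruptibly(permits);

    // do stuff
  }
}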
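A variant of the same ticket idea, sketched under a few assumptions: it uses Integer.MAX_VALUE permits (as suggested in the comments) so the number of in-flight tasks is effectively unbounded, it refuses new tasks once shutdown has started (like Point B in the question) instead of letting them block forever on acquire(), and it drains with a timeout via Semaphore.tryAcquire(permits, timeout, unit). TicketedScheduler and its method names are hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TicketedScheduler {
    // Integer.MAX_VALUE permits: effectively no limit on in-flight tasks.
    private static final int PERMITS = Integer.MAX_VALUE;
    private final Semaphore semaphore = new Semaphore(PERMITS);
    private final Executor executor;
    // Set once shutdown starts, so later tasks are refused rather than started.
    private volatile boolean shuttingDown;

    TicketedScheduler(Executor executor) {
        this.executor = executor;
    }

    // Returns false instead of scheduling if shutdown has started.
    boolean schedule(Runnable task) {
        if (shuttingDown || !semaphore.tryAcquire()) {
            return false; // after a successful drain no permits are left either
        }
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    semaphore.release(); // ticket back: task no longer in flight
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release(); // the task never ran, so return its ticket now
            throw e;
        }
        return true;
    }

    // Refuses new tasks, then waits up to the timeout for every ticket to come back.
    // Returns true if all in-flight tasks finished in time.
    boolean shutDown(long timeout, TimeUnit unit) throws InterruptedException {
        shuttingDown = true;
        return semaphore.tryAcquire(PERMITS, timeout, unit);
    }
}
```

A task that slips past the flag check just before shutdown starts still holds a ticket, so the drain waits for it; the flag only stops tasks that arrive afterwards.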
Ben Manes
  • Ben, that's exactly what I was after, thanks! I was resigned to the double locking (or no locking at all :) ), but this is great. It's also good to hear that it can easily be adapted to timeouts; the shutdown method might need this. – Russ Aug 04 '11 at 00:43
  • I used timeouts and retries for the workers, since they were RPCs. In that case I simulated a Semaphore with a BlockingQueue so that I could assign each worker a unique ticket id to associate with the request for retrying. This let me throttle the master so that it waited until either a ticket was returned or a timeout expired, at which point it retried the expired worker. The shutdown() looped over the shared ticketing method to capture the tickets, releasing a ticket if a retry was needed. If your workers are remote RPCs you'll probably want this enhancement. The code I wrote was surprisingly simple, succinct, and fast. – Ben Manes Aug 04 '11 at 01:00
  • Russ, you mentioned in response to a previous answer (denis.solonenko) that the number of tasks wasn't fixed; unless I'm really misunderstanding something, you have the same problem with this semaphore solution seeing as you need to know the "permits" value at the beginning. – davmac Aug 04 '11 at 01:05
  • @davmac He can use Integer.MAX_VALUE, since the bulk acquisition is done through a compare-and-swap call (vs. a loop). This allows it to emulate being unbounded. – Ben Manes Aug 04 '11 at 01:09
  • @davmac I initially had the same concern because as you say the number of tasks is unknown, but the permits is just a count so maxing it as Ben suggests doesn't cost any extra. – Russ Aug 04 '11 at 01:47
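The BlockingQueue-of-tickets idea from the comments can be sketched roughly as follows. This is a simplified assumption of how such a pool might look, not Ben's actual code: TicketPool, take, giveBack, and drain are hypothetical names, and the retry logic for expired RPC workers is left out. Each ticket is an integer id that can be attached to one in-flight request, and drain() plays the shutdown role by capturing every ticket, putting back any partial capture if it times out or is interrupted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TicketPool {
    private final BlockingQueue<Integer> tickets;
    private final int size;

    TicketPool(int size) {
        this.size = size;
        this.tickets = new ArrayBlockingQueue<>(size);
        for (int id = 0; id < size; id++) {
            tickets.add(id); // each id can be attached to one in-flight request
        }
    }

    // Takes a ticket, waiting up to the timeout; returns null if none came back in time.
    Integer take(long timeout, TimeUnit unit) throws InterruptedException {
        return tickets.poll(timeout, unit);
    }

    // Hands a ticket back once its request has completed (or been given up on).
    void giveBack(int id) {
        tickets.add(id);
    }

    // Shutdown: tries to capture every ticket before the deadline. On timeout or
    // interrupt it puts back whatever it captured, so the caller can retry.
    boolean drain(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<Integer> captured = new ArrayList<>();
        boolean drained = false;
        try {
            while (captured.size() < size) {
                Integer id = tickets.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (id == null) {
                    return false; // timed out with some tickets still out
                }
                captured.add(id);
            }
            drained = true;
            return true;
        } finally {
            if (!drained) {
                tickets.addAll(captured); // give back partial captures
            }
        }
    }
}
```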

Whoa nelly. Never do this:

  // Point F: Is there a better way to wait for this signal?
  while (activeWorkerThreads > 0)
     ;

You're spinning and consuming CPU. Use a proper notification:

First: synchronize on an object, then check activeWorkerThreads, and wait() on the object if it's still > 0:

synchronized (mutexObject) {
    while (activeWorkerThreads > 0) {
        mutexObject.wait();
    }
}

Second: Have the workers notify() the object after they decrement the activeWorkerThreads count. You must synchronize on the object before calling notify.

synchronized (mutexObject) {
    activeWorkerThreads--;
    mutexObject.notify();
}

Third: Seeing as you are (after implementing 1 & 2) synchronizing on an object whenever you touch activeWorkerThreads, use it as protection; there is no need for the variable to be volatile.

Then: the same object you use as a mutex for controlling access to activeWorkerThreads could also be used to control access to isShutdown. Example:

synchronized (mutexObject) {
    if (isShutdown) {
        return;
    }
}

This won't cause workers to block each other except for immeasurably small amounts of time (which you likely do not avoid by using a read-write lock anyway).
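Putting the steps of this answer together, a minimal sketch might look like the class below. The names (WorkerTracker, tryBegin, end) are hypothetical: tryBegin() covers Points A and B from the question, end() covers Point D, and shutdown() covers Points E and F. One small deviation from the steps above: end() calls notifyAll() only when the count reaches zero, which wakes every waiting shutdown thread rather than just one.

```java
class WorkerTracker {
    private final Object mutex = new Object();
    private int activeWorkerThreads; // guarded by mutex, so no volatile needed
    private boolean isShutdown;      // guarded by mutex

    // Registers a worker unless shutdown has already started.
    boolean tryBegin() {
        synchronized (mutex) {
            if (isShutdown) {
                return false;
            }
            activeWorkerThreads++;
            return true;
        }
    }

    // Called from the async task's finally block when its work is done.
    void end() {
        synchronized (mutex) {
            activeWorkerThreads--;
            if (activeWorkerThreads == 0) {
                mutex.notifyAll(); // wake the shutdown thread(s)
            }
        }
    }

    // Refuses new workers, then waits (without spinning) for active ones to finish.
    void shutdown() throws InterruptedException {
        synchronized (mutex) {
            isShutdown = true;
            while (activeWorkerThreads > 0) {
                mutex.wait(); // releases the mutex while waiting; loop handles spurious wakeups
            }
        }
    }
}
```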

davmac
  • Hi, thanks for your quick response. Your solution is similar to how I had previously implemented the workers and shutdown, i.e. with the workers grabbing a lock both at Point A and also within the async Runnable. So maybe I've regressed to a worse solution. :) The extra locking within the Runnable is a valid solution, and I take your point about the workers only blocking each other for small amounts of time. On the other hand, it somehow doesn't seem right that the worker threads have to block multiple times, when they know from Point B that they have the green light until they finish. – Russ Aug 03 '11 at 06:27
  • I should also add that the context for these calls is a GUI framework where there are thousands of worker calls to workerTask(), in contrast to the one-off call to shutdown(). That call does do the ugly wait loop, which I would love to get rid of or adapt to something more elegant, but it's a question of where I'd least like the app to take the CPU hit. If the shutdown thread could time out after waiting too long, that might also be acceptable. I hope that makes some sense. – Russ Aug 03 '11 at 06:27
  • Russ, they're not really blocking if they're synchronizing only to check or alter the value of a variable. The only time this would cause them to "block" would be if they happened to be doing that at the exact same time as one of the other worker threads, and in that case they would block for such a minuscule amount of time as not to matter. You can use `wait(long)` instead of `wait()` in the shutdown if you want a timeout. – davmac Aug 04 '11 at 01:09
  • Yeah I'm probably being overzealous with concern over the locking, after all there would probably be some sort of locking under the hood no matter which solution I went with - AtomicInteger with spinning shutdown check, sync on mutex, or releasing a semaphore..? I'm still finding my way with this stuff. Also since I'm new here I didn't realise that adding an accepted solution would remove yours, so my apologies - your solution works fine also. – Russ Aug 04 '11 at 01:41
  • No problem. Using a semaphore is probably a better solution anyway. – davmac Aug 04 '11 at 07:39
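For the timeout mentioned in the comments, the main subtleties are that wait(long) can return early (on a notify or spuriously) and that wait(0) means "wait forever", so the remaining time has to be recomputed on each pass and must never round down to zero. A sketch, with hypothetical names (TimedShutdown, begin, end, awaitIdle):

```java
import java.util.concurrent.TimeUnit;

class TimedShutdown {
    private final Object mutex = new Object();
    private int activeWorkerThreads; // guarded by mutex

    void begin() {
        synchronized (mutex) {
            activeWorkerThreads++;
        }
    }

    void end() {
        synchronized (mutex) {
            if (--activeWorkerThreads == 0) {
                mutex.notifyAll();
            }
        }
    }

    // Waits at most timeoutMillis for active workers to finish; returns true if they did.
    boolean awaitIdle(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (mutex) {
            while (activeWorkerThreads > 0) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return false; // timed out with workers still active
                }
                // wait(0) would block forever, so round up to at least 1 ms.
                mutex.wait(Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
            }
            return true;
        }
    }
}
```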

This is more of a comment on sbridges' answer, but it was a bit too long to submit as a comment.

Just one remark:

When you shut down the executor, submitting a new task to it will result in an unchecked RejectedExecutionException if you use the default implementations (like Executors.newSingleThreadExecutor()). So in your case you probably want to use the following code:

new ThreadPoolExecutor(1,
                       1,
                       1,
                       TimeUnit.HOURS,
                       new LinkedBlockingQueue<Runnable>(),
                       new ThreadPoolExecutor.DiscardPolicy());

This way, tasks submitted to the executor after shutdown() is called are simply discarded. The parameters above (1, 1, ...) should produce an executor that is basically a single-thread executor, but one that doesn't throw the runtime exception.
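A small sketch contrasting the two behaviours (the class and method names are made up for the demo): the DiscardPolicy executor constructed above silently drops tasks submitted after shutdown(), while Executors.newSingleThreadExecutor(), which uses the default AbortPolicy, throws RejectedExecutionException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DiscardAfterShutdownDemo {
    // Submits `lateTasks` tasks after shutdown(); returns how many of them ran.
    static int tasksRunAfterShutdown(int lateTasks) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS,
                new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
        AtomicInteger ran = new AtomicInteger();
        executor.shutdown();
        for (int i = 0; i < lateTasks; i++) {
            executor.execute(ran::incrementAndGet); // rejected, then dropped with no exception
        }
        executor.awaitTermination(1, TimeUnit.SECONDS);
        return ran.get();
    }

    // Returns true if the default single-thread executor rejects a task after shutdown().
    static boolean defaultExecutorRejects() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        try {
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true; // the default AbortPolicy throws
        }
    }
}
```

The trade-off is that DiscardPolicy hides the rejection completely, so a task that was never run leaves no trace unless you track it yourself.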

Enno Shioji
  • the variable is only protected by a read/write lock, so I think you do need to use AtomicInteger, multiple threads can be doing ++ at the same time. – sbridges Aug 03 '11 at 04:58
  • Hi thanks for the quick response, unfortunately in this case I'm dealing with third party async calls (not to any ExecutorService), but since I'm still unfamiliar with a lot of the Java concurrency stuff I'll happily take your tip about suppressing the RuntimeExceptions. Up until now I've always used the convenience methods - Executors.newxxxExecutor(). – Russ Aug 03 '11 at 06:33