2

I'm developing a Java app that for much of the time, including the point of shutdown, is having to deal with a flood of incoming asynchronous calls from a foreign framework. During normal operation these incoming calls then need to be dispatched to another framework, again asynchronously.

At the moment I'm having my module be a "good" citizen and do some locking around a shutdown flag which, once set, will gracefully cease the dispatch of any further outgoing calls.

The troubling thing is that because both incoming and outgoing calls are asynchronous, I'm having to make each "worker" task perform two sets of locking (see below) to do the same shutdown flag check (EDIT: It's been pointed out to me in another question that using Semaphores only one acquire/release is needed for each worker). It works, but there are many of these worker tasks to handle and I worry about the cumulative slowdown in performance. Profiling will come soon once the framework is expanded a little but regardless of the result it'd be good to follow best practices.

An alternative is to simply do no shutdown flag check locking and handle the anticipated exceptions that are generated when the external frameworks are shutdown before the async calls have finished processing. I should add that there are no detrimental operational effects if this approach is taken. Both methods will result in a clean shutdown.

Your ideas on which is the better practice, please? Heavy-handed locking with no exceptions, versus no locking but a barrage of exceptions.

With locks, the worker task code looks something like this:

final private ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private boolean isShutdown;

private void workerTask()
{
   try
   {
      shutdownLock.readLock().lock();

      if (isShutdown)
         return;

      executeAsynchronously(new Runnable()
      {
         @Override
         final public void run()
         {
            try
            {
               shutdownLock.readLock().lock();

               if (isShutdown)
                  return;

               // Do something here.
            }
            finally
            {
               shutdownLock.readLock().unlock();
            }
         }
      });
   }
   finally
   {
      shutdownLock.readLock().unlock();
   }
}

The shutdown method requests the shutdownLock.writeLock(), then sets the isShutdown flag.

An alternative without locking and anticipating the shutdown-generated exceptions looks something like this:

volatile private boolean isShutdown;

private void workerTask()
{
   try
   {
      executeAsynchronously(new Runnable()
      {
         @Override
         final public void run()
         {
            try
            {
               // Do something here.
            }
            catch (final FrameworkRuntimeException exception)
            {
               if ((! isShutdown) || (exception.type != 
                                      FrameworkRuntimeException.FrameworkIsDisposed))
                  throw exception;
            }
         }
      });
   }
   catch (final FrameworkRuntimeException exception)
   {
      if ((! isShutdown) || (exception.type != 
                             FrameworkRuntimeException.FrameworkIsDisposed))
         throw exception;
   }
}

The shutdown method for this implementation sets the volatile isShutdown flag to true.

Thanks in advance for any feedback,

Russ

EDIT: It's been helpfully pointed out to me in another question that I could use a Semaphore to avoid the double locking in the first approach, so it wouldn't be so heavy-handed after all, but the question still stands.

Russ
  • 63
  • 7
  • 1
    I don't have much experience in this field, but your first solution (locks) looks a bit excessive. Also you don't say what "do something" is, but you might be able to only place one or two `if (isShutDown) return;` in there to be safe. – toto2 Aug 04 '11 at 00:43
  • You're right, it is excessive, it turns out that I can use semaphore permits instead of the double locking. With that approach, yeah I'd only need the one check against the shutdown flag in the outer body. Thanks for the comment. – Russ Aug 04 '11 at 00:56
  • I'm thinking about how to solve this nicely with little overhead and shutdown flags (the interesting part is that the shutdown can only start as soon as one can be sure that no other thread can send further messages), but really if the exception approach works nicely that's really the one you should be using - simpler, less code, no overhead. – Voo Aug 04 '11 at 02:07

3 Answers3

1

In general I would favour the approach where you check for shutdown, then execute the task. If you optimistically and then throw away exceptions that you 'know' are due to shutdown then you run the risk of misclassifying an error and missing out on a real problem.

As far as simplifying the code goes, you can get rid of all the locks and just make sure that your executeAsynchronously method uses an ExecutorService - then your shutdown method just calls shutdown on the service, the task creation can be skipped if isShutdown returns true and if you need to wait for tasks to finish before returning you can use the helpful awaitTermination method.

rxg
  • 3,777
  • 22
  • 42
  • +1 for executor service. I just ran into this problem with Spring `@Scheduled` methods during shutdown, and I had to [implement an `ApplicationListener`](http://stackoverflow.com/a/6603443/433348) to create the hook for shutting down the TaskScheduler (similar to executor). – Kevin Welker Jun 21 '13 at 17:35
0

if you use spring below code works for gracefull shutdown. You may change the retry numbers.

package com.avea.vpspg.test.schedulers;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import com.avea.vpspg.core.VasProvLogger;

@Component
class ContextClosedHandler implements ApplicationListener<ContextClosedEvent> , ApplicationContextAware,BeanPostProcessor{


private ApplicationContext context;

public Logger logger = XProvLogger.getInstance().x;

public void onApplicationEvent(ContextClosedEvent event) {


    Map<String, ThreadPoolTaskScheduler> schedulers = context.getBeansOfType(ThreadPoolTaskScheduler.class);

    for (ThreadPoolTaskScheduler scheduler : schedulers.values()) {         
        scheduler.getScheduledExecutor().shutdown();
        try {
            scheduler.getScheduledExecutor().awaitTermination(20000, TimeUnit.MILLISECONDS);
            if(scheduler.getScheduledExecutor().isTerminated() || scheduler.getScheduledExecutor().isShutdown())
                logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has stoped");
            else{
                logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has not stoped normally and will be shut down immediately");
                scheduler.getScheduledExecutor().shutdownNow();
                logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has shut down immediately");
            }
        } catch (IllegalStateException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    Map<String, ThreadPoolTaskExecutor> executers = context.getBeansOfType(ThreadPoolTaskExecutor.class);

    for (ThreadPoolTaskExecutor executor: executers.values()) {
        int retryCount = 0;
        while(executor.getActiveCount()>0 && ++retryCount<51){
            try {
                logger.info("Executer "+executor.getThreadNamePrefix()+" is still working with active " + executor.getActiveCount()+" work. Retry count is "+retryCount);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        if(!(retryCount<51))
            logger.info("Executer "+executor.getThreadNamePrefix()+" is still working.Since Retry count exceeded max value "+retryCount+", will be killed immediately");
        executor.shutdown();
        logger.info("Executer "+executor.getThreadNamePrefix()+" with active " + executor.getActiveCount()+" work has killed");
    }
}


@Override
public void setApplicationContext(ApplicationContext context)
        throws BeansException {
    this.context = context;

}


@Override
public Object postProcessAfterInitialization(Object object, String arg1)
        throws BeansException {
    return object;
}


@Override
public Object postProcessBeforeInitialization(Object object, String arg1)
        throws BeansException {
    if(object instanceof ThreadPoolTaskScheduler)
        ((ThreadPoolTaskScheduler)object).setWaitForTasksToCompleteOnShutdown(true);
    if(object instanceof ThreadPoolTaskExecutor)
        ((ThreadPoolTaskExecutor)object).setWaitForTasksToCompleteOnShutdown(true);
    return object;
}
fatih tekin
  • 959
  • 10
  • 21
0

Ok so I think this here should work and not impede too much of a runtime overhead (note it's 4.30 in the morning here, so better double check ;) ). Also note that throwing in some good try{} finally{} code blocks would be a pretty good idea, but omitted for clarity.

public static final AtomicInteger activeConnections = new AtomicInteger();
public static volatile boolean shutdown = false;

public static void shutdown() {
    shutdown = true;
    while (activeConnections.get() > 0) {
        synchronized(activeConnections) {
            try {
                activeConnections.wait();
            }
            catch(InterruptedException e) {
            }
        }
    }
    // proceed shutdown
}

public static void run() {
    if (shutdown) return;
    activeConnections.incrementAndGet();
    if (shutdown) {
        leave();
        return;
    }
    // do stuff
    leave();
}

private static void leave() {
    int outstandingConnections = activeConnections.decrementAndGet();
    if (shutdown && outstandingConnections == 0) {
        synchronized(activeConnections) {
            activeConnections.notifyAll();
        }
    }       
}

As soon as the shutdown flag is set, no new thread starts working. Every thread increments an integer while communicating with the external framework and decrements it when it's finished. The shutdown may only proceed as soon as no thread is communicating anymore - note that since the shutdown flag is set at first no new thread will start anymore.

That way you get the pretty lightweight AtomicInteger (that's implemented as a CAS loop, you can't get much lower overhead) and the volatile memory barrier.

Now I'm still standing to my first comment and say that it's simpler, more efficient and shorter to catch the exceptions, but I liked the problem :)

Voo
  • 29,040
  • 11
  • 82
  • 156
  • thanks a lot for taking the time to help with this problem. I like where you're going with your 4:30am (!) answer, but there might be some tightening up needed: - Thread A in 2nd line of shutdown(), activeConnections.get() returns 1. - Thread B executes all of leave(), including the notifyAll() because it's the last task. - Thread A now reaches the sync block and waits forever. – Russ Aug 04 '11 at 07:40
  • A guy who answered my previous question had a good pattern for doing the wait & notifyAll, including doing away with the volatiles and atomic count of workers: http://stackoverflow.com/questions/6921530/java-read-write-lock-requirement-with-lock-and-release-from-different-threads/6921738#6921738 . Letting the exceptions happen may be more efficient, but if I don't end up doing it that way I will signal between the workers and shutdown thread using a Semaphore (see example on the same page as above). – Russ Aug 04 '11 at 07:48
  • @Russ Yeah you're right doh. I think one could solve that also, but then the semaphore solution is really nice - an "unbounded" semaphore with the shutdown acquiring all permits (didn't even know you could acquire more than 1 permit at a time).. great solution! – Voo Aug 04 '11 at 14:04