3

I need to configure retry policy for calling an API via ExecutorCompletionService.

Sample Code:

public void func() throws Exception{
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    List<Future<String>> list = new ArrayList<Future<String>>();
    for(int i=0; i<10; i++) {
        AsyncTest asyncTest = new AsyncTest();
        Future<String> futureString = completionService.submit(asyncTest);
        list.add(futureString);
    }
    while (list.size() > 0) {
        Future<String> futureResponse = completionService.take();
        System.out.println(futureResponse.get());
        list.remove(futureResponse);
        }
    executorService.shutdown();
}
public class AsyncTest implements Callable<String> {
       public String call() throws Exception {
              //returns a response from api call
              //this is a network call and throws TimeoutException
       }
}

What's the best way to implement retry policy for TimeoutException thrown while calling the API?

syfro
  • 121
  • 1
  • 5
  • Just a little clarification before I try to answer to be sure I know what exactly is going on. The TimeoutException is coming from the network call lagging, not from a waiting for the executor to finish its business? – Xype Aug 05 '17 at 19:59
  • Yes, TimeoutException is coming from the network call lagging. – syfro Aug 05 '17 at 20:03
  • 1
    Rather than double up on an answer, i think you can get what you need from here https://stackoverflow.com/a/4738630/3630719 – Xype Aug 05 '17 at 20:04

1 Answers1

2

I have enhanced your class AsyncTest:

public class RetryableAsyncTest implements Callable<RetryableAsyncTest> {

   private final String  _name;
   private /* */ String  _value;
   private /* */ boolean _timeouted;
   private /* */ int     _retryCount;

   public RetryableAsyncTest( String name ) {
      _name = name;
   }

   @Override
   public RetryableAsyncTest call() throws Exception {
      try {
         ++_retryCount;
         _timeouted = false;
         //-------- Begin of functionnal code
         if( Math.random() > 0.5 ) {      // Simulation of
            throw new TimeoutException(); // timeout condition
         }
         _value = "computation result";
         //-------- End of functionnal code
      }
      catch( final TimeoutException x ) {
         _timeouted = true;
      }
      return this;
   }

   public String getName() {
      return _name;
   }

   public String getValue() {
      return _value;
   }

   public boolean isTimeouted() {
      return _timeouted;
   }

   public int getRetryCount() {
      return _retryCount;
   }
}

RetryableAsyncExecutor class:

public class RetryableAsyncExecutor {

   private final ExecutorService                       _exec;
   private final CompletionService<RetryableAsyncTest> _comp;

   public RetryableAsyncExecutor( int nThreads ) {
      _exec = Executors.newFixedThreadPool( nThreads );
      _comp = new ExecutorCompletionService<>( _exec );
   }

   public void submit( RetryableAsyncTest task ) {
      _comp.submit( task );
   }

   public RetryableAsyncTest get() throws Exception {
      final Future<RetryableAsyncTest> f = _comp.take();
      final RetryableAsyncTest task = f.get();
      if( task.isTimeouted()) {
         _comp.submit( task );
      }
      return task;
   }

   public void shutdown() {
      _exec.shutdown();
   }
}

Test case:

public class Main {

   public static void main( String[] args ) {
      final int COUNT = 8;
      final RetryableAsyncExecutor re = new RetryableAsyncExecutor( 5 );
      try {
         for( int i = 0; i < COUNT; ++i ) {
            re.submit( new RetryableAsyncTest("Async#"+(i+1)));
         }
         int count = 0;
         while( count < COUNT ) {
            final RetryableAsyncTest task = re.get();
            if( task.isTimeouted()) {
               System.err.printf( "%s: retrying (%d)\n",
                  task.getName(), task.getRetryCount());
            }
            else {
               System.err.printf( "%s: done with '%s'.\n",
                  task.getName(), task.getValue());
               ++count;
            }
         }
      }
      catch( final Throwable t ) {
         t.printStackTrace();
      }
      re.shutdown();
      System.exit( 0 );
   }
}

Execution log:

Async#4: done with 'computation result'.
Async#1: done with 'computation result'.
Async#6: retrying (1)
Async#3: done with 'computation result'.
Async#8: done with 'computation result'.
Async#7: retrying (1)
Async#2: done with 'computation result'.
Async#5: retrying (1)
Async#6: done with 'computation result'.
Async#7: done with 'computation result'.
Async#5: retrying (2)
Async#5: done with 'computation result'.

If you want to ceil the number of retry, this logic takes place into RetryableAsyncExecutor.get() method, as an if-then-else condition around _comp.submit( task );

Aubin
  • 14,617
  • 9
  • 61
  • 84