I am writing unit test for the following code which uses AsyncHttpClient. The use of CountDownLatch and decrementing of the CountDownLatch within the implementation of FutureCallback is causing my JUnit testcase to hang waiting for countdown latch to be decremented. Within the JUnit test I am capturing the FutureCallback using ArgumentCaptor and then using thenAnswer I am calling the completed method to decrement the count down latch. But it doesn't seems to work, any ideas will be helpful.
public List<QueryResponse> execute(Query query) {
List<QueryResponse> res = new ArrayList<QueryResponse>();
try {
List<HttpRequestBase> requests = query.generateHttpRequests();
List<Future<HttpResponse>> futures = new ArrayList<Future<HttpResponse>>();
final CountDownLatch requestCompletionCDLatch = new CountDownLatch(requests.size());
for (HttpRequestBase request : requests) {
futures.add(httpClient.execute(request, new FutureCallback<HttpResponse>() {
@Override
public void failed(Exception e) {
logger.error("Error while executing: " + request.toString(), e);
requestCompletionCDLatch.countDown();
}
@Override
public void completed(HttpResponse result) {
requestCompletionCDLatch.countDown();
}
@Override
public void cancelled() {
logger.info("Request cancelled while executing: " + request.toString());
requestCompletionCDLatch.countDown();
}
}));
}
requestCompletionCDLatch.await();
for (Future<HttpResponse> future : futures) {
HttpResponse response = future.get(rcaRequestTimeout, TimeUnit.SECONDS);
int status = response.getStatusLine().getStatusCode();
if (status != HttpStatus.SC_OK) {
logger.warn("Query with non-success status " + status);
} else {
res.add(query.parseResponse(response.getEntity().getContent()));
}
}
} catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
logger.error("Error while querying", e);
} catch (URISyntaxException e) {
logger.error("Error while generating the query", e);
}
return res;
}
My unit test is as follows:
@Test
public void testHttpError() throws InterruptedException, ExecutionException, TimeoutException {
StatusLine status = Mockito.mock(StatusLine.class);
when(status.getStatusCode()).thenReturn(400);
HttpResponse response = Mockito.mock(HttpResponse.class);
when(response.getStatusLine()).thenReturn(status);
Future<HttpResponse> future = Mockito.mock(Future.class);
when(future.get(anyLong(), any())).thenReturn(response);
CloseableHttpAsyncClient httpClient = Mockito.mock(CloseableHttpAsyncClient.class);
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
ArgumentCaptor<FutureCallback<HttpResponse>> futureCallbackCaptor = ArgumentCaptor.forClass((Class)FutureCallback.class);
when(httpClient.execute(any(), any())).thenReturn(future).thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
verify(httpClient).execute(requestCaptor.capture(), futureCallbackCaptor.capture());
futureCallbackCaptor.getValue().completed(response);
return null;
}
});
StubbedRcaClient rcaClient = new StubbedRcaClient(httpClient);
Query query = new Query("abc", "xyz", RcaHttpRequestType.GET, 1000);
List<QueryResponse> res = rcaClient.execute(query);
assertEquals(0, res.size());
IOUtils.closeQuietly(rcaClient);
}