
I'm currently working on a project where I have to make multiple concurrent HTTP requests to a REST service that returns a JSON response. This is a batch operation, and the number of requests at any time can range from several hundred to several thousand.

That's why I thought it would be a good idea to use an async HTTP client, so the requests could run concurrently, which could speed up the process dramatically. I first tried ning's async-http-client. Maybe I was doing something wrong, because it was rather slow for me: about 10 seconds for 1000 requests.

After that I tried Apache's implementation, which was much faster at about 4 seconds for 1000 requests. But I can't seem to get the results to be stable. Most of the time I get a List with 1000 responses (as I expect), but sometimes a few responses are missing, like 1 or 2.

This is currently my code:

public class AsyncServiceTest {
    public AsyncServiceTest(String serviceURI) {
        this.httpClient = HttpAsyncClients.custom().setMaxConnPerRoute(100).setMaxConnTotal(20)
                .setDefaultRequestConfig(RequestConfig.custom().build()).build();

        this.objectMapper = new ObjectMapper();
        this.serviceURI = serviceURI;
    }

    private List<Object> getResults(List<String> queryStrings) throws Exception {
        try {
            httpClient.start();

            final List<HttpGet> requests = new ArrayList<>(queryStrings.size());
            for (String str : queryStrings) {
                requests.add(new HttpGet(buildUri(str))); // In this method we build the absolute request uri.
            }

            final CountDownLatch latch = new CountDownLatch(requests.size());
            final List<Object> responses = new ArrayList<>(requests.size());
            final List<String> stringResponses = new ArrayList<>(requests.size());

            for (final HttpGet request : requests) {
                httpClient.execute(request, new FutureCallback<HttpResponse>() {
                    @Override
                    public void completed(HttpResponse response) {
                        try {
                            stringResponses.add(IOUtils.toString(response.getEntity().getContent(), "UTF-8"));
                            latch.countDown();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void failed(Exception e) {
                        latch.countDown();
                    }

                    @Override
                    public void cancelled() {
                        latch.countDown();
                    }
                });
            }

            latch.await();

            for (String r : stringResponses) {
                responses.add(mapToLocation(r)); // Mapping some Strings to JSON in this method.
            }

            return responses;
        } finally {
            httpClient.close();
        }
    }
}

So, in essence, I am wondering whether there is something wrong with my code (probably), or whether this is just how the library works. The CountDownLatch always reaches zero, yet responses are still missing. Or does anyone have a pointer in the right direction (maybe to another library)?

Alex van den Hoogen
  • So you say that the countdown is at zero, but you are still missing requests? – JohnnyAW Jan 29 '15 at 15:53
  • I would suggest you log any failures and cancellations. It may give you an insight – Germann Arlington Jan 29 '15 at 15:53
  • I think the problem is that `stringResponses` is not thread-safe for the add-operation. Replace it with a [ConcurrentLinkedQueue](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html) or use `Collections.synchronizedList` (see also [this question](http://stackoverflow.com/q/6916385/3080094)). – vanOekel Jan 29 '15 at 18:03
  • @vanOekel, you are right. I replaced the ArrayList with a Vector, which already works great and isn't very slow on add-operations, unlike CopyOnWriteArrayList. I will also try the ConcurrentLinkedQueue though. Thanks for the tip! – Alex van den Hoogen Jan 29 '15 at 20:17
  • @AlexvandenHoogen Ah, nice, I did not know Vector was synchronized (and implements List). I suggest you post that as an answer to your own question; it sounds to me like the most practical solution. – vanOekel Jan 29 '15 at 21:32
  • @vanOekel I'm going to try out different Collection implementations first, before posting it as an answer. Just to see which one suffers least from performance degradation. – Alex van den Hoogen Jan 30 '15 at 09:14
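
The thread-safety point raised in the comments can be checked in isolation, without any HTTP client. The following is only a minimal sketch using JDK classes: the class name `ConcurrentAddDemo`, the helper `fill`, and the pool size of 8 are assumptions made for illustration and are not part of the original code. It adds elements to a collection from several threads and reports the final size. With a plain ArrayList the size may come out lower than expected (or a worker may fail inside add), while the synchronized list and the ConcurrentLinkedQueue should always contain every element.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentAddDemo {

    // Adds `count` items to `sink` from a pool of worker threads
    // and returns the size of `sink` once all tasks have finished.
    static int fill(Collection<Integer> sink, int count) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8); // pool size is arbitrary
        CountDownLatch latch = new CountDownLatch(count);
        try {
            for (int i = 0; i < count; i++) {
                final int value = i;
                pool.execute(() -> {
                    try {
                        sink.add(value);
                    } finally {
                        latch.countDown(); // always release the latch, even if add() throws
                    }
                });
            }
            latch.await();
        } finally {
            pool.shutdown();
        }
        return sink.size();
    }

    public static void main(String[] args) throws InterruptedException {
        int n = 100_000;
        // Not thread-safe: the reported size may be lower than n.
        System.out.println("ArrayList:             " + fill(new ArrayList<>(), n));
        // Thread-safe alternatives mentioned in the comments.
        System.out.println("synchronizedList:      "
                + fill(Collections.synchronizedList(new ArrayList<>()), n));
        System.out.println("ConcurrentLinkedQueue: " + fill(new ConcurrentLinkedQueue<>(), n));
    }
}
```

Whether the ArrayList run actually loses elements depends on timing, so a single clean run does not prove that the code is safe.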

1 Answer


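It turned out to be a concurrency problem in my code (thanks to @vanOekel): the callbacks can run on different threads, and ArrayList is not thread-safe, so concurrent add calls can lose elements. The fix is to replace the ArrayList<E> with a Vector<E>, whose methods are synchronized. Example code: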

public class AsyncServiceTest {
    public AsyncServiceTest(String serviceURI) {
        this.httpClient = HttpAsyncClients.custom().setMaxConnPerRoute(100).setMaxConnTotal(20)
                .setDefaultRequestConfig(RequestConfig.custom().build()).build();

        this.objectMapper = new ObjectMapper();
        this.serviceURI = serviceURI;
    }

    private List<Object> getResults(List<String> queryStrings) throws Exception {
        try {
            httpClient.start();

            final CountDownLatch latch = new CountDownLatch(queryStrings.size());
            final Vector<Object> responses = new Vector<>(queryStrings.size());

            for (String str : queryStrings) {
                // buildUri: In this method we build the absolute request uri.
                httpClient.execute(new HttpGet(buildUri(str)), new FutureCallback<HttpResponse>() {
                    @Override
                    public void completed(HttpResponse response) {
                        try {
                            // mapToLocation: Mapping some Strings to JSON in this method.
                            responses.add(mapToLocation(IOUtils.toString(response.getEntity().getContent(), "UTF-8")));
                            latch.countDown();
                        } catch (Exception e) {
                            // Catch everything, not just IOException, so the latch is always released.
                            failed(e);
                        }
                    }

                    @Override
                    public void failed(Exception e) {
                        logger.error(e.getLocalizedMessage(), e);
                        latch.countDown();
                    }

                    @Override
                    public void cancelled() {
                        logger.error("Request cancelled.");
                        latch.countDown();
                    }
                });
            }

            latch.await();
            return responses;
        } finally {
            httpClient.close();
        }
    }
}

Thanks for all the helpful responses. If anyone has suggestions for optimizing the above code, I would be glad to hear them.
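
One possible refinement, offered only as a sketch and not as the library's recommended pattern, is to keep one future per request and collect the results on the calling thread. That removes both the shared list and the latch, and it returns the results in request order. The example below models this with `CompletableFuture` from the JDK instead of HttpAsyncClient. The class `OrderedResultsSketch`, the method `fetch` (a hypothetical stand-in for the HTTP call plus JSON mapping), and the pool size of 20 are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedResultsSketch {

    // Hypothetical stand-in for the real HTTP request and JSON mapping.
    static String fetch(String query) {
        return "result:" + query;
    }

    // Starts one task per query and returns the results in query order.
    // A failed task yields null instead of aborting the whole batch.
    static List<String> getResults(List<String> queries) {
        ExecutorService pool = Executors.newFixedThreadPool(20); // assumed concurrency limit
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>(queries.size());
            for (String q : queries) {
                futures.add(CompletableFuture
                        .supplyAsync(() -> fetch(q), pool)
                        .exceptionally(e -> null));
            }
            List<String> results = new ArrayList<>(futures.size());
            for (CompletableFuture<String> f : futures) {
                results.add(f.join()); // only the calling thread touches `results`
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints [result:a, result:b, result:c]
        System.out.println(getResults(Arrays.asList("a", "b", "c")));
    }
}
```

Because only one thread writes to the result list, a plain ArrayList is sufficient here. Note that this sketch runs each call on a pool thread, so it does not reproduce the non-blocking I/O of the async client.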

Alex van den Hoogen