I have a processor that reads messages from Apache Kafka and sends the data to a REST Endpoint.

The server has only 4 cores and 4 GB of RAM, of which at most 2 GB is allocated to the Java process.

Messages are produced and consumed at a rate of 4k/second.

After running for a couple of minutes, the program runs out of memory.

  • What is the best way to call HTTP REST endpoints asynchronously without waiting for the response?
  • How should I manage the httpClient connection? I was under the impression that I need to start the client and never close it so that I can reuse the connection.
  • Do you see any issues with the code below?

public class SomeProcesor implements BProcessor {

private ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
private CompletionService<Boolean> pool = new ExecutorCompletionService<Boolean>(exec);
CloseableHttpAsyncClient httpclient = null ; 

@Override
public void begin() {
    RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(5000).setConnectTimeout(5000).setSocketTimeout(5000).build();
    // Build the client with the request timeouts applied
    httpclient = HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build();
    // Start the client
    httpclient.start();

}

@Override
public void process(MessageAndMetadata<?, ?> mMData, List<Map> events) {

    List<Map<String, Object>> listMap = new ArrayList<>();  

    // loop and extract the data from events into the above List
    //..
    //..

    // submit to a separate thread to post to HTTP
    pool.submit(new HttpThread(listMap));
}

private class HttpThread implements Callable<Boolean> {
    List<Map<String, Object>> listMap = null;
    public HttpThread(List<Map<String, Object>> listMap) {
        this.listMap = listMap;
    }
    @Override
    public Boolean call() throws Exception {
        return postToHttp(listMap);
    }
}

private Boolean postToHttp(List<Map<String, Object>> listMap) throws UnsupportedEncodingException {
    for (Map<String, Object> map : listMap) {

        try {
            HttpPost postRequest = new HttpPost("https://myserver:8080/services/collector");
            postRequest.addHeader(HttpHeaders.ACCEPT, "application/json");
            postRequest.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
            postRequest.addHeader(HttpHeaders.CONNECTION, "keep-alive");

            StringEntity input = new StringEntity(methodToConvertMapToJSON(map));
            input.setContentType("application/json");
            postRequest.setEntity(input);

            httpclient.execute(postRequest, null);

        } catch (Exception e) {
            return false;
        } catch (Throwable th) {
            return false;
        }
    }
    return true;
}

}

jagamot
  • I would suggest enabling [`verbose:gc`](http://stackoverflow.com/questions/11889831/java-verbosegc-how-to-read-the-output) (and perhaps profiling) to determine *why* you're exhausting memory. I might also move the `HttpClient` to an [Object Pool](https://en.wikipedia.org/wiki/Object_pool_pattern). – Elliott Frisch Jun 04 '16 at 14:31
  • The processor class is instantiated only once, that is why I was initiating the httpClient object inside the begin. Why do we need to move it to Object Pool? – jagamot Jun 04 '16 at 14:32
  • And you only have **one** processor? For 4,000 messages per second? – Elliott Frisch Jun 04 '16 at 14:35
  • Ah..good point. I have a facility to increase the number of instances of the consumers. So that is why you want the single HttpClient in a Pool? – jagamot Jun 04 '16 at 14:39
  • Do I ever need to close the connection? And for pooling are you talking about something like - https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html - section 2.3.3? – jagamot Jun 04 '16 at 14:43

1 Answer

You need to consume the HTTP response or release the connection; otherwise each connection keeps holding resources. Change

httpclient.execute(postRequest, null);

to

try {
    // get() blocks until the response has arrived
    HttpResponse response = httpclient.execute(postRequest, null).get();
    if (response.getStatusLine().getStatusCode() != 200) {
        // do something
    }
} finally {
    // release the connection
    postRequest.releaseConnection();
}
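
If blocking on every response is too slow at 4k messages per second, one alternative is to stay asynchronous but cap the number of requests in flight, so pending requests cannot pile up in memory. The following is only a minimal sketch of that idea, not part of the original code: `BoundedAsyncSender`, `maxInFlight`, and the `transport` function are hypothetical names, and the transport is assumed to return a `CompletableFuture` holding the HTTP status code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical helper that limits how many asynchronous requests are pending at once. */
class BoundedAsyncSender {
    private final Semaphore permits;
    // Assumed shape of the async call: takes a JSON body, completes with the status code.
    private final Function<String, CompletableFuture<Integer>> transport;

    BoundedAsyncSender(int maxInFlight, Function<String, CompletableFuture<Integer>> transport) {
        this.permits = new Semaphore(maxInFlight);
        this.transport = transport;
    }

    /** Blocks the caller only while maxInFlight requests are already pending. */
    CompletableFuture<Integer> send(String json) {
        try {
            permits.acquire();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the failure through the future.
            Thread.currentThread().interrupt();
            CompletableFuture<Integer> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        try {
            // Return the permit once the request finishes, whether it succeeded or failed.
            return transport.apply(json).whenComplete((status, error) -> permits.release());
        } catch (RuntimeException e) {
            // The transport threw before returning a future, so release the permit here.
            permits.release();
            throw e;
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

With the Apache async client, the transport could wrap `httpclient.execute(postRequest, callback)` and complete the future from a `FutureCallback<HttpResponse>` (its `completed`, `failed`, and `cancelled` methods). Because `send` blocks when the limit is reached, the Kafka consumer slows down to the pace of the REST endpoint instead of queueing work without bound.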
andy