What I do:
I am using the Vert.x Rx HTTP client to perform a large number of HTTP requests. In this specific case I am calling "method A", which returns a list of IDs. To receive all the IDs I need to call method A several times, each time specifying the page number of the next batch of results.
To improve performance and run the calls in parallel as much as possible, I create a list of (RxJava) Observables, each representing the result of a single page request. When I am done creating this list, I call the Observable.zip operator and pass it the list of Observables.
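The fan-out pattern described above can be sketched without any Vert.x or RxJava dependency. This is a minimal illustration only: CompletableFuture.allOf is swapped in for the role Observable.zip plays here, and fetchPage, its three-IDs-per-page behaviour, and the class name are hypothetical placeholders for a real call to "method A".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Sketch only: CompletableFuture stands in for the RxJava Observables,
// and fetchPage is a hypothetical placeholder for one call to "method A".
final class PagedFetchSketch {

    // Hypothetical page fetch: pretends every page holds three consecutive IDs.
    static CompletableFuture<List<Integer>> fetchPage(int page) {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> ids = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                ids.add(page * 3 + i);
            }
            return ids;
        });
    }

    // Starts all page requests up front, then combines them once every one has completed.
    static List<Integer> fetchAllIds(int pageCount) {
        List<CompletableFuture<List<Integer>>> pages = new ArrayList<>();
        for (int page = 0; page < pageCount; page++) {
            pages.add(fetchPage(page));
        }
        // allOf completes only after every page future has completed.
        return CompletableFuture.allOf(pages.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> pages.stream()
                        .flatMap(f -> f.join().stream())
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(fetchAllIds(4));
    }
}
```

The combined result becomes available only once every page has produced its value, which is why a single page that never finishes blocks the whole batch.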
The Issue:
Using the Vert.x HTTP client without special settings, everything works, but very slowly: for example, 3000 HTTP requests are processed in 5 minutes.
I tried to improve the performance by setting the Vert.x HTTP client options as follows:
HttpClientOptions options = new HttpClientOptions();
options.setMaxPoolSize(50)
    .setKeepAlive(true)
    .setPipelining(true)
    .setTcpKeepAlive(true)
    .setPipeliningLimit(25)
    .setMaxWaitQueueSize(10000);
With these settings, however, I get unstable results: sometimes everything works fine and I receive all responses in less than 20 seconds. At other times the external server I am calling closes the connection and the log shows the following error:
io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
- No error handler in my code is called
- When this error appears the zip operator hangs
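A combining operator generally has to wait for every source, so a hang like the one in the second bullet is consistent with one source that never emits a value or an error. The sketch below only illustrates that general behaviour, under stated assumptions: CompletableFuture stands in for the Observables, the stalled source is simulated by a future that is never completed, and the class and method names are hypothetical. It does not reproduce the Vert.x error itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: shows that a combined wait never finishes when one source stalls,
// and that a timeout turns the silent hang into an observable failure.
final class StalledCombineSketch {

    // Returns true if the combined future was still pending when the timeout expired.
    static boolean combinedWaitTimesOut(boolean oneSourceStalls, long timeoutMillis) {
        CompletableFuture<String> first = CompletableFuture.completedFuture("page 0");
        // A stalled source is modelled as a future that nobody ever completes.
        CompletableFuture<String> second = oneSourceStalls
                ? new CompletableFuture<>()
                : CompletableFuture.completedFuture("page 1");
        CompletableFuture<Void> all = CompletableFuture.allOf(first, second);
        try {
            all.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("all sources finish: timed out = " + combinedWaitTimesOut(false, 200));
        System.out.println("one source stalls:  timed out = " + combinedWaitTimesOut(true, 200));
    }
}
```

Bounding the wait does not explain why a source stalls, but it makes the stall visible instead of leaving the caller blocked indefinitely.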
Here is the code that creates the HttpClientRequest:
public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
    Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
        try {
            HttpClientRequest request = httpClient.postAbs(url);
            addHeadersToRequest(headers, request);
            sendRequest(url, subscriber, request, body);
        } catch (Exception e) {
            try {
                subscriber.onError(e);
            } catch (Exception ex) {
                logger.error("error calling onError for subscriber", ex);
            } finally {
                subscriber.onCompleted();
            }
        }
    });
    return bufferObservable;
}
private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
    final long requestId = reqNumber.getAndIncrement();
    if (bodyData != null) {
        request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length));
    }
    request.putHeader("Accept-Encoding", "gzip,deflate");
    Observable<HttpRestResponse> retVal = request.toObservable()
        .doOnError(throwable -> {
            logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
        }).doOnNext(response -> {
            if (response != null) {
                logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
            }
        }).flatMap(httpClientResponse -> {
            try {
                if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
                    // Collect all body chunks into a single buffer
                    Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
                        .reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));
                    return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
                }
            } catch (Exception e) {
                logger.error("error in RestHttpClient", e);
            }
            // Fallback: response without a body
            return Observable.just(new HttpRestResponse(null, httpClientResponse));
        });
    retVal.subscribe(subscriber);
    if (bodyData != null) {
        request.end(bodyData); // write post data
    } else {
        request.end();
    }
}