
What I do:

I am using the Vert.x Rx HTTP client to perform a large number of HTTP requests. In this specific case I call "method A", which returns a list of IDs. To receive all the IDs I need to call method A several times, each time specifying a different page number, to get the next batch of results.

To improve performance and run the calls in parallel as much as possible, I create a list of RxJava Observables, each representing the result of a single page request. Once the list is built, I pass it to the Observable.zip operator.

The Issue:

With the Vert.x HTTP client's default settings everything works, but very slowly: for example, 3000 HTTP requests take 5 minutes.

I tried to improve the performance by setting the Vert.x HTTP client options as follows:

 HttpClientOptions options = new HttpClientOptions();

 options.setMaxPoolSize(50)
        .setKeepAlive(true)
        .setPipelining(true)
        .setTcpKeepAlive(true)
        .setPipeliningLimit(25)
        .setMaxWaitQueueSize(10000);

But when I do that I get unstable results: sometimes everything works fine and I receive all responses in less than 20 seconds. Sometimes, however, the external server I am calling closes the connection and the log shows the following error:

io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
  • No error handler in my code is called
  • When this error appears the zip operator hangs

Here is the code that creates the HttpClientRequest:

public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
        Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
            try {
                HttpClientRequest request = httpClient.postAbs(url);
                addHeadersToRequest(headers, request);
                sendRequest(url, subscriber, request, body);
            }catch (Exception e) {
                try {
                    subscriber.onError(e);
                }catch (Exception ex) {
                    logger.error("error calling onError for subscriber",ex);
                }finally {
                    subscriber.onCompleted();
                }
            }
        });
        return bufferObservable;
    }

private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
        final long requestId = reqNumber.getAndIncrement();

        if (bodyData != null) {
            request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length));
        }

        request.putHeader("Accept-Encoding", "gzip,deflate");

        Observable<HttpRestResponse> retVal = request.toObservable()
                .doOnError(throwable -> {
                    logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
                }).doOnNext(response -> {
                    if (response != null) {
                        logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
                    }
                }).flatMap(httpClientResponse -> {
                    try {
                        if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
                            Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
                                    .reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));

                            return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
                        }
                    } catch (Exception e) {
                        logger.error("error in RestHttpClient", e);
                    }

                    return Observable.just(new HttpRestResponse(null, httpClientResponse));
                });

        retVal.subscribe(subscriber);

        if (bodyData != null) {
            request.end(bodyData); // write post data
        } else {
            request.end();
        }
    }


2 Answers


If you think you can hook up your exception logic like this:

 try {
            HttpClientRequest request = httpClient.postAbs(url);
            addHeadersToRequest(headers, request);
            sendRequest(url, subscriber, request, body);
        }catch (Exception e) {
            try {
                subscriber.onError(e);
            }catch (Exception ex) {
                logger.error("error calling onError for subscriber",ex);
            }finally {
                subscriber.onCompleted();
            }
        }

You won't receive any error there, because essentially all of your processing now lives inside the Rx pipeline, so nothing is reported to your try/catch block.

From this point on, errors will reach you through

bufferObservable.onErrorReturn()

or

bufferObservable.subscribe(success, error)
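
To illustrate why the try/catch stays silent, here is a minimal sketch that deliberately uses the JDK's CompletableFuture as a stand-in for the Rx pipeline. It is not Vert.x or RxJava code, and the class name AsyncErrorDemo is made up for illustration. The try block only wires up the callback; the failure arrives after the block has exited, so only the callback sees it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncErrorDemo {

    /** Returns a log of who observed the failure: the catch block or the callback. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Stand-in for the pending HTTP response (assumption: not the Vert.x API).
        CompletableFuture<String> response = new CompletableFuture<>();

        try {
            // Only wiring happens inside the try block; nothing has failed yet.
            response.whenComplete((body, error) -> {
                if (error != null) {
                    log.add("callback saw: " + error.getMessage());
                } else {
                    log.add("body: " + body);
                }
            });
        } catch (Exception e) {
            // Never reached in this sketch: the try block finished without throwing.
            log.add("catch saw: " + e.getMessage());
        }

        // The failure is delivered later, after the try block has exited.
        response.completeExceptionally(new RuntimeException("Connection was closed"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [callback saw: Connection was closed]
    }
}
```

The same reasoning applies to an Rx chain: the error handler has to be attached to the observable itself, as in the two calls above.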
y ramesh rao

So I finally figured this out: it turns out I was passing an empty list to the Observable.zip method.

The problem is that neither onNext nor onError is called on the Observable returned by zip in that case. Only onCompleted is called, and I had not bothered to register a handler for it.
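
The callback sequence can be sketched with a small plain-Java model. This is an illustration only, not RxJava itself: the Listener interface, the zip helper, and the class name EmptyZipDemo are all hypothetical, and the model is synchronous. It mirrors the behaviour described above, where an empty input produces onCompleted and nothing else.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class EmptyZipDemo {

    /** Hypothetical stand-in for an Rx subscriber with its three callbacks. */
    interface Listener<R> {
        void onNext(R value);
        void onError(Throwable error);
        void onCompleted();
    }

    /**
     * Simplified, synchronous model of zipping a list of page results.
     * An empty list yields no onNext and no onError, only onCompleted.
     */
    static <T, R> void zip(List<T> pages, Function<List<T>, R> combiner, Listener<R> listener) {
        if (pages.isEmpty()) {
            listener.onCompleted(); // nothing to combine: complete immediately
            return;
        }
        R combined;
        try {
            combined = combiner.apply(pages);
        } catch (RuntimeException e) {
            listener.onError(e);
            return;
        }
        listener.onNext(combined);
        listener.onCompleted();
    }

    /** Runs the model and records which callbacks fired, in order. */
    static List<String> run(List<String> pages) {
        List<String> events = new ArrayList<>();
        zip(pages, List::size, new Listener<Integer>() {
            @Override public void onNext(Integer count) { events.add("onNext:" + count); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(Collections.emptyList()));          // prints [onCompleted]
        System.out.println(run(Arrays.asList("page1", "page2"))); // prints [onNext:2, onCompleted]
    }
}
```

A caller that registers only onNext and onError handlers sees nothing at all in the empty case, which looks like a hang. Checking that the list is non-empty before zipping, or always handling onCompleted, avoids that.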

Many thanks to everyone who was interested and tried to help.

Yaniv