I have deployed a worker verticle (Vert.x version 3.6.3). From this worker verticle I make an HTTP request using the Vert.x WebClient library. My Vert.x worker pool size is 20 and the event loop pool size is 10. Right after the HTTP request is made (after the send() call), I block the worker thread that made the request using a CompletableFuture. When I block the worker thread, the HTTP request never gets a response and always times out. It gets a response when the worker thread is not blocked. I thought that if I blocked the worker thread, other worker threads from the pool would be available to handle the HTTP request. What am I doing wrong here? Also, I have enabled network activity logging, but I don't see any network logs being printed.

The following is the program I tried. I am running a sample HTTP server on localhost at port 8080, which responds fine.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;

public class VertxTestMain extends AbstractVerticle {

    private static int workerPoolSize = 20;
    private static int eventLoopPoolSize = 10;

    @Override
    public void start() {
        vertx.eventBus().consumer("vertx.address", event -> {
            CompletableFuture<String> future = new CompletableFuture<>();
            doAHttpRequest(vertx, future);
            try {
                // Uncommenting the next line blocks this worker thread; the request then times out.
                //future.get(20, TimeUnit.SECONDS);
            } catch (Exception e) {
                System.out.println(Thread.currentThread().getName()+ " ----- HTTP request never responded");
                e.printStackTrace();
            }
        });
    }

    private void doAHttpRequest(Vertx vertx, CompletableFuture<String> future)  {

        //System.setProperty("java.util.logging.config.file", "/opt/maglev/persisted/data/vertx-default-jul-logging.properties");

        WebClientOptions options = new WebClientOptions();
        options.setLogActivity(true);
        WebClient webClient = WebClient.create(vertx, options );

        int port = 8080;
        String host = "localhost";
        String url = "/";

        // url already starts with "/", so no extra separator is added here
        System.out.println(Thread.currentThread().getName()+ " ----- Sending http://" + host + ":" + port + url);

        // Send a GET request
        webClient
        .get(port, host, url)
        .timeout(10000)
        .send(ar -> {
            if (ar.succeeded()) {
                HttpResponse<Buffer> response = ar.result();
                System.out.println(Thread.currentThread().getName()+ " ----- Received response.  " + response.bodyAsString());
                System.out.println(Thread.currentThread().getName()+ " ----- Received response with status code.  " + response.statusCode());
                future.complete("success");
            } else {
                System.out.println(Thread.currentThread().getName()+ " ----- Something went wrong. " + ar.cause().getMessage());
                future.completeExceptionally(ar.cause());
            }
        });
    }

    public static void main(String[] args) {

        DeploymentOptions deploymentOptions = new DeploymentOptions().setWorker(true).setInstances(2);
        VertxOptions vertxOptions = new VertxOptions();
        vertxOptions.setWorkerPoolSize(workerPoolSize);
        vertxOptions.setEventLoopPoolSize(eventLoopPoolSize);
        Vertx vertx = Vertx.vertx(vertxOptions);

        vertx.deployVerticle(VertxTestMain.class.getName(), deploymentOptions, event -> {
            if(event.succeeded()) {
                System.out.println(Thread.currentThread().getName()+ " ----- VertxTestMain verticle is deployed");
                vertx.eventBus().send("vertx.address", "send");
            }
            else {
                System.out.println(Thread.currentThread().getName()+ " ----- VertxTestMain verticle deployment failed. " + event.cause());
            }
        });
    }
}
Raghavendra Nilekani

1 Answer

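You don't allow your HTTP request to start: the worker thread that called send() is the one that has to run the queued request, and it is blocked on future.get().

Return the future from your method instead of passing it in: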

private CompletableFuture<String> doAHttpRequest(Vertx vertx)  {

    CompletableFuture<String> future = new CompletableFuture<>();
    WebClientOptions options = new WebClientOptions();
    options.setLogActivity(true);
    WebClient webClient = WebClient.create(vertx, options );

    int port = 8080;
    String host = "localhost";
    String url = "/";

    // url already starts with "/", so no extra separator is added here
    System.out.println(Thread.currentThread().getName()+ " ----- Sending http://" + host + ":" + port + url);

    // Send a GET request
    webClient
    .get(port, host, url)
    .timeout(10000)
    .send(ar -> {
        if (ar.succeeded()) {
            HttpResponse<Buffer> response = ar.result();
            System.out.println(Thread.currentThread().getName()+ " ----- Received response.  " + response.bodyAsString());
            System.out.println(Thread.currentThread().getName()+ " ----- Received response with status code.  " + response.statusCode());
            future.complete("success");
        } else {
            System.out.println(Thread.currentThread().getName()+ " ----- Something went wrong. " + ar.cause().getMessage());
            future.completeExceptionally(ar.cause());
        }
    });

    return future;
}
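
Once the method returns the future, the caller can attach a callback to it rather than calling get(). Here is a minimal JDK-only sketch of that shape. Note that doRequest() is a hypothetical stand-in for doAHttpRequest(vertx), and the class and method names are made up for illustration; no real HTTP call is made.

```java
import java.util.concurrent.CompletableFuture;

public class ReturnedFutureDemo {

    // Hypothetical stand-in for doAHttpRequest(vertx): the result is produced
    // on another thread and delivered through the returned future.
    static CompletableFuture<String> doRequest() {
        return CompletableFuture.supplyAsync(() -> "success");
    }

    // Attaches a callback and returns immediately, without calling get().
    static CompletableFuture<String> handle() {
        return doRequest().handle((result, error) ->
                error == null
                        ? "request finished: " + result
                        : "request failed: " + error.getMessage());
    }

    public static void main(String[] args) {
        // join() appears only here so the demo JVM waits for the result before exiting;
        // it should not be called from inside a verticle handler.
        System.out.println(handle().join());
    }
}
```

In the verticle, the same idea would mean registering the callback inside the event bus consumer and letting the handler return.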

You can also reuse the WebClient; there's no need to create a new one for every request.

Also, take a look at the Promise API that Vert.x provides, as it may be better suited to your use case:

https://vertx.io/docs/apidocs/io/vertx/core/Promise.html

Alexey Soshin
  • Does Vert.x send the request out on the wire as soon as the worker thread finishes executing the webClient.get line of code? If so, why would the request be blocked? – Rajesh TV Mar 22 '20 at 18:08
  • `send()` puts your request in the task queue, asynchronously. Now who needs to execute this task? Well, it's the thread that put it there, your worker thread. But it cannot execute anything, because it's blocked on `future.get()`. – Alexey Soshin Mar 22 '20 at 19:36
  • Why should the same worker thread execute the `send` task? If there are other worker threads available in the worker pool, can't they execute that task? – Raghavendra Nilekani Sep 15 '20 at 18:14
  • It's called thread affinity. Otherwise you would need locks, and locks are slow. – Alexey Soshin Sep 15 '20 at 19:27
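
The effect described in the comments above can be reproduced with plain JDK classes. This is only an analogy, not Vert.x's actual scheduler: a single-threaded executor stands in for the worker's context, and the class and method names are hypothetical. The "request" is queued on the same thread that then waits for it, so the wait can only time out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AffinityDemo {

    // A single-threaded executor stands in for one worker context.
    // The follow-up task is queued on that same executor, so it can
    // only run after the current task has returned.
    static String run(boolean blockCaller) {
        ExecutorService context = Executors.newSingleThreadExecutor();
        CompletableFuture<String> outcome = new CompletableFuture<>();
        try {
            context.execute(() -> {
                CompletableFuture<String> response = new CompletableFuture<>();
                // Queue the "request" on the same thread, as send() is described to do.
                context.execute(() -> response.complete("response"));
                if (blockCaller) {
                    try {
                        // The only thread that could complete 'response' is this one.
                        outcome.complete(response.get(500, TimeUnit.MILLISECONDS));
                    } catch (TimeoutException e) {
                        outcome.complete("timed out");
                    } catch (Exception e) {
                        outcome.completeExceptionally(e);
                    }
                } else {
                    // Register a callback and return, which frees the thread.
                    response.thenAccept(outcome::complete);
                }
            });
            return outcome.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            return "error: " + e;
        } finally {
            context.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking:     " + run(true));   // prints "blocking:     timed out"
        System.out.println("non-blocking: " + run(false));  // prints "non-blocking: response"
    }
}
```

Adding more threads elsewhere does not help in the blocking case, because the queued task is tied to the one thread that is waiting for it.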