
I have a Java application that uses the Vertx framework and Kafka.

There is a Kafka consumer with a handler containing my custom code and a Vertx router that simply returns an "OK" to the requests.

While messages are being consumed from the topic, the router does not return "OK" immediately; the request stays pending for several seconds (sometimes minutes) before the response arrives. This happens specifically when there are a lot of messages on the topic (e.g., 30K messages). Note that each individual execution of the consumer handler takes less than one second.

How can I consume the messages without blocking the Vertx route, so that the route responds immediately while the Kafka consumer is running?

It is important that the executions of the Kafka consumer handler run sequentially (i.e., one execution must not start before the previous one has finished). I only want the consumer execution and the router execution to run in parallel.
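To make the requirement concrete, here is a minimal plain-JDK model of it. This is not Vert.x code: the single-thread executors only stand in for event loops, and `EventLoopModel`, `run`, and `Result` are hypothetical names used for illustration. The sketch assumes each record takes about 10 ms to handle. It compares a health check queued on the same single thread as the records with one served by its own thread, while the records are still handled strictly one after another.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK model only: single-thread executors stand in for Vert.x event loops.
class EventLoopModel {

    /** Outcome of one simulated run. */
    static final class Result {
        final List<Integer> processed;    // record ids in the order they were handled
        final int doneBeforeHealthCheck;  // records already handled when the check ran

        Result(List<Integer> processed, int doneBeforeHealthCheck) {
            this.processed = processed;
            this.doneBeforeHealthCheck = doneBeforeHealthCheck;
        }
    }

    /**
     * Queues `records` simulated Kafka records on one thread, then submits a
     * health check either to that same thread or to a separate one.
     */
    static Result run(boolean sharedLoop, int records) {
        ExecutorService consumerLoop = Executors.newSingleThreadExecutor();
        ExecutorService httpLoop =
            sharedLoop ? consumerLoop : Executors.newSingleThreadExecutor();
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        try {
            for (int i = 0; i < records; i++) {
                final int id = i;
                // One thread + FIFO queue: records are handled strictly in order.
                consumerLoop.submit(() -> {
                    Thread.sleep(10); // assumed cost of the database work
                    processed.add(id);
                    return null;
                });
            }
            // The "health check" reports how many records were done when it ran.
            Callable<Integer> healthCheck = processed::size;
            int done = httpLoop.submit(healthCheck).get();

            consumerLoop.shutdown(); // let the queued records finish
            consumerLoop.awaitTermination(30, TimeUnit.SECONDS);
            return new Result(new ArrayList<>(processed), done);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            consumerLoop.shutdownNow();
            httpLoop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Result shared = run(true, 20);
        Result split = run(false, 20);
        System.out.println("shared thread: check ran after "
            + shared.doneBeforeHealthCheck + " records");
        System.out.println("separate threads: check ran after "
            + split.doneBeforeHealthCheck + " records");
        System.out.println("processing order: " + split.processed);
    }
}
```

In the shared case the check waits behind every queued record, which matches the delay I observe. With a separate thread it runs without waiting for the backlog, and the record order is unchanged.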

Here is the Vertx router:

  private Router createRouter() {
    final Router router = Router.router(vertx);
    router.get("/healthcheck").handler(rc -> rc.response().end("OK"));
    return router;
  }

Here is the Kafka consumer:

  private void createKafkaConsumer() {
    KafkaConfiguration kafkaConfig = new KafkaConfiguration(config());
    consumer = KafkaConsumer.create(vertx, kafkaConfig.kafkaConsumerConfig());
    consumer.exceptionHandler(event -> log.error(event.getMessage()));

    consumer.handler(record -> {
      try {

        // Here is my consumer function that interacts with a database

      } catch (Exception e) {
        log.warn("Something is wrong: ", e);
      }
      // Commit after every record, whether or not processing succeeded
      consumer.commit();
    });
  }

EDIT:

Consumer Class:

public class ConsumerVerticle extends CustomAbstractVerticle {
  @Override
  public void start(Promise<Void> promise) {
    createKafkaConsumer(); // function above
  }
}

Web Service class:

public class WebServiceVerticle extends CustomAbstractVerticle {
    @Override
    public void start(Promise<Void> promise) {
      createAnHttpServer(vertx, createRouter(), config(), promise); // createRouter() is the function above
    }
}

Main Class:

public class MainVerticle extends CustomVerticle {

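  private Future<Void> deployAllVerticles(Vertx vertx, JsonObject config, Class[] classes) {
    final Promise<Void> promise = Promise.promise();
    List<Future> futureList = new ArrayList<>();
    for (Class verticleClass : classes) {
      futureList.add(deployVerticle(vertx, verticleClass));
    }
    // Assumed completion (this part was missing from the snippet):
    // finish once every deployment has finished
    CompositeFuture.all(futureList).onComplete(ar -> {
      if (ar.succeeded()) promise.complete(); else promise.fail(ar.cause());
    });
    return promise.future();
  }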

  private Future<String> deployVerticle(Vertx vertx, Class clazz) {
    return Future.future(promise -> vertx.deployVerticle(clazz.getName(), promise));
  }

  @Override
  public void start() {

    vertx = Vertx.vertx(new VertxOptions()
           .setMaxEventLoopExecuteTime(TimeUnit.SECONDS.toNanos(5)));
    deployAllVerticles(vertx, new Class[] {ConsumerVerticle.class, WebServiceVerticle.class});
  }
}
Dale
  • Paste the full code: how are you deploying the verticle and setting up vertx, etc.? Very likely both the server and Kafka are running on the same single event loop, and since it is busy doing Kafka stuff it can't respond to the HTTP request. Put the HTTP server on a different verticle/event loop and you should be good – Asad Awadia Dec 31 '22 at 03:37
  • @AsadAwadia you are right. I have a single Verticle object containing both the Kafka consumer and the Web Service. Should I put the Web Server in another Verticle and set it as a worker (by calling options.setWorker(true); before the deployVerticle method) ? – Raffaele Martone Jan 03 '23 at 10:11
  • Yes, deploy it in a different verticle. Do not do anything around worker - keep it as a standard verticle – Asad Awadia Jan 03 '23 at 19:41
  • You can check out the course https://www.udemy.com/course/backend-development-with-vertx/learn/?referralCode=063C2D57CCB957C5088C to learn more about vertx – Asad Awadia Jan 03 '23 at 20:02
  • Hi, I tried to deploy the web service in a different verticle but the web service's response has a significant delay when the Kafka consumer is working with many messages. I updated the question with the code. – Raffaele Martone Jan 12 '23 at 14:35
  • Print the thread name in the start method of the http and kafka verticles and send them here – Asad Awadia Jan 12 '23 at 15:46
  • What is this custom abstract verticle? – Asad Awadia Jan 12 '23 at 15:47
  • Thank you. I found out that the consumer and the web service were running on the same thread. I modified the code and now it's working fine; however, I didn't understand the reason for this behavior. I posted another question to explain the two different implementations of the code: https://stackoverflow.com/questions/75110331/why-the-deployed-verticles-are-running-on-the-same-thread-of-the-main-verticle – Raffaele Martone Jan 13 '23 at 14:14

0 Answers