
My Vert.x (4.0.2) application, written in Java 11, uses some data-heavy verticles that cause latency spikes because they block the event loop for a moment. For this reason I wanted to deploy these verticles as worker verticles, so that the event loop and the other verticles are no longer blocked.

Unfortunately my application now crashes, because the event handling inside the verticle is executed by multiple threads concurrently ;(

If I understand the Vert.x documentation correctly, this should not happen:

Worker verticle instances are never executed concurrently by Vert.x by more than one thread, but can be executed by different threads at different times.

I was able to reproduce the issue with a minimal example:

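import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import lombok.extern.slf4j.Slf4j;

@Slf4j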
public class WorkerTest extends AbstractVerticle {
  private static final String ADDRESS = "address";
  private volatile String currentThread = null;
  private long counter = 0;

  @Override
  public void start(final Promise<Void> startPromise) {
    vertx.eventBus().consumer(ADDRESS, this::handleMessage);
    startPromise.complete();
  }

  private void handleMessage(Message<Object> message) {
    final var _currentThread = this.currentThread;
    final var thisThread = Thread.currentThread().getName();

    if (_currentThread != null) {
      log.error(
          "concurrent callback: current thread={}, this thread={}", _currentThread, thisThread);
      return;
    }

    try {
      this.currentThread = thisThread;
      Thread.sleep(2);
      if (++counter % 100L == 0) {
        log.info("received {} messages (current thread: {})", counter, thisThread);
      }
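    } catch (InterruptedException e) {
      // restore the interrupt flag instead of silently swallowing the exception
      Thread.currentThread().interrupt();
    } finally {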
      this.currentThread = null;
    }
  }

  public static void main(String[] args) {
    final Vertx vertx = Vertx.vertx();

    vertx.deployVerticle(
        new WorkerTest(),
        new DeploymentOptions().setWorker(true),
        result -> {
          if (result.failed()) {
            System.exit(1);
            return;
          }

          for (int i = 0; i < 1000; ++i) {
            vertx.eventBus().send(ADDRESS, "test");
          }
        });
  }
}

Executing this gives me many log errors because handleMessage is called from multiple threads concurrently. If I deploy the verticle as a non-worker verticle, it works as intended.

What am I doing wrong here?

  • The only thing you are checking is that different threads are used to run your `handleMessage`, i.e. that the thread has changed between different runs of the `handleMessage(...)` method. This is valid and complies with the text you took from the Vert.x docs. What remains to be verified is that from the start to the end of `handleMessage(...)` no thread switch happens AND no other thread runs `handleMessage` at the same time. – mohamnag Apr 08 '21 at 14:01
  • @mohamnag thx for your answer, but I am not sure I understand you correctly. To my understanding, the error log message should never be logged, because each thread that enters `handle...` will set `this.currentThread` to `null` when leaving. But this is not what I see, meaning there are multiple threads executing `handle...` at the same time. – Johannes Ohlemacher Apr 09 '21 at 05:08
  • Sorry, I misread the first line. However, I just tried your code and I never get the log. Are you sure all the code is exactly what you have here? PS: for best results never instantiate the verticle yourself when calling `deployVerticle`; either pass the class name or use the supplier variant `WorkerTest::new`. – mohamnag Apr 09 '21 at 06:03
  • I just pushed a complete repro to GitHub: https://github.com/eXistence/vertx-workertest my co-workers see the same issue ;( Upgrading Vert.x to 4.0.3 seemed to solve the problem at first, but only because they somehow changed the scheduling and all events are handled by the same thread anyway. In a more complex scenario it still breaks ;( – Johannes Ohlemacher Apr 09 '21 at 06:23
  • 4.0.2 had this bug, but 4.0.3 fixed it. Not sure about your "complex scenario", but see my answer for what works properly. – mohamnag Apr 09 '21 at 06:56
  • Yeah, my co-worker just found the issue (https://github.com/eclipse-vertx/vert.x/issues/3798). Maybe my more complex scenario is broken... I will do some more tests. Btw: why should we avoid instantiating the verticles directly? Can you point me to some info about this? – Johannes Ohlemacher Apr 09 '21 at 07:05
  • If you deploy multiple instances, then you need to use a class name or a supplier; you can't use the same instance, obviously (see my example). – mohamnag Apr 09 '21 at 07:07

1 Answer


Vert.x 4.0.2 seems to be the problem in your case. Using Vert.x 4.0.3 and the following code:


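import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;

public class WorkerTest extends AbstractVerticle {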
    private static final String ADDRESS = "address";

    private volatile boolean handleMessageInExecution = false;

    public static void main(String[] args) {
        final Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(
                WorkerTest::new,
                new DeploymentOptions()
                        .setInstances(2)
                        .setWorkerPoolSize(10)
                        .setWorker(true)
                ,
                result -> {
                    for (int i = 0; i < 100; ++i) {
                        vertx.eventBus().send(ADDRESS, "test " + i);
                    }
                });
    }

    @Override
    public void start(final Promise<Void> startPromise) {
        vertx.eventBus().localConsumer(ADDRESS, this::handleMessage);
        startPromise.complete();
    }

    private void handleMessage(Message<String> message) {
        if (handleMessageInExecution) {
            // this should never happen, since each thread that sets this to true, will also set it to
            // false on exit.
            System.out.println(message.body() + " ERROR");
            return;
        }

        handleMessageInExecution = true; // this thread is now executing handleMessage
        System.out.println(message.body() + " START   " + Thread.currentThread());

        try {
            Thread.sleep(1); // block thread for a moment to simulate heavy load
        } catch (Exception e) {
            // ignore interruption
            e.printStackTrace();
        } finally {
            handleMessageInExecution = false; // we are done executing
            System.out.println(message.body() + " END     " + Thread.currentThread());
        }
    }
}

we see this output, which is as expected (each message is handled by one thread and runs from start to end without concurrency; at most 2 messages are handled at the same time because we have 2 instances):

test 1 START   Thread[vert.x-worker-thread-2,5,main]
test 0 START   Thread[vert.x-worker-thread-3,5,main]
test 0 END     Thread[vert.x-worker-thread-3,5,main]
test 1 END     Thread[vert.x-worker-thread-2,5,main]
test 2 START   Thread[vert.x-worker-thread-3,5,main]
test 3 START   Thread[vert.x-worker-thread-2,5,main]
test 3 END     Thread[vert.x-worker-thread-2,5,main]
test 2 END     Thread[vert.x-worker-thread-3,5,main]
test 5 START   Thread[vert.x-worker-thread-2,5,main]
test 4 START   Thread[vert.x-worker-thread-3,5,main]
test 4 END     Thread[vert.x-worker-thread-3,5,main]
test 6 START   Thread[vert.x-worker-thread-3,5,main]
test 5 END     Thread[vert.x-worker-thread-2,5,main]
test 7 START   Thread[vert.x-worker-thread-2,5,main]
test 6 END     Thread[vert.x-worker-thread-3,5,main]
test 8 START   Thread[vert.x-worker-thread-3,5,main]
test 7 END     Thread[vert.x-worker-thread-2,5,main]
test 9 START   Thread[vert.x-worker-thread-2,5,main]
test 8 END     Thread[vert.x-worker-thread-3,5,main]
test 10 START   Thread[vert.x-worker-thread-3,5,main]
test 9 END     Thread[vert.x-worker-thread-2,5,main]
test 11 START   Thread[vert.x-worker-thread-2,5,main]
test 10 END     Thread[vert.x-worker-thread-3,5,main]
...
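
One caveat about the detection logic in both snippets: reading a `volatile` field and then writing it is not a single atomic step, so two threads could both pass the check before either sets the flag, and an overlap could go unreported. A minimal sketch of a stricter check, using only the JDK's `AtomicInteger`; the class name `ConcurrencyGuard` and its methods are hypothetical helpers for illustration, not part of Vert.x:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not a Vert.x API): detects overlapping executions of a handler. */
public class ConcurrencyGuard {
    // number of threads currently inside the guarded section
    private final AtomicInteger active = new AtomicInteger();
    // number of entries that overlapped with another thread
    private final AtomicInteger violations = new AtomicInteger();

    /** Marks entry; returns true if the caller is alone inside the guarded section. */
    public boolean enter() {
        // incrementAndGet is atomic, so checking and marking happen in one step
        boolean alone = active.incrementAndGet() == 1;
        if (!alone) {
            violations.incrementAndGet();
        }
        return alone;
    }

    /** Marks exit; call exactly once per enter(), ideally from a finally block. */
    public void exit() {
        active.decrementAndGet();
    }

    /** Returns how many overlapping entries have been observed so far. */
    public int violations() {
        return violations.get();
    }
}
```

Inside `handleMessage`, one would call `enter()` first, log an error if it returns `false`, and call `exit()` in the `finally` block. Each verticle instance would hold its own guard, since the guarantee applies per instance.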
mohamnag