my vertx (4.0.2) application written in Java (11) uses some data-heavy verticles that cause latency spikes because the eventloop gets blocked by them for a moment. For this reason i wanted to deploy these verticles as worker verticles, so that the eventloop and other verticles are no longer blocked.
Unfortunately my application crashes now, because the event handling inside the verticle is executed by multiple threads concurrently ;(
If i understand the vertx documentation correctly, this should not happen:
Worker verticle instances are never executed concurrently by Vert.x by more than one thread, but can executed by different threads at different times.
I was able to reproduce the issue with a minimal example:
@Slf4j
public class WorkerTest extends AbstractVerticle {
private static final String ADDRESS = "address";
private volatile String currentThread = null;
private long counter = 0;
@Override
public void start(final Promise<Void> startPromise) {
vertx.eventBus().consumer(ADDRESS, this::handleMessage);
startPromise.complete();
}
private void handleMessage(Message<Object> message) {
final var _currentThread = this.currentThread;
final var thisThread = Thread.currentThread().getName();
if (_currentThread != null) {
log.error(
"concurrent callback: current thread={}, this thread={}", _currentThread, thisThread);
return;
}
try {
this.currentThread = thisThread;
Thread.sleep(2);
if (++counter % 100L == 0) {
log.info("received {} messages (current thread: {})", counter, thisThread);
}
} catch (Exception e) {
} finally {
this.currentThread = null;
}
}
public static void main(String[] args) {
final Vertx vertx = Vertx.vertx();
vertx.deployVerticle(
new WorkerTest(),
new DeploymentOptions().setWorker(true),
result -> {
if (result.failed()) {
System.exit(1);
return;
}
for (int i = 0; i < 1000; ++i) {
vertx.eventBus().send(ADDRESS, "test");
}
});
}
}
Executing this gives me many log errors because handleMessage
is called from multiple threads concurrently. If i deploy the verticle as non-worker, this works as intended.
What am i doing wrong here?