Our Spring application uses Reactor Kafka to consume messages.
Question 1: Is there a standard convention for pausing message consumption and finishing in-flight messages during application shutdown?
The current implementation uses reactiveKafkaConsumerTemplate to receive messages. Then, in a @PreDestroy method, we pause the consumer using reactiveKafkaConsumerTemplate.pause.
Current Implementation (simplified)
    reactiveKafkaConsumerTemplate
        .receiveAutoAck()
        .publishOn(Schedulers.boundedElastic())
        .flatMap(x -> Mono.just(x)
            .delayElement(Duration.ofMillis(300)), 5)
        .flatMap(message -> Mono.just(message)
            .flatMap(processMessageImp::processMessage)
            .onErrorResume(t -> Mono.empty())
        );

    public void pauseKafkaMessageConsumer() {
        reactiveKafkaConsumerTemplate
            .assignment()
            .flatMap(topicParts -> reactiveKafkaConsumerTemplate.pause(topicParts))
            .subscribe();
    }

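    @PreDestroy
    public void onExit() {
        pauseKafkaMessageConsumer();
        try {
            // Give in-flight messages a fixed 5 seconds to finish
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and log the cause
            Thread.currentThread().interrupt();
            log.error("onExit Error while PreDestroy ", e);
        }
    }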
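
For illustration, one alternative to the fixed Thread.sleep(5000) is to count in-flight messages and wait until the count reaches zero, bounded by a timeout. The sketch below uses only the JDK. The class InFlightTracker and its method names are hypothetical and are not part of Reactor Kafka or Spring; this is one possible approach under those assumptions, not an established convention.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: counts in-flight messages so shutdown can wait for them. */
public final class InFlightTracker {
    private final AtomicInteger inFlight = new AtomicInteger();

    /** Call when a message starts processing. */
    public void begin() {
        inFlight.incrementAndGet();
    }

    /** Call when processing finishes, whether it succeeded or failed. */
    public void end() {
        inFlight.decrementAndGet();
    }

    /**
     * Polls until no messages are in flight or the timeout elapses.
     * Returns true if the tracker drained in time, false on timeout.
     */
    public boolean awaitDrained(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (inFlight.get() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }
}
```

Assuming the pipeline above, begin() could be called in a doOnNext before processMessage and end() in a doFinally on the inner Mono, and onExit() could call awaitDrained(5, TimeUnit.SECONDS) after pausing the consumer. Messages that finish early then no longer force a full five-second wait, and a timeout is reported instead of silently ignored.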
Question 2: How can we prevent WebClientRequestException when using @PreDestroy?
However, during shutdown, once the Thread.sleep in @PreDestroy has finished, any messages still in flight fail with a WebClientRequestException. Messages processed during the Thread.sleep complete successfully.
WebClientRequestException: executor not accepting a task; nested exception is java.lang.IllegalStateException: executor not accepting a task
The WebClient implementation uses Spring's injected builder:
    private final WebClient.Builder builder;

    public WebClient createWebclient() {
        ...
        return builder.build();
    }

The WebClientRequestException seems to happen only when using Spring's injected WebClient.Builder.
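
The message "executor not accepting a task" suggests that the HTTP call was attempted after the executor backing the WebClient had already begun shutting down. One plausible explanation, which is an assumption and not confirmed here, is that the injected builder uses resources managed by the application context, and those are released while messages are still in flight. The plain-JDK sketch below is only an analogy for that ordering problem: it is not the Reactor Netty code path, and the class name ShutdownOrderDemo is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical demo: work submitted after an executor shuts down is rejected. */
public final class ShutdownOrderDemo {

    /** Returns true if a task submitted after shutdown() is rejected. */
    public static boolean rejectedAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Stands in for the client's underlying resources being released at shutdown
        executor.shutdown();
        try {
            // Stands in for a late HTTP call made by an in-flight message
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // prints "rejected after shutdown: true"
        System.out.println("rejected after shutdown: " + rejectedAfterShutdown());
    }
}
```

If the same ordering applies here, the in-flight messages would need to finish before the resources behind the WebClient are released, rather than after the @PreDestroy method returns.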