
A Spring application uses Reactor Kafka to consume messages.

Question 1: Is there a standard convention for pausing message consumption and finishing in-flight messages during application shutdown?

The current implementation uses reactiveKafkaConsumerTemplate to receive messages. Then, in a @PreDestroy method, we pause the consumer via reactiveKafkaConsumerTemplate.pause.

Current Implementation (simplified)

reactiveKafkaConsumerTemplate
  .receiveAutoAck()
  .publishOn(Schedulers.boundedElastic())
  // Throttle: up to 5 messages in flight, each delayed 300 ms
  .flatMap(x -> Mono.just(x)
      .delayElement(Duration.ofMillis(300)), 5)
  .flatMap(message -> Mono.just(message)
      .flatMap(processMessageImp::processMessage)
      // Drop failed messages so one failure does not cancel the stream
      .onErrorResume(t -> Mono.empty())
  );

public void pauseKafkaMessageConsumer() {
  reactiveKafkaConsumerTemplate
    .assignment()
    .flatMap(topicParts -> reactiveKafkaConsumerTemplate.pause(topicParts))
    .subscribe();
}

@PreDestroy
public void onExit() {
  pauseKafkaMessageConsumer();
  try {
    // Give in-flight messages time to finish before the context closes
    Thread.sleep(5000);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    log.error("onExit: interrupted while waiting in @PreDestroy", e);
  }
}
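For comparison, the pause-then-drain behavior the question is after mirrors the standard two-phase JDK executor shutdown: stop accepting new work, then wait for in-flight work with a deadline rather than a blind sleep. This is a plain-Java analogy I wrote to illustrate the pattern, not Reactor Kafka code; the class and task names are made up.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulDrain {

    // Submit some simulated in-flight work, then shut down in two phases:
    // 1) stop accepting new tasks, 2) await completion with a deadline.
    static int drain() throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        AtomicInteger processed = new AtomicInteger();

        for (int i = 0; i < 5; i++) {
            workers.submit(() -> {
                try {
                    Thread.sleep(100); // simulate message processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                processed.incrementAndGet();
            });
        }

        workers.shutdown();                            // like pausing the consumer: no new work
        workers.awaitTermination(5, TimeUnit.SECONDS); // bounded wait instead of a blind sleep
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed=" + drain()); // prints: processed=5
    }
}
```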

Question 2: How can the WebClientRequestException during @PreDestroy shutdown be prevented?

However, during shutdown, any messages still in flight after the Thread.sleep in @PreDestroy fail with WebClientRequestException. Messages that finish processing within the Thread.sleep window succeed.

WebClientRequestException: executor not accepting a task; nested exception is java.lang.IllegalStateException: executor not accepting a task

The WebClient implementation uses Spring's injected builder:

private final WebClient.Builder builder;

public WebClient createWebclient() {
  ...
  return builder.build();
}

The WebClientRequestException seems to occur only when using Spring's injected WebClient.Builder.
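My working hypothesis (an assumption, not confirmed by the stack trace alone) is that Spring Boot's auto-configured WebClient.Builder uses Reactor Netty event loops managed by the application context, and those are shut down early during context close, which would explain "executor not accepting a task". A sketch of building a WebClient with its own, independently managed Reactor Netty resources, so the client is decoupled from the context's shutdown (class and resource names are mine):

```java
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

public class StandaloneWebClientFactory {

    // Event loops and connection pool owned by this factory, not by the
    // Spring context, so they stay alive until we dispose them ourselves.
    private final LoopResources loops = LoopResources.create("standalone-http");
    private final ConnectionProvider pool = ConnectionProvider.create("standalone-pool");

    public WebClient createWebClient() {
        HttpClient httpClient = HttpClient.create(pool).runOn(loops);
        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();
    }

    // Call only after in-flight requests have drained.
    public void close() {
        pool.dispose();
        loops.dispose();
    }
}
```

The trade-off is that you lose the builder customizations (metrics, observability, codecs) that Spring applies to the injected builder, and you become responsible for disposing the resources yourself.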

Jaron Lee

1 Answer


I took a similar approach that seems to work locally during testing. As part of processing, it consumes from and produces to the same topic, which lets me verify that every message is consumed successfully before shutting down.

Now to be fair, I am not sure if this is a great approach. I'm still kinda new to Kafka, and could use some feedback!

  @NonNull
  private ReactiveKafkaConsumerTemplate<String,String> kafkaConsumerTemplate;

  @NonNull
  private ReactiveKafkaProducerTemplate<String,String> kafkaProducerTemplate;

  @Value("${spring.kafka.consumer.partitionCount}")
  private Integer topicPartitionsCount;

  // volatile: written by the shutdown thread, read from Reactor threads
  private volatile boolean shuttingDown;
  private final Composite disposables = Disposables.composite();

  @EventListener(ApplicationReadyEvent.class)
  public void postConstruct() {
    for (int i = 0; i < topicPartitionsCount; i++) {
      disposables.add(readWrite().subscribe());
    }
  }

  @PreDestroy
  public void preDestroy() {
    shuttingDown = true;

    // Give in-flight messages 20 seconds to drain before pausing the consumer
    kafkaConsumerTemplate.assignment()
        .delayElements(Duration.of(20, ChronoUnit.SECONDS))
        .flatMap(kafkaConsumerTemplate::pause)
        .subscribe();
  }

  public Flux<MyMessage> readWrite() {
    return kafkaConsumerTemplate
        .receiveAutoAck()
        // Complete the stream once shutdown starts (the matching item still goes through)
        .takeUntil(item -> shuttingDown)
        .flatMap(item -> ingest(item.value()))
        .flatMap(item -> kafkaProducerTemplate.send("example_topic", GsonFactory.getGson().toJson(item)).thenReturn(item))
        .onErrorContinue((exception, errorConsumer) -> log.error("Exception while processing", exception));
  }
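One possible refinement I've been considering (my own sketch, not tested against Kafka): instead of polling a boolean flag from Reactor threads, a Sinks.One can serve as an explicit one-shot shutdown signal, and takeUntilOther completes the pipeline when it fires. Here I use Flux.interval as a stand-in for receiveAutoAck():

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class ShutdownSignalExample {
    public static void main(String[] args) {
        // One-shot shutdown signal; fire it from @PreDestroy instead of flipping a flag
        Sinks.One<Void> shutdown = Sinks.one();

        Flux<Long> pipeline = Flux.interval(Duration.ofMillis(50)) // stand-in for receiveAutoAck()
                .takeUntilOther(shutdown.asMono());                // completes when the signal fires

        shutdown.tryEmitEmpty(); // e.g. called from @PreDestroy
        pipeline.blockLast();    // returns once the pipeline has completed
        System.out.println("pipeline completed");
    }
}
```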
Jeff