I am using Reactor Kafka (Project Reactor) to subscribe to a topic. My Kafka setup retries automatically when the consumer throws an exception, but with the reactive Kafka subscribe API it is very difficult to throw one.

I have a reactive Java application that uses Reactor Kafka to subscribe to a topic. For each event received from Kafka, the application performs some operations, such as calling an external API. If those operations fail, I would like to throw an exception back to Kafka. My Kafka middleware is configured to retry whenever the application throws an exception.

Reactor Kafka reference: https://projectreactor.io/docs/kafka/release/reference/

Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "TESTING");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

ReceiverOptions<Integer, String> receiverOptions =
        ReceiverOptions.<Integer, String>create(consumerProps)
            .subscription(Collections.singleton(topic));

Flux<ReceiverRecord<Integer, String>> inboundFlux =
        KafkaReceiver.create(receiverOptions)
            .receive();

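inboundFlux
    // flatMap subscribes to the returned Mono; map would only wrap it and never run it
    .flatMap(record -> processRecordThrowAnException(record.value()).thenReturn(record))
    .subscribe(record -> {
        System.out.printf("Received message: %s%n", record);
        record.receiverOffset().acknowledge();
    });

// Simulates a processing step (e.g. an external API call) that always fails
private Mono<Void> processRecordThrowAnException(String payload) {
    return Mono.error(new Exception("processing message failed for order: " + payload));
}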

How do I throw the exception back to upstream?
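
To make the intended semantics concrete, here is a minimal plain-Java sketch with no Reactor or Kafka involved. All names in it (AckAfterProcessing, FakeRecord, process, consume) are hypothetical stand-ins, not part of any library. It only illustrates the behavior I am after: an offset is acknowledged only after processing succeeds, and a failure propagates to the caller so the record can be retried.

```java
import java.util.List;

public class AckAfterProcessing {

    /** Hypothetical stand-in for a Kafka record whose offset is acknowledged manually. */
    static final class FakeRecord {
        final long offset;
        final String value;
        boolean acknowledged = false;

        FakeRecord(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }

        void acknowledge() {
            acknowledged = true;
        }
    }

    /** Hypothetical processing step: fails for any payload that starts with "bad". */
    static void process(String payload) {
        if (payload.startsWith("bad")) {
            throw new IllegalStateException("processing message failed for order: " + payload);
        }
    }

    /**
     * Processes records in order and acknowledges each one only after it succeeds.
     * The first failure is not swallowed: it propagates to the caller, and the failed
     * record plus everything after it stays unacknowledged.
     */
    static void consume(List<FakeRecord> records) {
        for (FakeRecord record : records) {
            process(record.value);   // may throw; nothing below runs for this record
            record.acknowledge();    // reached only on success
        }
    }

    public static void main(String[] args) {
        List<FakeRecord> records = List.of(
                new FakeRecord(0, "order-1"),
                new FakeRecord(1, "bad-order-2"),
                new FakeRecord(2, "order-3"));
        try {
            consume(records);
        } catch (IllegalStateException e) {
            System.out.println("Propagated to caller: " + e.getMessage());
        }
        for (FakeRecord record : records) {
            System.out.println("offset " + record.offset + " acknowledged=" + record.acknowledged);
        }
    }
}
```

In the reactive version the equivalent would be for the error to reach whatever drives the retry instead of being dropped inside the subscribe callback, and that is the part I cannot work out.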

  • Good to refer: https://stackoverflow.com/questions/51299528/handling-exceptions-in-kafka-streams – Mebin Joe Sep 06 '19 at 04:46
  • My question is more about the Kafka consumer throwing the exception upstream. Flux subscribe either lacks a way to do this or has a very unnatural one, and I am still figuring it out. – Miraj Shah Sep 06 '19 at 16:51
