I am using the project reactor kafka to subscribe to a topic. My kafka has an automatic retry set up when an exception is thrown, but using the reactive kafka subscribe it's very difficult to throw an exception.
I have a reactive java application using the reactive kafka stream. The application is subscribing to a topic. My application requires performing some operations like making an external API call based on this event received from Kafka. But if those operations fail I would like to throw an exception back to kafka. I have my kafka middleware configured to retry when an exception was thrown by the application.
Project Reactor: https://projectreactor.io/docs/kafka/release/reference/
Map<String, Object> consumerProps;
consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "TESTING");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
receiverOptions =
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(topic));
Flux<ReceiverRecord<Integer, String>> inboundFlux =
KafkaReceiver.create(receiverOptions)
.receive();
inboundFlux
.map(b -> processRecordThrowAnException(b))
.subscribe(r -> {
System.out.printf("Received message: %s\n", r);
r.receiverOffset().acknowledge();
});
private Mono<Void> processRecordThrowAnException(String payload) {
return Mono.error(new Exception("processing message failed for order: " + traceId));
}
How do I throw the exception back to upstream?