I have an endpoint that pushes data to Kafka. I want to respond to the call with the appropriate status code: 2xx if the Kafka write succeeds, 5xx if it fails. The code snippet is:
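// Imports assume Quarkus 2.x (javax.*); on Quarkus 3+ use jakarta.* instead.
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Response addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
        // Should be 2xx or 5xx depending on ack/nack. Placeholder response:
        // this line runs before either callback has fired.
        return Response.ok().build();
    }
}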
The issue is that the endpoint responds with a status code before the ack or nack callback has executed.
I also tried the sendAndAwait method of MutinyEmitter, but that method returns void, so there is no way to know whether the message was acked or nacked.
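To make the goal concrete, here is a minimal sketch of the behavior I am after, written against plain java.util.concurrent rather than the real Emitter. AckBridge, simulateBroker, and send are hypothetical names, and the 202/500 codes are only placeholders. The callbacks have the same shape as the ones passed to withAck and withNack above, and the status code becomes available only once one of them has fired.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for illustration; not part of any messaging library.
final class AckBridge {

    // Simulates the broker: invokes exactly one callback on another thread.
    static void simulateBroker(boolean writeSucceeds,
                               Supplier<CompletionStage<Void>> onAck,
                               Function<Throwable, CompletionStage<Void>> onNack) {
        CompletableFuture.runAsync(() -> {
            if (writeSucceeds) {
                onAck.get();
            } else {
                onNack.apply(new RuntimeException("simulated write failure"));
            }
        });
    }

    // Returns a status code that is only completed once ack or nack fires.
    static CompletableFuture<Integer> send(boolean writeSucceeds) {
        CompletableFuture<Integer> status = new CompletableFuture<>();
        simulateBroker(writeSucceeds,
            () -> {
                status.complete(202); // placeholder success code
                return CompletableFuture.completedFuture(null);
            },
            throwable -> {
                status.complete(500); // placeholder failure code
                return CompletableFuture.completedFuture(null);
            });
        return status;
    }

    public static void main(String[] args) {
        System.out.println(send(true).join());  // prints 202
        System.out.println(send(false).join()); // prints 500
    }
}
```

In the real resource I imagine the method would return something like a CompletionStage<Response> built from such a future instead of a plain status code, assuming the REST layer accepts that return type. What I cannot work out is how to wire this up correctly with the Emitter.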