I'm looking to respond from a REST endpoint with a success/failure response, where the endpoint dynamically accepts a topic as a query param. In Quarkus with SmallRye Reactive Messaging the code would look something like the snippet below, wrapping the payload with OutgoingKafkaRecordMetadata.

i.e. https://myendpoint/publishToKafka?topic=myDynamicTopic

@Channel("test")
Emitter<byte []> kafkaEmitter;

@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){

    kafkaEmitter.send(Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
            .withKey("my-key")
            .withTopic("myDynamicTopic")
            .build()));
    
}

From the Quarkus docs: "If the endpoint does not return a CompletionStage, the HTTP response may be written before the message is sent to Kafka, and so failures won’t be reported to the user." The example there covers sending a payload directly (emitter.send(payload) returns a CompletionStage, whereas emitter.send(message) returns void), but that requires configuring the topic in advance. Is it possible to specify metadata with a Message and still respond to the calling client with a success/failure response? (I don't mind whether it's with Emitter and CompletionStage or MutinyEmitter and Uni.)

Any advice or suggestions would be appreciated.

  • Is it really necessary for the client to know whether the message was successfully delivered to the broker? We normally use messaging systems for asynchronous communication, hence error handling should not be a concern for the client, but for the backend. – Turing85 Mar 20 '22 at 09:48
  • I can think of a couple of scenarios where this would be useful. If there's a configuration issue with the topic (e.g. security, wrong name, Kafka offline), the client would continue unaware of the error, as it has received a 200 or 204 from the server and would think it's successfully committing messages. If the state is driven from an external source (e.g. a database), the client would like to know the data has been acked successfully before updating its state (I know friends don't let friends do dual writes, but avoiding them is not always possible; ideally you'd use something like Debezium and the outbox pattern). – thedartfish Mar 20 '22 at 10:24
  • The idea of the [outbox pattern](https://microservices.io/patterns/data/transactional-outbox.html) is to use a second transaction. The important part of the second transaction is that it does not modify the outbox (it only reads it) and only interacts with the outgoing topic. Hence, "from the outside", we cannot (easily) tell whether the second transaction succeeded or not. So how can we "know" when we can send the response to the client? – Turing85 Mar 20 '22 at 11:01
  • If you're using `Emitter.send(Message)`, you can add an ack/nack handler to the message object. In that handler, you can complete the `CompletionStage` you return from the resource method. – Ladicek Mar 20 '22 at 12:25
  • @Ladicek that's true and demonstrated in the example I linked in my question but the HTTP response is returned to the client before the ack is confirmed. I need the confirmation of success/failure of the message publish sent back to the client. – thedartfish Mar 20 '22 at 12:35
  • OK let me be a little more specific. You need to create a `new CompletableFuture()`, then call `emitter.send` with a `Message` that includes an ack/nack handler, and then `return` the `CompletableFuture`. At that point, the future is not yet complete, so response will not be sent. In the ack/nack handler, you complete the future, and that's when the response will be sent. – Ladicek Mar 20 '22 at 13:55
  • Thanks @Ladicek I understand what you mean now! Clement has provided an example code snippet below as well. Cheers. – thedartfish Mar 20 '22 at 22:40
  • @thedartfish, I would like to take a look at the consumer of this topic. Do you have the code on GitHub? Do you mind sharing it? Thanks :) – Johnyzhub May 02 '22 at 16:35
  • Hey @Johnyzhub, in my specific instance I don't consume the data from the topic using a Quarkus/SmallRye consumer. I hand topic consumption off to [Apache NiFi](https://nifi.apache.org/), but once the data is on the Kafka topic it should be similar to consumption from any Kafka topic. I've not used the SmallRye/Quarkus consumer components, so unfortunately I don't have any specific code examples I can provide, but the Quarkus [docs](https://quarkus.io/guides/kafka#receiving-messages-from-kafka) contain a few examples for different circumstances that may assist. – thedartfish May 04 '22 at 00:11

1 Answer

Because you use a Message (as you need to specify the topic), you need something a bit more convoluted:

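import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
// @POST, @Path and @QueryParam come from javax.ws.rs (jakarta.ws.rs from Quarkus 3 onwards).

@Channel("test")
Emitter<byte[]> kafkaEmitter;

@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte[] payload) {
    // Completed by the ack/nack handlers below; the HTTP response waits for it.
    CompletableFuture<Void> future = new CompletableFuture<>();
    Message<byte[]> message = Message.of(payload)
            .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                    .withKey("my-key")
                    .withTopic(topic) // topic taken from the query parameter
                    .build())
            .withAck(() -> {
                // Broker accepted the record: report success to the caller.
                future.complete(null);
                return CompletableFuture.completedFuture(null);
            })
            .withNack(t -> {
                // Send failed: propagate the cause to the caller.
                future.completeExceptionally(t);
                return CompletableFuture.completedFuture(null);
            });
    kafkaEmitter.send(message);
    return future;
}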

In this snippet, I also attach the ack and nack handlers called when the message is either acknowledged (accepted by the broker) or rejected (something wrong happened).

These callbacks report to future, a CompletableFuture created in the method. This is the object to return, as it will do what you want: indicate the outcome.
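The mechanics can be tried without a broker. What follows is only a minimal sketch in plain Java, assuming a hypothetical fakeSend method that stands in for the real emitter and calls the ack or nack handler from another thread. The names AckNackSketch, fakeSend, publish and statusOf are made up for illustration and are not part of SmallRye or Quarkus, and the 204/500 values are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

final class AckNackSketch {

    // Hypothetical stand-in for the emitter: it "sends" asynchronously and then
    // calls exactly one of the two handlers, as a broker ack or nack would.
    static void fakeSend(boolean brokerAccepts,
                         Supplier<CompletionStage<Void>> ack,
                         Function<Throwable, CompletionStage<Void>> nack) {
        CompletableFuture.runAsync(() -> {
            if (brokerAccepts) {
                ack.get();
            } else {
                nack.apply(new IllegalStateException("broker rejected the record"));
            }
        });
    }

    // Same shape as the resource method: create the future, wire the handlers,
    // send, and return the future, which the handlers complete later.
    static CompletionStage<Void> publish(boolean brokerAccepts) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        fakeSend(brokerAccepts,
                () -> {
                    future.complete(null);
                    return CompletableFuture.completedFuture(null);
                },
                t -> {
                    future.completeExceptionally(t);
                    return CompletableFuture.completedFuture(null);
                });
        return future;
    }

    // Waits for the outcome and maps it to an HTTP-like status code.
    static int statusOf(CompletionStage<Void> stage) {
        return stage.handle((ok, failure) -> failure == null ? 204 : 500)
                .toCompletableFuture()
                .join();
    }

    public static void main(String[] args) {
        System.out.println(statusOf(publish(true)));  // prints 204
        System.out.println(statusOf(publish(false))); // prints 500
    }
}
```

The caller only learns the outcome once one of the handlers has run, which is the behaviour the returned CompletionStage gives the HTTP layer.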

I know the callbacks are slightly complicated. This is mainly due to the spec: each handler has to return a CompletionStage, and returning CompletableFuture.completedFuture(null) signals that the nack processing itself was successful. If we were to return future instead (which we have just completed with future.completeExceptionally(t)), this would be interpreted as a failure during the nack processing. This would basically be the equivalent of a throw within a catch block in the imperative world.
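To make the difference between the two return values concrete, here is a small sketch in plain Java with no messaging library involved. The class and method names (NackReturnSketch, handled, leaking, nackProcessingFailed) are hypothetical and exist only for this illustration; the check on the returned stage is an assumption about how a caller of the handler could inspect it, not a description of SmallRye internals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class NackReturnSketch {

    // Variant from the answer: record the failure on `outcome`, then report
    // that the nack itself was handled without problems.
    static Function<Throwable, CompletionStage<Void>> handled(CompletableFuture<Void> outcome) {
        return t -> {
            outcome.completeExceptionally(t);
            return CompletableFuture.completedFuture(null);
        };
    }

    // Problematic variant: returning the failed future signals that the
    // nack processing failed as well.
    static Function<Throwable, CompletionStage<Void>> leaking(CompletableFuture<Void> outcome) {
        return t -> {
            outcome.completeExceptionally(t);
            return outcome;
        };
    }

    // True when the stage returned by the handler completed exceptionally.
    static boolean nackProcessingFailed(Function<Throwable, CompletionStage<Void>> handler) {
        CompletionStage<Void> result = handler.apply(new RuntimeException("send failed"));
        return result.toCompletableFuture().isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(nackProcessingFailed(handled(new CompletableFuture<>()))); // false
        System.out.println(nackProcessingFailed(leaking(new CompletableFuture<>()))); // true
    }
}
```

In both variants the caller's future carries the failure; only the second one additionally reports the handler itself as failed.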

Fortunately, an easier version will be available soonish (no worries, we won't break).

Clement
  • Thanks @Clement and Turing85, that's a lot clearer now. I sincerely appreciate your input. I did find the callback management with CompletionStage a little confusing, so seeing it in code definitely cleared things up for me. I'll keep an eye out for the "easier" version, Clement, and I'll be sure to tune into the excellent blogs and YouTube sessions I've seen from you and the Red Hat community. Thanks again. – thedartfish Mar 20 '22 at 22:57