4

How can i handle exception on streaming processing with quarkus + kafka + smallrye?

My code is very similar to the imperative producer example on quarkus guide (https://quarkus.io/guides/kafka#imperative-usage)

import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(price);
    }
}

I wanted something similar the vanilla Kafka library, that gives the option to handle the callback of each record requested to send.

ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        logger.info(record.toString());

        if (exception != null) {
            logger.error("Producer exception", exception);
        }
    }
});

Tks

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
tcsdev
  • 191
  • 1
  • 2
  • 7
  • 2
    Please raise an issue here: https://github.com/smallrye/smallrye-reactive-messaging Include a reproducer and we can investigate. It may be that we need better error handling exposed to users – Ken Jan 31 '20 at 18:25
  • Tks Ken, i'm proceeding with that!! – tcsdev Jan 31 '20 at 20:19

1 Answers1

2

There is a section of the docs on Acknowlegement

@Incoming("i")
@Outgoing("j")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
  public CompletionStage<Message<String>> manualAck(Message<String> input) {
    return CompletableFuture.supplyAsync(input::getPayload)
      .thenApply(Message::of)
      .thenCompose(m -> input.ack().thenApply(x -> m));
  }
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • 1
    Your solution will work if it is fully-reactive(using @incoming & @outgoing). But he is using `Emitter` to produce messages !. How can he handle exception in case of Emitter usage ? – iabughosh Jan 31 '20 at 18:03
  • @iabug Good point, but I'm not sure. I would still refer to the documentation – OneCricketeer Feb 01 '20 at 04:02
  • 2
    I followed @Ken's suggest and got the response. There is a new Emitter comming that provides this feature. The send method returns a CompletionStage so then i will be able to handle the result. https://github.com/smallrye/smallrye-reactive-messaging/issues/400 – tcsdev Feb 03 '20 at 14:01