From within a Quarkus application I need to publish tombstone messages to a compacted Apache Kafka topic. Because my use case is imperative, I use an Emitter to send messages to the topic (as suggested in the Quarkus blog). The code for non-tombstone messages (with a payload) is:

@Dependent
public class Publisher {

  @Inject
  @Channel("theChannelName")
  Emitter<MyDataStructure> emitter;

  public CompletionStage<Void> publish(final MyDataStructure myData) {
    OutgoingKafkaRecordMetadata<String> metadata =
        OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(myData.getTopicKey())
            .build();
    return CompletableFuture.runAsync(
        () -> emitter.send(Message.of(myData).addMetadata(metadata)));
  }
}

The Emitter also offers <M extends Message<? extends T>> void send(M msg), which I hoped would let me craft a Message with a null payload as a tombstone message. Unfortunately, every Message.of(..) factory method that accepts metadata (which is needed to set the message key) specifies that the payload must not be {@code null}.

What is the proper way (following Quarkus / SmallRye Reactive Messaging concepts) to publish tombstone messages to a Kafka topic using an Emitter?

m o
  • I'd suggest to use `KafkaRecord.of()`. `KafkaRecord` is a `Message`, and while its `of` methods also document that `value` must not be `null`, that isn't actually enforced. If that works for you, let's just get that documentation updated :-) – Ladicek May 19 '21 at 11:01
  • @Ladicek While your suggestion might solve the issue right now, using an API in a way it's seemingly not intended to be used is also bad advice. From what you say it seems like a bug which, when fixed, will cause the application to break. – Rea Sand May 19 '21 at 12:09
  • Another option may be (I'm not entirely sure, but it _should_ work) to use the `Record` class as a message payload. That enables setting both key and value, and both are documented to allow `null` (https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.3/kafka/kafka.html#_sending_keyvalue_pairs). So something like `emitter.send(Record.of(myData.getTopicKey(), null))`. – Ladicek May 19 '21 at 12:29

1 Answer

I would recommend using the Record class (see documentation). A Record is a key/value pair that represents the key and value of the Kafka record to write. Both can be null, but in your case only the value should be null: Record.of(key, null).

So you need to change the type of the Emitter to Record<Key, Value>, for example:

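import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

// Imports for @Dependent and @Inject (javax.* or jakarta.*, depending on the Quarkus version) are omitted.
@Dependent
public class Publisher {

  @Inject
  @Channel("theChannelName")
  Emitter<Record<String, MyDataStructure>> emitter; // String key, as in the question's metadata

  public CompletionStage<Void> publish(final MyDataStructure myData) {
    // A null value makes this record a tombstone for the given key.
    return emitter.send(Record.of(myData.getTopicKey(), null));
  }
}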

runAsync is convenient, but emitters are already asynchronous, so there is no need for it. In addition, its behavior can hurt in containers: runAsync runs on the common fork-join pool, and when that pool's level of parallelism is less than 2 a new thread is created for every task.
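
To check whether a particular deployment is affected, a small sketch like the following may help. It relies only on the rule documented for CompletableFuture in the JDK (async methods without an explicit executor use ForkJoinPool.commonPool(), unless that pool does not support a parallelism of at least two). The class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

// Illustrative helper, not part of Quarkus or SmallRye.
class AsyncPoolCheck {

    // True when runAsync/supplyAsync without an executor run on the common pool.
    // False means every async task gets a freshly created thread.
    static boolean usesCommonPool() {
        return ForkJoinPool.getCommonPoolParallelism() > 1;
    }

    // Returns the name of the thread that actually executed an async task.
    static String asyncThreadName() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("uses common pool: " + usesCommonPool());
        System.out.println("async task ran on: " + asyncThreadName());
    }
}
```

In a container limited to one or two CPUs the first value is typically small, which is the situation the warning above is about.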

The code above returns the result of the send method, which is a CompletionStage. That stage completes once the record has been written to Kafka (and acknowledged by the broker).
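
As a rough sketch of how a caller could react to that stage, the snippet below maps the outcome to a status string. It uses only JDK types so it can run on its own; the `send` parameter is a hypothetical stand-in for `emitter::send`, and the class and method names are invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Illustrative helper, not part of Quarkus or SmallRye.
class TombstoneAck {

    // `send` stands in for emitter::send (an assumption for this sketch).
    // The returned stage yields "acked" on success, or "failed: <message>"
    // when the stage returned by `send` completed exceptionally.
    static <T> CompletionStage<String> sendAndReport(
            Function<T, CompletionStage<Void>> send, T record) {
        return send.apply(record)
                .handle((ignored, failure) ->
                        failure == null ? "acked" : "failed: " + failure.getMessage());
    }

    public static void main(String[] args) {
        // Simulate a broker that acknowledges immediately.
        CompletableFuture<Void> ack = CompletableFuture.completedFuture(null);
        String status = TombstoneAck.<String>sendAndReport(r -> ack, "some-key")
                .toCompletableFuture()
                .join();
        System.out.println(status);
    }
}
```

With a real emitter the call would look like `sendAndReport(emitter::send, Record.of(key, null))`, assuming the emitter type from the answer.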

Clement