3

I want to send a Java (well Kotlin) POJO as JSON in an AMQP message to RabbitMQ using Quarkus.

@Path("/amqp")
class TestSource {

    @Inject
    @Channel("amqpwrite")
    lateinit var emitter: Emitter<MonitoringStatusDto>

    @POST
    @Path("/send")
    fun sendMsg() {
        val status = MonitoringStatusDto(status = "OK", message = "test amqp write")
        emitter.send(status)
    }
}

On the rabbit queue the message is received as base64 encoded byte stream.

How can I set the headers here to put the content type in it? Also header settings like TTL might be interesting.

yogiginger
  • 1,075
  • 4
  • 13
  • 25

1 Answers1

0

You can add metadata to emitter:

emitter.send(Message.of(recordToPublish,
        () -> {
          // Called when the message is acked
          return CompletableFuture.completedFuture(null);
        },
        e -> {
          // Called when the message is nacked
          throw new RuntimeException(errorMessage, e);
        }).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
            .build()));

I had the same problem and I found this guide: https://quarkus.io/guides/kafka#sending-messages-to-kafka

luis martinez
  • 62
  • 1
  • 9