quarkussmallrye-reactive-messaging

Send amqp message with headers in quarkus


I want to send a Java (well Kotlin) POJO as JSON in an AMQP message to RabbitMQ using Quarkus.

@Path("/amqp")
class TestSource {

    @Inject
    @Channel("amqpwrite")
    lateinit var emitter: Emitter<MonitoringStatusDto>

    @POST
    @Path("/send")
    fun sendMsg() {
        val status = MonitoringStatusDto(status = "OK", message = "test amqp write")
        emitter.send(status)
    }
}

On the rabbit queue the message is received as base64 encoded byte stream.

How can I set the headers here to put the content type in it? Also header settings like TTL might be interesting.


Solution

  • You can add metadata to emitter:

    emitter.send(Message.of(recordToPublish,
            () -> {
              // Called when the message is acked
              return CompletableFuture.completedFuture(null);
            },
            e -> {
              // Called when the message is nacked
              throw new RuntimeException(errorMessage, e);
            }).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
                .build()));
    

    I had the same problem and I found this guide: https://quarkus.io/guides/kafka#sending-messages-to-kafka