rabbitmq quarkus rabbitmq-stream

How to use Quarkus to read from a RabbitMQ stream


How can I read data from RabbitMQ streams using Quarkus? Is it possible to do this with SmallRye, or do I have to use the RabbitMQ client directly?

I would like to:

  1. read data from a RabbitMQ stream (not a queue)
  2. replay data from a RabbitMQ stream starting at a certain point in time

I have not found any reference on how to do this with the SmallRye wrapper.

I have this config

# incoming
mp.messaging.incoming.requestss.connector=smallrye-rabbitmq
mp.messaging.incoming.requestss.exchange.name=quote-requests
mp.messaging.incoming.requestss.queue.name=quote-stream
mp.messaging.incoming.requestss.queue.x-queue-type=stream

but I am getting the following error

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'quote-stream' in vhost '/'', class-id=60, method-id=20)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
        at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1378)
        ... 15 more

Solution

  • A RabbitMQ stream is modeled as an append-only log of messages, and every consumer starts reading at a specified offset in that log. Because of this, a QoS (a.k.a. prefetch count) must be set on the incoming channel; without it the broker rejects the consumer with the PRECONDITION_FAILED error above. In the SmallRye RabbitMQ Connector, the max-outstanding-messages config property controls the QoS value:

    mp.messaging.incoming.requestss.connector=smallrye-rabbitmq
    mp.messaging.incoming.requestss.exchange.name=quote-requests
    mp.messaging.incoming.requestss.queue.name=quote-stream
    mp.messaging.incoming.requestss.queue.x-queue-type=stream
    
    # sets prefetch count / QoS to 100
    mp.messaging.incoming.requestss.max-outstanding-messages=100
    

    NOTE: I haven't found a way to configure RabbitMQ Streams specific arguments (e.g. x-stream-offset, x-max-age, x-stream-max-segment-size-bytes, etc.), so I assume each client will use the default values of those arguments. It would be great if SmallRye supported that.
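
For the replay requirement, one possible fallback is the plain RabbitMQ Java client, where x-stream-offset is passed as a consumer argument. Below is a minimal sketch that only builds those arguments with the JDK. The class and method names (StreamOffsetArgs, fromTimestamp, fromFirst) are made up for illustration, and the basicQos / basicConsume calls shown in the comments assume com.rabbitmq:amqp-client is on the classpath and a channel is already open. I have not tried this together with the SmallRye connector.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds consumer arguments for reading a RabbitMQ stream.
public class StreamOffsetArgs {

    // Start consuming near the given point in time.
    // The Java client encodes a java.util.Date as an AMQP timestamp.
    static Map<String, Object> fromTimestamp(Instant pointInTime) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-stream-offset", Date.from(pointInTime));
        return args;
    }

    // Start consuming from the beginning of the stream.
    static Map<String, Object> fromFirst() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-stream-offset", "first");
        return args;
    }

    public static void main(String[] args) {
        // Replay everything from roughly one hour ago.
        Map<String, Object> consumerArgs =
                fromTimestamp(Instant.now().minus(Duration.ofHours(1)));
        System.out.println(consumerArgs.keySet());

        // With the plain client (assumed, not compiled here):
        //   channel.basicQos(100); // streams require a prefetch count
        //   channel.basicConsume("quote-stream", false, consumerArgs,
        //           deliverCallback, cancelCallback);
        // autoAck must be false, and each delivery has to be acked.
    }
}
```

As far as I know the broker attaches at chunk granularity, so the first messages delivered may be slightly older than the requested timestamp; filter on the message timestamp in the handler if that matters.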