rabbitmqpriority-queuespring-cloud-streamspring-cloud-stream-binder

How to send a message with priority to RabbitMQ with StreamBridge


I'm using RabbitMQ. I've defined a queue with priority, and I can send messages to this queue with some priority value using RMQ GUI, and consumers also get the messages in sorted order, but when I try to send the message from my java code using Stream bridge, I don't know how to specify the priority with the message. Here's what I have tried :

  1. I have added x-max-priority: 10 to the queue while creating the queue.

  2. Consumer example =

     @Bean
     public Consumer<Message<String>> testListener() {
     return (m) -> {
         System.out.println("inside consumer with message : " + m);
         System.out.println("headers : " + m.getHeaders());
         System.out.println("payload : " + m.getPayload());
     };
     }
    
  3. Producer example =

     @GET
     @Path("test/")
     public void test(@Context HttpServletRequest request) {
     System.out.println("inside test");
     try {
         String payload = "hello world";
         logger.info("going to send a message : {}", payload);
         int priority = 5;
         Message<String> message = MessageBuilder.withPayload(payload)
                         .setHeader("priority", priority)
                         .build();
         boolean res = STREAM_BRIDGE.send("testWriter-out-0", message);
         System.out.println(message);
         System.out.println(res);
     } catch (Exception e) {
         logger.error(e);
     }
    }
    

The output of the Producer =

    -> inside test
    -> GenericMessage [payload=hello world, headers={priority=5, id=some_id, timestamp=epoch}]
    -> true

The output of the Consumer =

    -> inside consumer with message : GenericMessage [payload=hello world, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=test_exchange, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=test_exchange.ats, amqp_redelivered=false, amqp_receivedRoutingKey=test_exchange, amqp_timestamp=date_time, amqp_messageId=some_id, id=some_id, amqp_consumerTag=some_tag, sourceData=(Body:'hello world' MessageProperties [headers={}, timestamp=date_time, messageId=some_id, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test_exchange, deliveryTag=1, consumerTag=some_tag, consumerQueue=test_exchange.ats]), contentType=application/json, timestamp=epoch}]
    -> headers : {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=test_exchange, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=test_exchange.ats, amqp_redelivered=false, amqp_receivedRoutingKey=test_exchange, amqp_timestamp=date_time, amqp_messageId=some_id, id=some_id, amqp_consumerTag=tag, sourceData=(Body:'hello world' MessageProperties [headers={}, timestamp=date_time, messageId=some_id, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test_exchange, deliveryTag=1, consumerTag=tag, consumerQueue=test_exchange.ats]), contentType=application/json, timestamp=epoch}
    -> payload : hello world
    

So the message goes to RMQ and the consumer also gets the message, but on RMQ GUI when I perform Get-message operation on the Queue, I get this result =>

    Message 1
    The server reported 0 messages remaining.
    Exchange    test_exchange
    Routing Key test_exchange
    Redelivered ○
    Properties  
                timestamp:  timestamp
                message_id: some_id
                priority:   0
                delivery_mode:  2
                headers:    
                content_type:   application/json

    Payload     hello world
    11 bytes
    Encoding: string
    

As we can see in the above result, priority is set to 0 by RMQ (and hence in the Consumer, I get the messages in the FIFO manner, not in a priority-based manner) and inside headers : only one header is present "content_type: application/json", so I think the priority is not a part of the header but is a part of properties, then how to set message properties using StreamBridge?

To conclude, I am trying to figure out how to set the priority of a message dynamically while sending it using StreamBridge, any help would be appreciated, thanks in advance !


Solution

  • Please, consider to use the latest Spring Cloud Stream: https://spring.io/projects/spring-cloud-stream#learn.

    Apparently your spring-cloud-starter-stream-rabbit = 3.0.3.RELEASE is old enough to suffer from the issue https://github.com/spring-cloud/spring-cloud-stream/issues/1931.

    Have just tested with the latest one and I got the proper priority property on the message posted into RabbitMQ queue by the mentioned StreamBridge.