rabbitmq amqp qpid qpid-proton

Connection to RabbitMQ Durable Queue fails over AMQP 1.0 protocol - Protonj2


Solution found: see the resolution at the end of this question.

I am trying to connect an AMQP 1.0 consumer, built with the Apache Qpid ProtonJ2 library, to a durable RabbitMQ queue, but the connection fails with the following error:

org.apache.qpid.protonj2.client.exceptions.ClientSessionRemotelyClosedException: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue_durable' in vhost '/': received 'false' but current is 'true' [condition = amqp:precondition-failed]

The following is my sample code:

public void connectAmqp() throws Throwable {
    final String serverHost = "localhost";
    final int serverPort = 5672;
    final String address = "test_queue_durable";
    final Client client = Client.create();

    final ConnectionOptions options = new ConnectionOptions().user("admin").password("admin");

    try {
        Connection connection = client.connect(serverHost, serverPort, options);

        // No ReceiverOptions are passed, so the source uses the client's default
        // durability, which the broker reports as 'false' in the error above.
        Receiver receiver = connection.openReceiver(address);

        for (int i = 0; i < 100; ++i) {
            Delivery delivery = receiver.receive();
            System.out.println(delivery.message().body().getClass());
            System.out.println("*-*-*-*          " + new String((byte[]) delivery.message().body()));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Important points to note:

  1. The queue is pre-declared in RabbitMQ.
  2. The queue is configured as durable to prevent message loss.
  3. The consumer connects successfully if the durable property is removed from the queue, but that is not what I want.
  4. The connection must use the AMQP 1.0 protocol.
  5. The client library is Apache Qpid ProtonJ2.

Edited Resolution

import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.Delivery;
import org.apache.qpid.protonj2.client.DurabilityMode;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;

public void connectAmqp() throws Throwable {
    final String serverHost = "localhost";
    final int serverPort = 5672;
    final String address = "test_queue_durable";
    final Client client = Client.create();

    try {
        ConnectionOptions options = new ConnectionOptions().user("user").password("pa$$w0rd");
        Connection connection = client.connect(serverHost, serverPort, options);

        /*
         * Consuming from a durable queue: set the durability mode on the
         * receiver's source so the attach matches the existing queue.
         */
        ReceiverOptions ro = new ReceiverOptions();
        ro.sourceOptions().durabilityMode(DurabilityMode.CONFIGURATION);

        Receiver receiver = connection.openReceiver(address, ro);

        for (int i = 0; i < 100; ++i) {
            Delivery delivery = receiver.receive();
            System.out.println(delivery.message().body().getClass());
            System.out.println("*-*-*-*          " + new String((byte[]) delivery.message().body()));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
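
A side note on the receive loop: the code prints the body's class before casting it to byte[], which suggests the body type is not guaranteed. If a producer sends a string value instead of raw bytes, the cast would throw a ClassCastException. Below is a minimal sketch of a more defensive decode. The class and method names (BodyDecodeSketch, bodyToString) are made up for illustration, and UTF-8 is an assumption about how the producer encodes its payload.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of ProtonJ2: turns a message body into text. */
public class BodyDecodeSketch {

    /**
     * Decodes a body that may be a byte[] or some other object.
     * Assumes byte payloads are UTF-8 encoded (an assumption, adjust as needed).
     */
    public static String bodyToString(Object body) {
        if (body == null) {
            return "";
        }
        if (body instanceof byte[]) {
            return new String((byte[]) body, StandardCharsets.UTF_8);
        }
        // Strings and other value types fall back to their text form.
        return String.valueOf(body);
    }

    public static void main(String[] args) {
        System.out.println(bodyToString("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(bodyToString("hello"));
    }
}
```

In the loop, this would replace the cast with something like bodyToString(delivery.message().body()).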

Solution

  • You likely need to configure the receiver's Source options to match the queue you created on the broker, so that the broker allows the receiver to attach.

    Something like this should work, using a configuration that satisfies RabbitMQ's attach prerequisites:

    ReceiverOptions receiverOptions = new ReceiverOptions();
    receiverOptions.sourceOptions().durabilityMode(DurabilityMode.CONFIGURATION);
    
    Receiver receiver = session.openReceiver(address, receiverOptions);
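
To illustrate why the source durability matters, here is a toy model of the equivalence check that the error message implies. It is not RabbitMQ code and uses no ProtonJ2 classes. The class DurableCheckSketch and its methods are hypothetical, and the idea that DurabilityMode.CONFIGURATION ends up as durable = true on the broker side is an assumption based on the behavior observed in the question.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not RabbitMQ code) of the durable-flag check implied by the error. */
public class DurableCheckSketch {

    /** Hypothetical stand-in for the broker's declared queues: name -> durable flag. */
    private final Map<String, Boolean> declaredQueues = new HashMap<>();

    /** Pre-declares a queue, as done in the question. */
    public void declare(String queue, boolean durable) {
        declaredQueues.put(queue, durable);
    }

    /**
     * Mimics a receiver attach: it succeeds only when the requested durability
     * equals the durability of the already-declared queue.
     */
    public String attach(String queue, boolean requestedDurable) {
        Boolean current = declaredQueues.get(queue);
        if (current == null) {
            return "NOT_FOUND";
        }
        if (current.booleanValue() != requestedDurable) {
            return "PRECONDITION_FAILED - inequivalent arg 'durable' for queue '" + queue
                    + "': received '" + requestedDurable + "' but current is '" + current + "'";
        }
        return "OK";
    }

    public static void main(String[] args) {
        DurableCheckSketch broker = new DurableCheckSketch();
        broker.declare("test_queue_durable", true);

        // Default receiver options: modeled here as durable = false (assumption).
        System.out.println(broker.attach("test_queue_durable", false));

        // Source durability set to CONFIGURATION: modeled as durable = true (assumption).
        System.out.println(broker.attach("test_queue_durable", true));
    }
}
```

In this model the first attach is rejected and the second succeeds, which mirrors the difference between the failing code and the resolution in the question.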