javarabbitmqrabbitmq-exchangerabbitmqctl

RabbitMQ java client is not acknowledging the messages


Java Client

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.17.0</version>
        </dependency>

Publisher

package com.dsabyte.rabbitmqdemo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();

        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel();) {

            channel.queueDeclare("java_queue", true, false, false, null);
            channel.confirmSelect();
            for (int counter = 1; counter <= 10; counter++) {
                channel.basicPublish("", "java_queue", null, String.valueOf(counter).getBytes());
            }
            System.out.println("Messages sent!");
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

Consumer

package com.dsabyte.rabbitmqdemo;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class SubScribe {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel();) {

            channel.basicConsume("java_queue", false, "consumer_tag", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {

                    System.out.println("Msg: " + new String(body) + ", redeliver: " + envelope.isRedeliver());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}

Note: i am using https://spring.io/blog/2021/06/21/spring-tools-4-11-0-released editor to run my programs

I tried to run Publisher and here is the output.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Messages sent!

Then i started consumer, and here is the output.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Msg: 1, redeliver: false
Msg: 2, redeliver: false
Msg: 3, redeliver: false
Msg: 4, redeliver: false
Msg: 5, redeliver: false
Msg: 6, redeliver: false
Msg: 7, redeliver: false
Msg: 8, redeliver: false
Msg: 9, redeliver: false
Msg: 10, redeliver: false
com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=Closed due to exception from Consumer (consumer_tag) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1), class-id=0, method-id=0)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:617)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:542)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:535)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.lambda$close$0(AutorecoveringChannel.java:74)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.executeAndClean(AutorecoveringChannel.java:102)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:74)
    at com.dsabyte.rabbitmqdemo.SubScribe.main(SubScribe.java:29)
Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=Closed due to exception from Consumer (consumer_tag) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1), class-id=0, method-id=0)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:589)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:542)
    at com.rabbitmq.client.impl.StrictExceptionHandler.handleChannelKiller(StrictExceptionHandler.java:72)
    at com.rabbitmq.client.impl.StrictExceptionHandler.handleConsumerException(StrictExceptionHandler.java:61)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:154)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Executed the consumer again and here is the output.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Msg: 7, redeliver: true
Msg: 8, redeliver: true
Msg: 9, redeliver: true
Msg: 10, redeliver: true

Executed the consumer again and here is the output. This time there is no output.

From redeliver: true it looks like messages are not being acknowledged but i wonder why this is even happening. Exception doesn’t tell us the exact issue which can i resolve.

This problem is only coming with push based consumer, if i use pull based consumer then there isn’t any issue.

package com.dsabyte.rabbitmqdemo;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.GetResponse;

public class Recv {
    public static void main(String[] args) throws InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();

        DeliverCallback callback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody());
            System.out.println("Received message: " + msg);
        };

        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel();) {
            channel.queueDeclare("java_queue", true, false, false, null);
            System.out.println("Waiting for the message");

            while (true) {
                GetResponse msg = channel.basicGet("java_queue", false);

                if (msg != null) {
                    System.out.println("Msg: " + new String(msg.getBody()) + ", deliveryTag: "
                            + msg.getEnvelope().getDeliveryTag());
                    channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
                } else {
                    break;
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

Thanks in advance


Solution

  • You are registering the asynchronous consumer in a try-with-resources block, so the channel and the connection are closed immediately after the call returns. This code is racy by nature, so it is not possible to draw reliable conclusions from it. If this is just test code, remove the try-with-resources statement or block just after the consumer registration, but before the channel is closed.

    The example with basic.get works because it is in an infinite loop inside the try-with-resources block.