java rocketmq

Why does the RocketMQ consumer only consume part of the queues?


After sending messages to RocketMQ 4.8, I found that the consumer only consumed some of the queues. This is what the RocketMQ consumer code looks like:

public void appConsumer(Long appId, List<String> topics) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(appId.toString());
    RocketMQConfigDTO rocketMQConfigDTO = MqConfigHandler.MQConfig();
    consumer.setNamesrvAddr(rocketMQConfigDTO.getHost());
    consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.setMaxReconsumeTimes(rocketMQConfigDTO.getMaxReconsumeTimes());
    consumer.setConsumeThreadMin(rocketMQConfigDTO.getConsumeThreadMin());
    consumer.setConsumeThreadMax(rocketMQConfigDTO.getConsumeThreadMax());
    consumer.setConsumeMessageBatchMaxSize(rocketMQConfigDTO.getConsumeMessageBatchMaxSize());
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    for (String topic : topics) {
        consumer.subscribe(topic, "*");
    }
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            Callable<ConsumeConcurrentlyStatus> task = new Callable<ConsumeConcurrentlyStatus>() {
                @Override
                public ConsumeConcurrentlyStatus call() throws Exception {
                    return handleMessage(msgs, context, appId);
                }
            };
            // Note: the batch is acknowledged right away, before the async task has finished.
            asyncService.submit(task);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    eventConsumers.put(appId, consumer);
}

When I opened the RocketMQ dashboard, I found that the consumer only consumed some of the queues on the same broker:

[Screenshot of the RocketMQ dashboard]

The messages in queues 0 and 1 were not consumed by the consumer. Why did this happen, and what should I do to fix it? Why did the consumer only consume messages 3 and 4?


Solution

  • In your case, the most likely cause is that you started two consumer instances with the same consumer group name. Call them c1 and c2, both in consumer group "CG1".

    The most probable scenario is that c1 subscribes to different topics than c2 does. In clustering mode, the queues of a topic are divided among all instances in the group, so the queues assigned to an instance that does not subscribe to that topic are never consumed. Please check whether this is the case in your code.

    Another possible cause is that c1 and c2 do subscribe to the same topics, but c1 is a push consumer while c2 is a pull consumer. This is likely to happen when you use springboot-rocketmq, since it creates a pull consumer by default.

    Whichever cause applies, you can confirm it in the console by checking whether the instance list of CG1 contains more than one instance.
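
To see why a second instance in the same group can leave queues untouched, here is a minimal sketch of how an even ("average") allocation splits four queues between two instances. It is a simplified model written for illustration, not the RocketMQ client code: the class `QueueAllocationSketch`, the method `allocate`, and the client IDs `c1` and `c2` are all hypothetical, and the real client IDs and their sort order will differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of average queue allocation in clustering mode.
// This is NOT the RocketMQ client implementation; all names are made up.
class QueueAllocationSketch {

    // Returns the queue IDs assigned to currentClient when queueCount queues
    // are split as evenly as possible across the sorted clients of one group.
    static List<Integer> allocate(int queueCount, List<String> sortedClients, String currentClient) {
        List<Integer> assigned = new ArrayList<>();
        int index = sortedClients.indexOf(currentClient);
        if (index < 0 || queueCount <= 0) {
            return assigned; // unknown client, or nothing to share out
        }
        int clients = sortedClients.size();
        int base = queueCount / clients;   // queues every client gets
        int mod = queueCount % clients;    // the first `mod` clients get one extra
        int size = index < mod ? base + 1 : base;
        int start = index * base + Math.min(index, mod);
        for (int i = 0; i < size; i++) {
            assigned.add(start + i);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Assumption: both instances registered under the same group "CG1".
        List<String> group = Arrays.asList("c1", "c2");
        System.out.println("c1 -> " + allocate(4, group, "c1")); // c1 -> [0, 1]
        System.out.println("c2 -> " + allocate(4, group, "c2")); // c2 -> [2, 3]
    }
}
```

Under these assumptions, if c1 does not subscribe to the topic (or is a pull consumer that never pulls), queues 0 and 1 stay idle while c2 keeps consuming queues 2 and 3, which would match what the dashboard shows. Giving each distinct set of subscriptions its own consumer group name should avoid the split.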