rabbitmq timeout

RabbitMQ unacked messages not removed from queue after expiration


I have a RabbitMQ server (v3.8.2) running a simple setup: one fanout exchange bound to a single queue, with several producers and one consumer. The average delivery/ack rate is quite low, about 6 msg/s.

The queue is created at runtime by the producers with the x-message-ttl argument set to 900000 ms (15 minutes).

Under very specific conditions (e.g. a rare error situation), messages are rejected by the consumer. These messages are then shown in the unacked counter on the RabbitMQ admin web page indefinitely. They never expire or get discarded, even after their TTL has elapsed.

There are no per-message TTL overrides.

I do not need any dead-letter processing, as these particular messages do not require highly reliable processing, and I can afford to lose some of them every now and then under those specific error conditions.

Exchange parameters:

name: poll
type: fanout
features: durable=true
bound queue: poll
routing key: poll

Queue parameters:

name: poll
features: x-message-ttl=900000 durable=true
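
For reference, a declaration equivalent to the parameters above would look roughly like the sketch below. It assumes the RabbitMQ.Client .NET library with the 5.x/6.x API (where a channel is an `IModel`). The `PollTopology` class name and the `exclusive`/`autoDelete` values are illustrative assumptions, not taken from the actual producer code.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;

static class PollTopology // hypothetical helper name, for illustration only
{
    public static void Declare(IModel channel)
    {
        // Durable fanout exchange named "poll"
        channel.ExchangeDeclare(exchange: "poll", type: ExchangeType.Fanout, durable: true);

        // Durable queue with a 15-minute per-queue message TTL (value in milliseconds)
        var queueArgs = new Dictionary<string, object> { { "x-message-ttl", 900000 } };
        channel.QueueDeclare(
            queue: "poll",
            durable: true,
            exclusive: false,   // assumption: the queue is shared, not exclusive
            autoDelete: false,  // assumption: the queue outlives its consumer
            arguments: queueArgs);

        // Fanout exchanges ignore the routing key; it is kept to mirror the binding above
        channel.QueueBind(queue: "poll", exchange: "poll", routingKey: "poll");
    }
}
```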

For instance, this is what I am currently seeing in the RabbitMQ server queue admin page:

[Screenshot: "Poll" queue summary]

As you can see, there are 12 rejected/unacked messages in the queue, and they have been sitting there for more than a week now.

How can I make the nacked messages expire as per the TTL parameter? Am I missing some piece of configuration?

Here is an extract from the consumer source code:

// this code is executed during setup
...
consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
    // Retrieve retry count & death list if present
    List<object> DeathList = ((e?.BasicProperties?.Headers != null) && e.BasicProperties.Headers.TryGetValue("x-death", out object obj)) ? obj as List<object> : null;
    int count = ((DeathList != null) &&
        (DeathList.Count > 0) &&
        (DeathList[0] is Dictionary<string, object> values) &&
        values.TryGetValue("count", out obj)
    ) ? Convert.ToInt32(obj) : 0;

    // call actual event method
    switch (OnRequestReceived(e.Body, count, DeathList))
    {
        default:
            channel.BasicAck(e.DeliveryTag, false);
            break;
        case OnReceivedResult.Reject:
            channel.BasicReject(e.DeliveryTag, false);
            break;
        case OnReceivedResult.Requeue:
            channel.BasicReject(e.DeliveryTag, true);
            break;
    }
};
...

// this is the actual "OnReceived" method

static OnReceivedResult OnRequestReceived(byte[] payload, int count, List<object> DeathList)
{
    OnReceivedResult retval = OnReceivedResult.Ack; // success by default

    try
    {
        object request = MessagePackSerializer.Typeless.Deserialize(payload);
        if (request is PollRequestContainer prc)
        {
            Log.Out(
                Level.Info,
                LogFamilies.PollManager,
                log_method,
                null,
                "RequestPoll message received did={0} type=={1} items#={2}", prc.DeviceId, prc.Type, prc.Items == null ? 0 : prc.Items.Length
            );
            if (!RequestManager.RequestPoll(prc.DeviceId, prc.Type, prc.Items)) retval = OnReceivedResult.Reject;
        }
        else if (request is PollUpdateContainer puc)
        {
            Log.Out(Level.Info, LogFamilies.PollManager, log_method, null, "RequestUpdates message received dids#={0} type=={1}", puc.DeviceIds.Length, puc.Type);
            if (!RequestManager.RequestUpdates(puc.DeviceIds, puc.Type)) retval = OnReceivedResult.Reject;
        }
        else Log.Out(Level.Error, LogFamilies.PollManager, log_method, null, "Message payload deserialization error length={0} count={1}", payload.Length, count);
    }
    catch (Exception e)
    {
        Log.Out(Level.Error, LogFamilies.PollManager, log_method, null, e, "Exception dequeueing message. Payload length={0} count={1}", payload.Length, count);
    }

    // message is rejected only if RequestUpdates() or RequestPoll() return false
    // message is always acked if an exception occurs within the try-catch or if a deserialization type check error occurs
    return retval;
}


Solution

  • Dead connections can leave messages in a queue stuck in the unacknowledged (Unacked) state. You need to find these dead connections in the connections list and close them manually. Once the connections are closed, the unacknowledged messages re-enter the queue.
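
As a rough sketch of the step described above, the following C# program lists connections whose channels still hold unacknowledged messages and, if started with `--close`, closes them. It assumes the management plugin is enabled and reachable at `http://localhost:15672` with the default `guest`/`guest` credentials; both are assumptions to adjust for your setup. It relies on the management HTTP API endpoints `GET /api/channels` and `DELETE /api/connections/{name}`. A connection that holds unacked messages is not necessarily dead, so review the list before closing anything.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

static class UnackedConnections // hypothetical name, for illustration only
{
    // Returns the distinct names of connections that own at least one channel
    // with messages_unacknowledged > 0, given the JSON body of GET /api/channels.
    public static List<string> FindConnectionsWithUnacked(string channelsJson)
    {
        var names = new List<string>();
        using JsonDocument doc = JsonDocument.Parse(channelsJson);
        foreach (JsonElement ch in doc.RootElement.EnumerateArray())
        {
            if (ch.TryGetProperty("messages_unacknowledged", out JsonElement unacked) &&
                unacked.ValueKind == JsonValueKind.Number && unacked.GetInt64() > 0 &&
                ch.TryGetProperty("connection_details", out JsonElement conn) &&
                conn.ValueKind == JsonValueKind.Object &&
                conn.TryGetProperty("name", out JsonElement name))
            {
                string n = name.GetString();
                if (n != null && !names.Contains(n)) names.Add(n);
            }
        }
        return names;
    }

    public static async Task Main(string[] args)
    {
        bool close = args.Length > 0 && args[0] == "--close";

        // Assumed management endpoint and credentials; change as needed.
        using var http = new HttpClient { BaseAddress = new Uri("http://localhost:15672/") };
        string token = Convert.ToBase64String(Encoding.UTF8.GetBytes("guest:guest"));
        http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", token);

        string json = await http.GetStringAsync("api/channels");
        foreach (string name in FindConnectionsWithUnacked(json))
        {
            Console.WriteLine("Connection holding unacked messages: " + name);
            if (close)
            {
                // Connection names contain spaces and ':' so they must be URL-encoded.
                HttpResponseMessage resp =
                    await http.DeleteAsync("api/connections/" + Uri.EscapeDataString(name));
                Console.WriteLine("  close request returned HTTP " + (int)resp.StatusCode);
            }
        }
    }
}
```

Run it without arguments first to see which connections are reported. The same information is available in the management UI under the Connections and Channels tabs.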