phprabbitmqphp-amqplib

Dead lettering with php-amqplib and RabbitMQ?


I'm just starting out in using php-amqplib and RabbitMQ and want a way to handle messages that, for whatever reason, can't be processed and are nack'd. I thought that one way people handle this is with a dead letter queue. I'm trying to set this up but have not had any luck so far and hope someone could offer some suggestions.

My initiation of the queues looks a little something like:

class BaseAbstract
{
    /** @var AMQPStreamConnection */
    protected $connection;
    /** @var AMQPChannel */
    protected $channel;
    /** @var array */
    protected $deadLetter = [
        'exchange' => 'dead_letter',
        'type' => 'direct',
        'queue' => 'delay_queue',
        'ttl' => 10000 // in milliseconds
    ];

    protected function initConnection(array $config)
    {
        try {
            $this->connection = AMQPStreamConnection::create_connection($config);
            $this->channel = $this->connection->channel();

            // Setup dead letter exchange and queue
            $this->channel->exchange_declare($this->deadLetter['exchange'], $this->deadLetter['type'], false, true, false);
            $this->channel->queue_declare($this->deadLetter['queue'], false, true, false, false, false, new AMQPTable([
                'x-dead-letter-exchange' => $this->deadLetter['exchange'],
                'x-dead-letter-routing-key' => $this->deadLetter['queue'],
                'x-message-ttl' => $this->deadLetter['ttl']
            ]));
            $this->channel->queue_bind($this->deadLetter['queue'], $this->deadLetter['exchange']);

            // Set up regular exchange and queue
            $this->channel->exchange_declare($this->getExchangeName(), $this->getExchangeType(), true, true, false);
            $this->channel->queue_declare($this->getQueueName(), true, true, false, false, new AMQPTable([
                'x-dead-letter-exchange' => $this->deadLetter['exchange'],
                'x-dead-letter-routing-key' => $this->deadLetter['queue']
            ]));

            if (method_exists($this, 'getRouteKey')) {
                $this->channel->queue_bind($this->getQueueName(), $this->getExchangeName(), $this->getRouteKey());
            } else {
                $this->channel->queue_bind($this->getQueueName(), $this->getExchangeName());
            }
        } catch (\Exception $e) {
            throw new \RuntimeException('Cannot connect to the RabbitMQ service: ' . $e->getMessage());
        }
        return $this;
    }

    // ...
}

which I thought should set up my dead letter exchange and queue, and then also set up my regular exchange and queue (with the getRouteKey, getQueueName, and getExchangeName/Type methods provided by extending classes)

When I try to handle a message like:

public function process(AMQPMessage $message)
{
    $msg = json_decode($message->body);
    if (empty($msg->payload) || empty($msg->payload->run)) {
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);
        return;
    }

    // removed for post brevity, but compose $cmd variable

    exec($cmd, $output, $returned);
    if ($returned !== 0) {
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } else {
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
    }
}

But I get back the error Something went wrong: Cannot connect to the RabbitMQ service: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'delay_queue' in vhost '/': received 'dead_letter' but current is ''

Is this the way I should be setting up dead lettering? Different examples I've seen around all seem to show a bit of a different way of handling it, none of which seem to work for me. So I've obviously misunderstood something here and am appreciative of any advice. :)


Solution

  • Setting up (permanent) queues and exchanges is something you want to do once, when deploying code, not every time you want to use them. Think of them like your database schema - although the protocol provides "declare" rather than "create", you should generally be writing code that assumes things are configured a particular way. You could build the first part of your code into a setup script, or use the web- and CLI-based management plugin to manage these using a simple JSON format.

    The error you are seeing is probably a result of trying to declare the same queue at different times with different parameters - the "declare" won't replace or reconfigure an existing queue, it will treat the arguments as "pre-conditions" to be checked. You'll need to drop and recreate the queue, or manage it via the management UI, to change its existing parameters.

    Where run-time declares become more useful is when you want to dynamically create items in your broker. You can either give them names you know will be unique to that purpose, or pass null as the name to receive a randomly-generated name back (people sometimes refer to creating an "anonymous queue", but every queue in RabbitMQ has a name, even if you didn't choose it).


    If I'm reading it correctly, your "schema" looks likes this:

    # Dead Letter eXchange and Queue
    Exchange: DLX
    Queue: DLQ; dead letter exchange: DLX, with key "DLQ"; automatic expiry
    Binding: copy messages arriving in DLX to DLQ
    
    # Regular eXchange and Queue
    Exchange: RX
    Queue: RQ; dead letter exchange: DLX, with key "DLQ"
    Binding: copy messages from RX to RQ, optionally filtered by routing key
    

    When a message is "nacked" in RQ, it will be passed to DLX, with its routing key overwritten to be "DLQ". It will then be copied to DLQ. If it is nacked from DLQ, or waits in that queue too long, it will be routed round to itself.

    I would simplify in two ways:

    An alternative might be to set x-dead-letter-routing-key to the name of the regular queue, i.e. to label which queue it came from. But until you have a use case for that, I'd keep it simple and leave the message with its original routing key.