php, rabbitmq, php-amqplib

How to close RabbitMQ connections from a consumer callback using php-amqplib


I have set up a script that connects to RabbitMQ using the php-amqplib library. However, I want to be able to exit the script when necessary.

We thought that creating a stop.txt file would solve the problem: after a message is received, if the script sees that file, it should stop receiving further messages, delete the file, and exit.
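The stop-file check described above could be isolated in a small helper. This is only a minimal sketch using built-in PHP file functions; the name `stop_requested()` and its `$stopFile` parameter are hypothetical and do not come from php-amqplib.

```php
<?php
// Hypothetical helper (not part of php-amqplib): reports whether a stop file
// exists and, if so, deletes it so the next run is not stopped immediately.
function stop_requested(string $stopFile): bool
{
    // Defensive: drop any cached stat result for this path, since a
    // consumer is a long-running process.
    clearstatcache(true, $stopFile);

    if (!is_file($stopFile)) {
        return false; // no stop request
    }

    // Suppress the warning in case another process removed the file
    // between the check and the delete.
    @unlink($stopFile);

    return true;
}
```

Inside a consumer callback this might be called as `if (stop_requested('./stop.txt')) { ... }`, keeping the file handling separate from the AMQP calls.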

However, although the script deletes the file, it keeps receiving messages and does not exit.

As the script below shows, I am using exactly the same function to exit the script as the official example: https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_consumer.php (see line 54).

Am I doing something wrong, or misunderstanding something? I am new to this library.

Here is the script I am working with. I have left parts out for brevity.

use PhpAmqpLib\Connection\AMQPStreamConnection;

function process_message($message)
{
    // ... do some processing ... //
    // ... insert data into DB ... //

    // acknowledge the message to the broker: it was received and processed successfully
    $message->ack();

    // if a stop file exists then exit the script
    if (file_exists("./stop.txt")) {
        $message->getChannel()->basic_cancel($message->getConsumerTag());
        unlink("./stop.txt"); // delete stop file
    }
}

function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}

$connection = new AMQPStreamConnection(HOST, PORT, USERNAME, PASSWORD, VHOST);
$channel = $connection->channel();

$channel->queue_declare(QUEUE, false, true, false, false);

// prefetch count of 1: the broker delivers at most one unacknowledged message at a time
$channel->basic_qos(null, 1, null);

// consume messages from the queue
$channel->basic_consume(QUEUE, 'consumer', false, false, false, false, 'process_message');

register_shutdown_function('shutdown', $channel, $connection);

// Loop as long as the channel has callbacks registered
while ($channel->is_consuming()) {
    $channel->wait();
}

Thanks in advance,

Richard


Solution

  • This seems to shut everything down, although I do not know whether it is the best solution: I close the channel and the connection from within the callback.

    // if a stop file exists then exit the script
    if (file_exists("./stop.txt")) {
        $message->getChannel()->basic_cancel($message->getConsumerTag());
        // NEW CODE //
        $channel = $message->getChannel();
        $connection = $channel->getConnection();
        $channel->close();
        $connection->close();
        // END NEW CODE //
        unlink("./stop.txt"); // delete stop file
    }
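
For context, here is one way the fix above might look as a complete callback. It is only a sketch: it assumes the message and channel methods already used in this thread (`ack()`, `getChannel()`, `getConsumerTag()`, `basic_cancel()`, `getConnection()`, `close()`), and `make_stoppable_callback()` is a hypothetical name introduced here so that the stop-file path is passed in rather than hard-coded.

```php
<?php
// Sketch only. make_stoppable_callback() is a hypothetical helper; the
// methods called on $message and its channel are the ones used above.
function make_stoppable_callback(string $stopFile): callable
{
    return function ($message) use ($stopFile): void {
        // ... do some processing, insert data into DB ...

        // Acknowledge the message to the broker before deciding to stop
        $message->ack();

        if (!file_exists($stopFile)) {
            return; // no stop request: keep consuming
        }

        $channel = $message->getChannel();
        $connection = $channel->getConnection();

        // Same order as the fix above: cancel the consumer, close the
        // channel and the connection, then delete the stop file
        $channel->basic_cancel($message->getConsumerTag());
        $channel->close();
        $connection->close();
        unlink($stopFile);
    };
}
```

It could then be registered in place of the function name, for example `$channel->basic_consume(QUEUE, 'consumer', false, false, false, false, make_stoppable_callback('./stop.txt'));`.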