phprabbitmqconsumerphp-amqplib

Rabbitmq PHP consume every second


What is the best practice, to receive Data from a queue every second via php? I do this with an ajax query, what calls the php script every second. There, a connection object is created and a queue is declared every time. I tried to save this after the first time in a session variable, but when I call the PHP script a second time, I can't receive any more data. When I debug the channel object, I see that is_open is false:

  protected' is_open' => boolean false 

Here is my basic php test code:

<?php


require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;



session_start(); # start session handling.

$id        = $_GET["uid"];
$connected = $_GET["connected"];

if (empty($id)) {
    $id = 0;
}
$queue = 'CyOS EV Queue ' . $id;

$reset = $_GET["reset"];

if ($reset === "true") {
    session_destroy();
    $_SESSION = array();
    echo "session destroyed";
    var_dump($_SESSION);
    exit;

}


$connection;
$channel;


if (!isset($_SESSION['coneccted'])) {

    $_SESSION['coneccted'] = true;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');


    $channel = $connection->channel();


    $channel->queue_declare($queue, false, false, false, false, false);

    $channel->queue_bind($queue, 'CyOS-EX');

    $_SESSION['connection'] = $connection;
    $_SESSION['channel']    = $channel;




} else {
    echo "already connected \n\r";
    $connection = $_SESSION['connection'];

    $channel = $_SESSION['channel'];

    var_dump($_SESSION);

}

$test = new AMQPMessage();

while ($i < 10) {

echo "try to get data from " . $queue . "\n\r";
$test = $channel->basic_get($queue, true);

$i++;
if (isset($test)) {
    echo "received data";
    break;

   }
 }
echo $test->body;

When I initilize the connection and the channel every time I call the script then it works.


Solution

  • I presume the lines you are concerned about are these ones:

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare($queue, false, false, false, false, false);
    $channel->queue_bind($queue, 'CyOS-EX');
    

    Let's look at what's happening here:

    1. Connect to the RabbitMQ server. This is like connecting to a database, or memcache, or any other external process, and needs to happen in each PHP request. You can't store the connection in the session, because it's not data, it's an active resource which will be closed when PHP exits.
    2. Request the default Channel on the connection. This is really just part of the connection code, and shouldn't consume any significant time or resources.
    3. Declare the queue. This will check if the queue already exists, and if it does, will do nothing. On the other hand, if you know the queue exists (because it's a permanent queue created in an admin interface, or you're sure another process will have created it) you can skip this line.
    4. Bind the queue to the exchange. This is part of the setup of the queue; if the queue didn't exist and wasn't already bound, there would be nothing in it to consume until after this line runs. As with the previous step, can probably be skipped if you know it's happened elsewhere.

    The normal way to avoid re-connecting (steps 1 and 2) is to have the consumer running in the background, e.g. starting a command-line PHP script using supervisord which runs continuously processing messages as they come in. However, that won't work if you need to get data back to the browser once it appears in the queue.

    Common alternatives to polling and creating a new PHP process each time include:

    As I say, these are not specific to RabbitMQ, but apply to any time you're waiting for something to happen in a database, a file, a remote service, etc.