rabbitmq php-amqplib

Ensure that AMQP exchange binding exists before publishing


The System Layout

We have three systems:

  1. An API Endpoint (Publisher and Consumer)
  2. The RabbitMQ Server
  3. The main application/processor (Publisher and Consumer)

Systems 1 and 3 both use Laravel, and both use php-amqplib to interact with RabbitMQ.

The path of a message

System 1 (the API Endpoint) sends a serialized job to the RabbitMQ Server for System 3 to process. It then immediately declares a new randomly named queue, binds that queue to an exchange using the correlation ID as the routing key, and starts to listen for messages.

Meanwhile, System 3 finishes the job and then publishes a response with details from that job to RabbitMQ, on the exchange, using the correlation ID as the routing key.

The issue and what I've tried

I often find that this process fails. The job gets sent and received, and the response gets sent - but system 1 never reads this response, and I don't see it published in RabbitMQ.

I've done some extensive debugging of this without getting to a root cause. My current theory is that System 3 is so quick at returning a response that the new queue and exchange binding haven't even been declared yet by System 1. This means the response from System 3 has nowhere to go, and as a result vanishes. This theory is mainly based on the fact that if I set jobs to be processed at a lower frequency on System 3, the system becomes more reliable. The faster the jobs process, the more unreliable it becomes.

The question is: how can I prevent that? Or is there something else that I'm missing? I of course want these jobs to process quickly and efficiently without breaking the request/response pattern.

I've logged output from both systems - both are working with the same correlation IDs, and System 3 gets an ACK upon publishing - whilst System 1 has a declared queue with no messages that eventually just times out.

Code Example 1: Publishing a Message

/**
 * Helper method to publish a message to RabbitMQ
 *
 * @param $exchange
 * @param $message
 * @param $correlation_id
 * @return bool
 */
public static function publishAMQPRouteMessage($exchange, $message, $correlation_id)
{
    try {
        $connection = new AMQPStreamConnection(
            env('RABBITMQ_HOST'),
            env('RABBITMQ_PORT'),
            env('RABBITMQ_LOGIN'),
            env('RABBITMQ_PASSWORD'),
            env('RABBITMQ_VHOST')
        );
        $channel = $connection->channel();

        $channel->set_ack_handler(function (AMQPMessage $message) {
            Log::info('[AMQPLib::publishAMQPRouteMessage()] - Message ACK');
        });

        $channel->set_nack_handler(function (AMQPMessage $message) {
            Log::error('[AMQPLib::publishAMQPRouteMessage()] - Message NACK');
        });

        $channel->confirm_select();

        $channel->exchange_declare(
            $exchange,
            'direct', // type
            false,    // passive
            false,    // durable
            false     // auto_delete
        );

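        // The correlation ID doubles as the routing key on the direct exchange.
        $msg = new AMQPMessage($message);
        $channel->basic_publish($msg, $exchange, $correlation_id);

        // Block until the broker confirms the publish. A confirm only means the
        // broker accepted the message, not that it was routed to a queue.
        $channel->wait_for_pending_acks();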

        $channel->close();
        $connection->close();

        return true;
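    } catch (Exception $e) {
        // Log the failure instead of swallowing it silently.
        Log::error('[AMQPLib::publishAMQPRouteMessage()] - ' . $e->getMessage());
        return false;
    }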
}

Code Example 2: Waiting for a Message Response

/**
 * Helper method to fetch messages from RabbitMQ.
 *
 * @param $exchange
 * @param $correlation_id
 * @return mixed
 */
public static function readAMQPRouteMessage($exchange, $correlation_id)
{
    $connection = new AMQPStreamConnection(
        env('RABBITMQ_HOST'),
        env('RABBITMQ_PORT'),
        env('RABBITMQ_LOGIN'),
        env('RABBITMQ_PASSWORD'),
        env('RABBITMQ_VHOST')
    );
    $channel = $connection->channel();

    $channel->exchange_declare(
        $exchange,
        'direct', // type
        false,    // passive
        false,    // durable
        false     // auto_delete
    );

    list($queue_name, ,) = $channel->queue_declare(
        '',    // empty name: let RabbitMQ generate one
        false, // passive
        false, // durable
        true,  // exclusive
        false  // auto_delete
    );

    $channel->queue_bind($queue_name, $exchange, $correlation_id);

    $callback = function ($msg) {
        return self::$rfcResponse = $msg->body;
    };

    $channel->basic_consume(
        $queue_name,
        '',    // consumer_tag
        false, // no_local
        true,  // no_ack
        false, // exclusive
        false, // nowait
        $callback
    );

    if (!count($channel->callbacks)) {
        Log::error('[AMQPLib::readAMQPRouteMessage()] - No callbacks registered!');
    }

    while (self::$rfcResponse === null && count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();

    return self::$rfcResponse;
}

Grateful for any advice you can offer!


Solution

  • I may be missing something, but when I read this:

    System 1 (the API Endpoint) sends a serialized job to the RabbitMQ Server for System 3 to process. It then immediately declares a new randomly named queue, binds that queue to an exchange using the correlation ID as the routing key, and starts to listen for messages.

    My first thought was "why do you wait until the message is sent before declaring the return queue?"

    In fact, we have a whole series of separate steps here:

    1. Generating a correlation ID
    2. Publishing a message containing that ID to an exchange for processing elsewhere
    3. Declaring a new queue to receive responses
    4. Binding the queue to an exchange using the correlation ID
    5. Binding a callback to the new queue
    6. Waiting for responses

    The response cannot arrive until after step 2 (publishing), so we want to publish as late as possible. The only step that cannot come before publishing is step 6 (waiting), but it's probably convenient to keep steps 5 and 6 close together in the code. So I would rearrange the code to:

    1. Generating a correlation ID
    2. Declaring a new queue to receive responses
    3. Binding the queue to an exchange using the correlation ID
    4. Publishing a message containing the correlation ID to an exchange for processing elsewhere
    5. Binding a callback to the new queue
    6. Waiting for responses

    This way, however quickly the response is published, it will be picked up by the queue declared in step 2, and as soon as you bind a callback and start waiting, you will process it.
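
    The reordered steps could be sketched roughly as follows. This is only an illustration, not code from the question: awaitReplyFor and its parameters are hypothetical names, $channel is assumed to be a php-amqplib AMQPChannel, and $publishJob stands in for whatever currently sends the job to System 3.

```php
<?php
/**
 * Hypothetical helper: sets up the response queue BEFORE the job is published.
 *
 * $channel is assumed to be a PhpAmqpLib\Channel\AMQPChannel.
 * $publishJob is any callable that sends the job to System 3.
 */
function awaitReplyFor($channel, string $exchange, string $correlationId, callable $publishJob, int $timeout = 30)
{
    // Steps 2 and 3: declare the response queue and bind it first,
    // so that a fast response already has somewhere to go.
    $channel->exchange_declare($exchange, 'direct', false, false, false);
    list($queueName) = $channel->queue_declare('', false, false, true, false);
    $channel->queue_bind($queueName, $exchange, $correlationId);

    // Step 4: only now publish the job.
    $publishJob();

    // Step 5: attach a callback that captures the response body.
    $response = null;
    $channel->basic_consume(
        $queueName, '', false, true, false, false,
        function ($msg) use (&$response) {
            $response = $msg->body;
        }
    );

    // Step 6: wait for the response. In php-amqplib, wait() throws a
    // timeout exception if nothing arrives within $timeout seconds.
    while ($response === null) {
        $channel->wait(null, false, $timeout);
    }

    return $response;
}
```

    One thing to watch: the queue in the question is declared as exclusive, and an exclusive queue is deleted when the connection that declared it closes. The same connection therefore has to stay open from the declaration until the response has been consumed, rather than each helper opening and closing its own.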

    Note that there is nothing that readAMQPRouteMessage knows that publishAMQPRouteMessage doesn't, so you can freely move code between them. All you need when you want to consume from the response queue is its name, which you can either save into a variable and pass around, or generate yourself rather than letting RabbitMQ name it. For instance, you could name it after the correlation ID it is listening for, so that you can always work out what it is with simple string manipulation, e.g. "job_response.{$correlation_id}".
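
    As a small sketch of that naming idea (both function names are hypothetical, and the "job_response." prefix is just the example used above):

```php
<?php
/**
 * Hypothetical helper: build the response queue name for a correlation ID.
 */
function responseQueueName(string $correlationId): string
{
    return "job_response.{$correlationId}";
}

/**
 * Hypothetical helper: recover the correlation ID from a response queue name.
 * Returns null if the name does not follow the "job_response." convention.
 */
function correlationIdFromQueueName(string $queueName): ?string
{
    $prefix = 'job_response.';
    if (strncmp($queueName, $prefix, strlen($prefix)) !== 0) {
        return null;
    }

    return substr($queueName, strlen($prefix));
}
```

    The generated name would then be passed as the first argument to queue_declare in place of the empty string, and any code that later needs to consume from the queue can rebuild the name from the correlation ID alone.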