
AMQP RabbitMQ consumers blocking each other?


I have coded up a C (rabbitmq-c) worker app which consumes a queue published by a Python script (pika).

I have the following strange behaviour which I can't seem to solve:

  1. Starting all the workers before messages are published to the queue works as expected
  2. Starting 1 worker after the queue has been published works as expected
  3. HOWEVER: Starting additional workers after a worker has started consuming from the queue means that those workers don't see any messages on the queue (message count = 0) and therefore just wait (even though there are meant to be many messages still on the queue). Killing the first worker will suddenly start messages flowing to all the other (waiting) consumers.

Any ideas what could be going on?

I've tried making sure that each consumer has its own channel (is this necessary?) but still get the same behaviour...

Here's the code for the consumer (worker):

conn = amqp_new_connection();
sock = amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
           "/",
           0,
           131072,
           0,
           AMQP_SASL_METHOD_PLAIN,
           "guest",
           "guest");

if (amqp_channel_open(conn, chan) == NULL)
    LOG_ERR(" [!] Failed to open amqp channel!\n");

if ((q = amqp_queue_declare(conn,
                            chan,
                            amqp_cstring_bytes("ranges"),
                            0,
                            0,
                            0,
                            0,
                            amqp_empty_table)) == NULL)
    LOG_ERR(" [!] Failed to declare queue!\n");

LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);

amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);

while(1) {
    amqp_maybe_release_buffers(conn);
    amqp_consume_message(conn, &e, NULL, 0);

    {
        int n;
        amqp_frame_t f;
        unsigned char buf[8];
        unsigned char *pbuf = buf;

        amqp_simple_wait_frame(conn, &f);       // METHOD frame
        amqp_simple_wait_frame(conn, &f);       // HEADER frame

        n = f.payload.properties.body_size;
        if (n != sizeof(range_buf))
            LOG_ERR(" [!] Invalid message size!");

        while (n) {
            amqp_simple_wait_frame(conn, &f);   // BODY frame
            memcpy(pbuf,
                   f.payload.body_fragment.bytes,
                   f.payload.body_fragment.len);
            n -= f.payload.body_fragment.len;
            pbuf += f.payload.body_fragment.len;
        }

        // do something with buf

        LOG_INFO(" [x] Message received from queue\n");
    }

    amqp_destroy_envelope(&e);

    amqp_maybe_release_buffers(conn);
}

Solution

  • The problem here is most likely that your first consumer prefetches all of the messages when it starts. This is RabbitMQ's default behaviour, but you can limit the number of messages prefetched by each consumer, which lets you spread the workload across multiple workers.

    In other words, one or more of the running consumers picks up every message in the queue and leaves none for the consumers that start later. When you kill the first worker, its unacknowledged messages are returned to the queue, which is why they suddenly flow to the waiting consumers.

    If you apply QoS to your consumer and limit the prefetch to, let's say, 10 messages, the consumer will only fetch the first 10 messages, and the new consumers can pick up the slack.

    The function you are looking for to implement this is called amqp_basic_qos, and in addition you can read more about consumer-prefetch here.
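A minimal sketch of what that call looks like in rabbitmq-c, reusing the `conn`, `chan`, and `LOG_ERR` names from the question's code (the prefetch count of 10 is just an illustrative value):

```c
/* Set per-consumer prefetch on the same channel, after amqp_channel_open()
   and before amqp_basic_consume(). */
if (amqp_basic_qos(conn,
                   chan,
                   0,     /* prefetch_size:  0 = no size-based limit */
                   10,    /* prefetch_count: at most 10 unacknowledged
                             messages in flight to this consumer */
                   0)     /* global: 0 = limit applies per consumer */
        == NULL)
    LOG_ERR(" [!] Failed to set basic.qos!\n");

/* Consume as before; the broker now stops delivering once 10 messages
   are outstanding, leaving the rest on the queue for other workers. */
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"),
                   amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
```

Note that the limit is enforced against *unacknowledged* messages: since your consume call passes no_ack = 0, the worker must acknowledge each delivery (e.g. with amqp_basic_ack) for the broker to send it the next one.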