I have coded up a C (rabbitmq-c) worker app which consumes a queue published by a Python script (pika).
I have the following strange behaviour which I can't seem to solve:
Any ideas what could be going on?
I've tried making sure that each consumer has it's own channel (is this necessary?) but still the same behaviour...
Here's the code for the consumer (worker):
conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
"/",
0,
131072,
0,
AMQP_SASL_METHOD_PLAIN,
"guest",
"guest");
if (amqp_channel_open(conn, chan) == NULL)
LOG_ERR(" [!] Failed to open amqp channel!\n");
if ((q = amqp_queue_declare(conn,
chan,
amqp_cstring_bytes("ranges"),
0,
0,
0,
0,
amqp_empty_table)) == NULL)
LOG_ERR(" [!] Failed to declare queue!\n");
LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);
amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
while(1) {
amqp_maybe_release_buffers(conn);
amqp_consume_message(conn, &e, NULL, 0);
{
int n;
amqp_frame_t f;
unsigned char buf[8];
unsigned char *pbuf = buf;
amqp_simple_wait_frame(conn, &f); // METHOD frame
amqp_simple_wait_frame(conn, &f); // HEADER frame
n = f.payload.properties.body_size;
if (n != sizeof(range_buf))
LOG_ERR(" [!] Invalid message size!");
while (n) {
amqp_simple_wait_frame(conn, &f); // BODY frame
memcpy(pbuf,
f.payload.body_fragment.bytes,
f.payload.body_fragment.len);
n -= f.payload.body_fragment.len;
pbuf += f.payload.body_fragment.len;
}
// do something with buf
LOG_INFO(" [x] Message recevied from queue\n");
}
amqp_destroy_envelope(&e);
amqp_maybe_release_buffers(conn);
}
The problem here is most likely that your consumer pre-fetches all messages when started. This is default behavior by RabbitMQ, but you can reduce the number of messages pre-fetched by the consumer, to allow you to better spread the workload across multiple workers.
This simply means that one or more of the consumers will pick up all the messages, and leave none for the new consumers.
If you apply qos to your consumer and limit the pre-fetch to lets say 10 messages. The consumer will only queue up the 10 first messages, and the new consumers can pick up the slack.
The function you are looking for to implement this is called amqp_basic_qos, and in addition you can read more about consumer-prefetch here.