I am trying to code a simple consumer using librabbitmq. It is working, but when I do execute amqp_basic_consume, it consumes the entire queue. What I want is for it to get a single message, process it and repeat.
I tried using a basic_qos to have the consumer prefetch 1 at a time, but that seems to have no effect at all.
The basic setup and loop: // set qos of 1 message at a time if (!amqp_basic_qos(conn, channel, 0, 1, 0)) { die_on_amqp_error(amqp_get_rpc_reply(conn), "basic.qos"); }
// Consuming the message
amqp_basic_consume(conn, channel, queue, amqp_empty_bytes, no_local, no_ack, exclusive, amqp_empty_table);
while (run) {
amqp_rpc_reply_t result;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
result = amqp_consume_message(conn, &envelope, &timeout, 0);
if (AMQP_RESPONSE_NORMAL == result.reply_type) {
strncpy(message, envelope.message.body.bytes, envelope.message.body.len);
message[envelope.message.body.len] = '\0';
printf("Received message size: %d\nbody: -%s-\n", (int) envelope.message.body.len, message );
if ( strncmp(message, "DONE",4 ) == 0 )
{
printf("XXXXXXXXXXXXXXXXXX Cease message received. XXXXXXXXXXXXXXXXXXXXX\n");
run = 0;
}
amqp_destroy_envelope(&envelope);
}else{
printf("Timeout.\n");
run = 0;
}
}
I expect to have a queue filled that I can start processing and if I hit ^C, the remaining messages are still in the queue. Instead, even if I have only processed one message, the entire queue is emptied.
This is the behavior when noAck
is true. What will happen is that the messages will be pushed to the connected consumer as fast as the broker can send them, because it assumes that the consumer is able to accept them as they are acknowledged immediately upon delivery.
You would want to change noAck
to false, then explicitly ack
each message back to the broker in this case.
Alternatively, you could use a basic.get
to pull messages from the broker one at a time as opposed to using a push-based consumer (there are folks out there who don't like this idea). Your use case will determine what is most appropriate, but based on the fact that you seem to have a full queue and fairly process-intensive messages, I would assume a basic.get
would be just fine in this scenario. The question then would be to decide how often to poll when the queue is empty.