I am trying to implement a solution based on queues for our Discord-related tasks which uses Discord API. Need some input regarding how this can be implemented.
Here are the expectations from it
Here's what I've tried I've tried using amqplib and RabbitMQ with a direct type exchange, but I am unable to pause one queue based on another.
const amqp = require('amqplib');
async function setup() {
let connection;
let channel;
try {
// Connect to the RabbitMQ server
connection = await amqp.connect('amqp://localhost');
// Create a channel
channel = await connection.createChannel();
// Assert an exchange of type 'direct' that is durable
await channel.assertExchange('priority_exchange', 'direct', {
durable: true,
});
// Assert queues that are durable
await channel.assertQueue('high_priority_queue', {
durable: true,
arguments: {
'x-max-priority': 10, // Ensure this matches the existing queue configuration
},
});
await channel.assertQueue('low_priority_queue', {
durable: true,
});
// Bind the queues to the exchange with specific routing keys
await channel.bindQueue('high_priority_queue', 'priority_exchange', 'high');
await channel.bindQueue('low_priority_queue', 'priority_exchange', 'low');
console.log('Setup complete.');
} catch (error) {
console.error('Setup failed:', error);
} finally {
// Ensure the channel and connection are closed properly
if (channel) {
await channel.close();
}
if (connection) {
await connection.close();
}
}
}
async function consumer() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue('high_priority_queue', {
durable: true,
arguments: {
'x-max-priority': 10,
},
});
await channel.assertQueue('low_priority_queue', {
durable: true,
});
channel.prefetch(1);
channel.consume(
'high_priority_queue',
async(msg) => {
if (msg) {
console.log(
`Received ${msg.content.toString()} from high_priority_queue`
);
await fakePromise();
channel.ack(msg);
}
}, {
noAck: false
}
);
channel.consume(
'low_priority_queue',
async(msg) => {
if (msg) {
console.log(
`Received ${msg.content.toString()} from low_priority_queue`
);
await fakePromise();
channel.ack(msg);
}
}, {
noAck: false
}
);
}
consumer().catch(console.error);
function fakePromise() {
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 5000);
});
}
If you're fine with classic queues, you can make use of a priority queue.
To have it work, apart from using the priority field of basic.properties, you need to make sure that you have a single consumer and that he has qos (maximum size of the "batch" of messages he can be processing at a time) to 1.
That way any of the messages in the queue that are not currently assigned to a consumer will be prioritized based on the priority provided, higher value first.
Worth noting that with this approach there is no logic available (to my knowledge) to push a priority message over a lower priority message currently being processed.