I'm working on a Nuxt project that uses NuxtHub and Cloudflare Queues.
✅ I can successfully publish messages to the queue like this:
const { cloudflare } = event.context;
console.log("Publishing to queue:", log);
await cloudflare.env.COMPARISON_QUEUE.send(log);
📦 My current nuxt.config.ts looks like this:
export default defineNuxtConfig({
  compatibilityDate: "2024-11-01",
  devtools: { enabled: true },
  modules: [
    "@nuxt/eslint",
    "@nuxt/ui-pro",
    "@nuxt/content",
    "@nuxt/icon",
    "@nuxthub/core"
  ],
  hub: {
    workers: true,
    database: true,
    bindings: {
      queue: {
        COMPARISON_QUEUE: {
          queue_name: "comparison-queue",
        }
      },
      observability: {
        logs: true
      }
    }
  }
});
🧩 I have a queue consumer in server/plugin/queue-consumer.ts:
import { defineNitroPlugin } from 'nitropack/runtime';
import { processMessages } from '../queue/comparison.consumer';

export default defineNitroPlugin((nitroApp) => {
  nitroApp.hooks.hook('cloudflare:queue', async (batchWrapper, env, ctx) => {
    console.log('[Queue] Batch wrapper received:', JSON.stringify(batchWrapper, null, 2));
    const batch = batchWrapper?.event || batchWrapper?.batch || batchWrapper;
    const messages = batch?.messages || [];
    console.log('[Queue] Extracted messages:', messages.length);

    if (ctx && typeof ctx.waitUntil === 'function') {
      try {
        ctx.waitUntil(processMessages(batch, env, batchWrapper));
      } catch (error) {
        console.error('[Queue] Error in processMessages:', error);
      }
    } else {
      try {
        await processMessages(batch, env, batchWrapper);
      } catch (error) {
        console.error('[Queue] Error in processMessages (fallback):', error);
      }
    }

    for (const message of messages) {
      try {
        console.log('[Queue] Processing message ID:', message.id);
        if (message && typeof message.ack === 'function') {
          message.ack();
        }
      } catch (err) {
        console.error('[Queue] Error ACK:', err);
        if (message && typeof message.retry === 'function') {
          message.retry({ delaySeconds: 30 });
        }
      }
    }
  });
});
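A side note on the plugin above, separate from the registration problem: the ack loop runs whether or not processMessages succeeded, and a rejection inside ctx.waitUntil(...) is not caught by the surrounding try/catch because the promise settles later. Below is a minimal sketch of acking each message only after its own processing succeeds. It assumes message objects expose id, body, ack() and retry({ delaySeconds }) as used above. QueueMessage, BatchResult, handleBatch and the handler callback are hypothetical names for illustration, not part of NuxtHub or Nitro.

```typescript
// Hypothetical minimal message shape, mirroring the methods the plugin above calls.
interface QueueMessage<T> {
  id: string;
  body: T;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

// Hypothetical summary type so callers can log or assert on the outcome.
interface BatchResult {
  acked: string[];
  retried: string[];
}

// Processes messages one at a time: ack on success, schedule a retry on failure.
async function handleBatch<T>(
  messages: ReadonlyArray<QueueMessage<T>>,
  handler: (body: T) => Promise<void>,
  retryDelaySeconds = 30,
): Promise<BatchResult> {
  const result: BatchResult = { acked: [], retried: [] };
  for (const message of messages) {
    try {
      await handler(message.body);
      message.ack();
      result.acked.push(message.id);
    } catch {
      // The handler failed for this message only; the rest of the batch continues.
      message.retry({ delaySeconds: retryDelaySeconds });
      result.retried.push(message.id);
    }
  }
  return result;
}
```

Inside the hook, the promise returned by handleBatch(messages, ...) could be awaited directly, so that a failure in one message leads to a retry of that message instead of an unconditional ack.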
🔍 The plugin works perfectly when used in a regular Nuxt app with Cloudflare Workers without NuxtHub.
🤔 Now that I'm using NuxtHub (@nuxthub/core version 0.8.25), I'm not seeing any logs or evidence that the queue consumer is invoked at all. I suspect I might be missing a configuration step to register the consumer, but I can't find any documentation or examples.
Question:
Do I need to declare the queue consumer explicitly somewhere in NuxtHub’s configuration? Or is there something else I need to do to make the queue consumer work?
I ran into the same issue. What fixed it for me was adding my Worker project as a consumer of the queue in the Cloudflare dashboard:
Cloudflare dashboard -> Storage & Databases -> Queues -> select your queue -> Settings, then add your project (Worker) as a consumer.
Now I see the messages being consumed and acknowledged.
Hope it helps!