nuxt.jsqueuecloudflare

How to consume a Cloudflare Queue with NuxtHub (version 0.8.25)?


I'm working on a Nuxt project that uses NuxtHub and Cloudflare Queues.

✅ I can successfully publish messages to the queue like this:

const { cloudflare } = event.context;

console.log("Publishing to queue:", log);

await cloudflare.env.COMPARISON_QUEUE.send(log);

📦 My current nuxt.config.ts looks like this:

export default defineNuxtConfig({
  compatibilityDate: "2024-11-01",
  devtools: { enabled: true },
  modules: [
    "@nuxt/eslint",
    "@nuxt/ui-pro",
    "@nuxt/content",
    "@nuxt/icon",
    "@nuxthub/core"
  ],
  hub: {
    workers: true,
    database: true,
    bindings: {
      queue: {
        COMPARISON_QUEUE: {
          queue_name: "comparison-queue",
        }
      },
      observability: {
        logs: true
      }
    }
  }
});

🧩 I have a queue consumer in server/plugin/queue-consumer.ts:

import { defineNitroPlugin } from 'nitropack/runtime';
import { processMessages } from '../queue/comparison.consumer';

export default defineNitroPlugin((nitroApp) => {
  nitroApp.hooks.hook('cloudflare:queue', async (batchWrapper, env, ctx) => {
    console.log('[Queue] Batch wrapper received:', JSON.stringify(batchWrapper, null, 2));

    const batch = batchWrapper?.event || batchWrapper?.batch || batchWrapper;
    const messages = batch?.messages || [];
    console.log('[Queue] Extracted messages:', messages.length);

    if (ctx && typeof ctx.waitUntil === 'function') {
      try {
        ctx.waitUntil(processMessages(batch, env, batchWrapper));
      } catch (error) {
        console.error('[Queue] Error in processMessages:', error);
      }
    } else {
      try {
        await processMessages(batch, env, batchWrapper);
      } catch (error) {
        console.error('[Queue] Error in processMessages (fallback):', error);
      }
    }

    for (const message of messages) {
      try {
        console.log('[Queue] Processing message ID:', message.id);
        if (message && typeof message.ack === 'function') {
          message.ack();
        }
      } catch (err) {
        console.error('[Queue] Error ACK:', err);
        if (message && typeof message.retry === 'function') {
          message.retry({ delaySeconds: 30 });
        }
      }
    }
  });
});

🔍 The plugin works perfectly when used in a regular Nuxt app with Cloudflare Workers without NuxtHub.

🤔 Now that I'm using NuxtHub (@nuxthub/core version 0.8.25), I'm not seeing any logs or evidence that the queue consumer is invoked at all. I suspect I might be missing a configuration step to register the consumer, but I can't find any documentation or examples.

Question:
Do I need to declare the queue consumer explicitly somewhere in NuxtHub’s configuration? Or is there something else I need to do to make the queue consumer work?


Solution

  • Ran into the same issue and what I had to do was basically add my worker project as a consumer in the cloudflare dashboard:

    Cloudflare dashboard -> storage & databases -> queues -> select your queue -> settings and add your project (worker) as a consumer.

    Now I see the message being consumed and acknowledged.

    Hope it helps!