javascriptnode.jsfirebasegoogle-cloud-firestoregoogle-cloud-functions

Using asynchronous tasks and batch updates at the same time


I have a react native app connected to Firestore. I also use Cloud Functions to send notifications to each users. The following code works but I recently encountered timeout errors (even with 540 seconds timeout and 512MB memory) :

async function sendNotifications() {
    console.log("Sending notifications for recommended events...");

    // Fetch all events once
    const eventsRef = admin.firestore().collection("Events");
    const eventsSnapshot = await eventsRef
        .where('Start', '>=', new Date())
        .get();

    if (eventsSnapshot.empty) {
        console.log("No upcoming events found.");
        return;
    }
    const allEvents = eventsSnapshot.docs.map(doc => ({ ...doc.data(), docId: doc.id }));

    // Fetch all users
    const usersRef = admin.firestore().collection("Users");
    const usersSnapshot = await usersRef.get();

    let reset = false;

    for (const userDoc of usersSnapshot.docs) {
        try {
            const userData = userDoc.data();
            const { fcmToken, preferences, language = "en", sentNotifications = [] } = userData;

            if (!fcmToken) continue; // Skip users without FCM token
            const userPreferredTags = preferences ? preferences : [];
            let eventToSend = findEventForUser(allEvents, userPreferredTags, sentNotifications);

            // Fallback logic: No matching events, or user has no preferences
            if (!eventToSend) {
                eventToSend = findBangerEvent(allEvents, sentNotifications);
            }
            if (!eventToSend && sentNotifications.length > 0) {
                console.log(`No new events to suggest, resetting`);
                eventToSend = sentNotifications[sentNotifications.length - 1];
                reset = true;
            }

            if (!eventToSend) {
                console.log(`No events to send for user ${userDoc.id}. Skipping.`);
                continue;
            }

            const notificationPayload = createNotificationPayload(
                eventToSend,
                fcmToken,
                language
            );
            await admin.messaging().send(notificationPayload);
            console.log(`Successfully sent message to user ${userDoc.id}, ${notificationPayload.notification.title}`);

            const updatedNotifications = updateSentNotifications(eventToSend, reset ? [] : sentNotifications);
            await userDoc.ref.update({ sentNotifications: updatedNotifications });

        } catch (error) {
            console.error(`Error processing user ${userDoc.id}:`, error);
        }
    }

    console.log("Notifications sent successfully.");
}

I thus moved to asynchronous functions to be able to process users simultaneously but, if I understood correctly, it is also good practice to batch the userDoc updates to Firestore and the FCM messages with sendEach().

So I tried that on the Firebase Emulator using the following code :

async function sendNotifications() {
    console.log("Sending notifications for recommended events...");

    // Fetch all events once
    const eventsRef = admin.firestore().collection("Events");
    const eventsSnapshot = await eventsRef
        .where('Start', '>=', new Date())
        .get();

    if (eventsSnapshot.empty) {
        console.log("No upcoming events found.");
        return;
    }
    const allEvents = eventsSnapshot.docs.map(doc => ({ ...doc.data(), docId: doc.id }));

    // Fetch all users
    const usersRef = admin.firestore().collection("Users");
    const usersSnapshot = await usersRef.get();

    const usersToProcess = usersSnapshot.docs.filter(userDoc => {
        const userData = userDoc.data();
        return true; // Include all users with an FCM token (set to true in emulator)
    });

    console.log(`Processing ${usersToProcess.length} users...`);

    const notifications = [];
    let batch = admin.firestore().batch();
    let batchUserCount = 0; // Track the number of users in the current batch

    const userPromises = usersToProcess.map(async (userDoc) => {
        const userData = userDoc.data();
        const { fcmToken, preferences, language = "en", sentNotifications = [] } = userData;

        const userPreferredTags = preferences || [];
        let eventToSend = findEventForUser(allEvents, userPreferredTags, sentNotifications);

        // Fallback logic: No matching events
        if (!eventToSend) {
            eventToSend = findBangerEvent(allEvents, sentNotifications) ||
                sentNotifications[sentNotifications.length - 1];
        }

        if (!eventToSend) {
            console.log(`No events to send for user ${userDoc.id}. Skipping.`);
            return;
        }

        const notificationPayload = createNotificationPayload(eventToSend, fcmToken ? fcmToken : "ezeazea", language);
        notifications.push(notificationPayload);

        const updatedNotifications = updateSentNotifications(eventToSend, sentNotifications);
        const dataSize = JSON.stringify({ sentNotifications: updatedNotifications }).length;
        console.log(`Estimated size of update: ${dataSize} bytes`);

        batch.update(userDoc.ref, { sentNotifications: updatedNotifications });
        batchUserCount++;

        // If the batch has 100 operations, commit the batch and start a new one
        if (batchUserCount === 100) {
            console.log("Committing Firestore batch...");
            await batch.commit(); // Commit the batch
            batch = admin.firestore().batch(); // Create a new batch
            batchUserCount = 0; // Reset the batch user count
        }
    });

    await Promise.all(userPromises);

    // Commit remaining updates if any users were left in the batch
    if (batchUserCount > 0) {
        console.log("Committing remaining Firestore batch...");
        await batch.commit();
    }

    // Send notifications in bulk (in batches of 100)
    console.log("Sending notifications in bulk...");
    while (notifications.length) {
        const batchNotifications = notifications.splice(0, 100); // Firebase max batch size for FCM
        try {
            await admin.messaging().sendEach(batchNotifications);
        } catch (error) {
            console.error("Error sending notifications:", error);
            // Handle the error as necessary
        }
    }

    console.log("Notifications sent successfully.");
}

But it seems that the asynchronous processing of the users clashes with the batching as I get the following error on the second commit call:

āš  functions: Error: Cannot modify a WriteBatch that has been committed.


Solution

  • The problem with the first block of code is that you use await inside the for loop, which means that the loads from Firestore and the FCM send calls are all serialized. So the next load/send doesn't start until the previous load/send is finished.

    There are scenarios where this flow is necessary, for example when the next load/send operation depends on the results of the previous operation. But that doesn't seem to be the case in your scenario. Therefore it's better to process the load/send calls as quickly as possible and then wait for all of them combined to finish.

    In code that should be something like this:

    async function sendNotifications() {
        console.log("Sending notifications for recommended events...");
    
        // Fetch all events once
        const eventsRef = admin.firestore().collection("Events");
        const eventsSnapshot = await eventsRef
            .where('Start', '>=', new Date())
            .get();
    
        if (eventsSnapshot.empty) {
            console.log("No upcoming events found.");
            return;
        }
        const allEvents = eventsSnapshot.docs.map(doc => ({ ...doc.data(), docId: doc.id }));
    
        // Fetch all users
        const usersRef = admin.firestore().collection("Users");
        const usersSnapshot = await usersRef.get();
    
        let reset = false;
        let promises = []; // šŸ‘ˆ This is where we'll track all pending load/send calls
    
        for (const userDoc of usersSnapshot.docs) {
            const userData = userDoc.data();
            const { fcmToken, preferences, language = "en", sentNotifications = [] } = userData;
    
            if (!fcmToken) continue; // Skip users without FCM token
            const userPreferredTags = preferences ? preferences : [];
            let eventToSend = findEventForUser(allEvents, userPreferredTags, sentNotifications);
    
            // Fallback logic: No matching events, or user has no preferences
            if (!eventToSend) {
                eventToSend = findBangerEvent(allEvents, sentNotifications);
            }
            if (!eventToSend && sentNotifications.length > 0) {
                console.log(`No new events to suggest, resetting`);
                eventToSend = sentNotifications[sentNotifications.length - 1];
                reset = true;
            }
    
            if (!eventToSend) {
                console.log(`No events to send for user ${userDoc.id}. Skipping.`);
                continue;
            }
    
            const notificationPayload = createNotificationPayload(
                eventToSend,
                fcmToken,
                language
            );
            let promise = admin.messaging().send(notificationPayload).then(() => { // šŸ‘ˆ Remove await here
                console.log(`Successfully sent message to user ${userDoc.id}, ${notificationPayload.notification.title}`);
    
                const updatedNotifications = updateSentNotifications(eventToSend, reset ? [] : sentNotifications);
                return userDoc.ref.update({ sentNotifications: updatedNotifications }); šŸ‘ˆ Remove await here
            }).catch((error) => {
              console.error(`Error processing user ${userDoc.id}:`, error);
            })
    
            promises.push(promise); // šŸ‘ˆ Add async operation to the list
    
        }
    
        await Promise.all(promises); // šŸ‘ˆ Wait for all async operations in one go
    
        console.log("Notifications sent successfully.");
    }
    

    I marked the changes I made in the code above, but essentially this code creates one list of the promises for all async operations and then awaits that list until all are completed. This allows the operations to be executed in parallel rather than sequentially, which typically leads to immense throughput gains.

    Note that there may still be limits to how many pending async operations each system can handle/allow. If you encounter those, it's best to introduce a batching system in your code (so not using the Firebase batch API) where you have a certain maximum number of pending async operations. But I'd typically not introduce such a batching system until I need it, and likely the above solution with parallel async operations will buy you quite some leeway already.