Currently I am working on a project that publishes the contents of every received request to a Pub/Sub topic and its subscription, so that all of the messages can be pulled and batch processed together once a day.
To get all of these messages into my application, I am using the Google client library for Node.js (https://github.com/googleapis/nodejs-pubsub) and a loop that repeatedly calls v1.SubscriberClient.pull({subscription: subscriptionName, maxMessages: 1000}) to pull and flush every message in the subscription. The loop repeats until three pulls in a row return 0 messages, or until a limit on the number of pulls is reached.
While this seems to work with small numbers of messages, when testing with a large backlog of unacked messages it seems to cap out at 5000 messages returned:


I was hoping someone here could help identify or resolve this issue. Ideally the job would be able to pull ALL messages in the subscription for the batch processing I mentioned earlier. Copilot/GPT seem to think this is a limit within the client library that prevents the delivery of more than 5000 messages, but I could not find any resources to confirm this, so the issue may also be in the infrastructure itself.
Code:
private async pullAllMessages(
  subscriptionName: string
): Promise<MeterData[]> {
  const fetchedData: MeterData[] = [],
    maxIterations = Number(process.env.MAX_ITERATIONS) || 100,
    maxEmptyPulls = 3;
  let iteration = 0,
    consecutiveEmptyPulls = 0;
  await this.loggingService.log({
    function: "SubscriberService.pullAllMessages",
    action: "pulling messages",
    message: `Beginning iterations to pull all messages.`,
    status: 200,
  });
  while (true) {
    iteration++;
    if (iteration > maxIterations) {
      await this.loggingService.log({
        function: "SubscriberService.pullAllMessages",
        action: "finish pulling messages",
        message: `Messages finished pulling after ${iteration} Iterations And ${fetchedData.length} Messages Are Ready To Be Processed`,
        status: 201,
      });
      break;
    }
    let response: google.pubsub.v1.IPullResponse;
    try {
      response = await this.gcpService.pullMessages(subscriptionName);
    } catch (error) {
      await this.loggingService.error({
        function: "SubscriberService.pullAllMessages",
        action: "pull messages",
        message: `Error Pulling Messages From The Subscriber During Iteration ${iteration}. Retrying in Next Iteration`,
        error: error,
        status: 500,
      });
      continue;
    }
    const numberOfMessagesRecieved = response.receivedMessages?.length ?? 0;
    console.log(
      `Iteration ${iteration}: Pulled ${numberOfMessagesRecieved} messages.`
    );
    fetchedData.push(
      ...this.subscriberHelperService.formatMessages(
        response.receivedMessages ?? []
      )
    );
    if (numberOfMessagesRecieved === 0) {
      consecutiveEmptyPulls++;
      if (consecutiveEmptyPulls >= maxEmptyPulls) {
        await this.loggingService.log({
          function: "SubscriberService.pullAllMessages",
          action: "finish pulling messages",
          message: `Received 0 messages. Messages finished pulling after ${iteration} Iterations And ${fetchedData.length} Messages Are Ready To Be Processed`,
          status: 201,
        });
        break;
      }
    } else {
      consecutiveEmptyPulls = 0;
    }
  }
  return fetchedData;
}
async pullMessages(
  subscriptionName: string
): Promise<google.pubsub.v1.IPullResponse> {
  const maxMessages = Number(process.env.MAX_PUBSUB_MESSAGES);
  const request = {
    subscription: subscriptionName,
    maxMessages: maxMessages,
  };
  const [response] = await this.subClient.pull(request);
  return response;
}
Edit: (23/09/25)
I believe there may be a limit set either in the client library or on the subscription/project itself, but I can't find any way to view or modify it. This doc makes a reference to a limit: https://cloud.google.com/pubsub/quotas
I guess I could acknowledge messages as I receive them, but I don't really like this, because if the job fails this data will be gone forever.
As you noted, "Pub/Sub quotas and limits" describes limits on the number of outstanding messages. Specifically, if you have a subscription with exactly-once delivery enabled, the limits are lower when using the unary Pull API, as is done here.
Using the StreamingPull API will help to increase this limit and allow for higher throughput. For StreamingPull, it is recommended to use the high-level Pub/Sub client library. Note that the high-level client libraries offer flow control settings to configure the number of outstanding messages permitted, as well as additional support for exactly-once delivery. This previous StackOverflow answer provides some options for doing batch processing, specifically:
In your user callback, re-batch messages and once the batch reaches a desired size (or you've waited a sufficient amount of time to fill the batch), process all of the messages together and then ack them.
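To illustrate the re-batching idea, here is a minimal sketch of the buffering logic on its own, kept independent of the client library. The names MessageBatcher, AckableMessage, handler, maxBatchSize and maxWaitMs are hypothetical and chosen for this example; the only thing assumed about a message is that it exposes ack() and nack(), as the high-level client's message objects do. Treat it as a starting point under those assumptions, not as the library's own batching mechanism.

```typescript
// Hypothetical minimal shape of a message: only what the batcher needs.
interface AckableMessage {
  data: Uint8Array;
  ack(): void;
  nack(): void;
}

type BatchHandler<M> = (batch: M[]) => Promise<void>;

// Collects messages and hands them to `handler` once `maxBatchSize`
// messages are buffered or `maxWaitMs` has passed since the first
// buffered message. Messages are acked only after the handler succeeds.
class MessageBatcher<M extends AckableMessage> {
  private buffer: M[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined = undefined;
  private inFlight: Promise<void> = Promise.resolve();
  private readonly handler: BatchHandler<M>;
  private readonly maxBatchSize: number;
  private readonly maxWaitMs: number;

  constructor(handler: BatchHandler<M>, maxBatchSize: number, maxWaitMs: number) {
    this.handler = handler;
    this.maxBatchSize = maxBatchSize;
    this.maxWaitMs = maxWaitMs;
  }

  // Call this from the subscriber's message callback.
  add(message: M): void {
    this.buffer.push(message);
    if (this.buffer.length >= this.maxBatchSize) {
      void this.flush();
    } else if (this.timer === undefined) {
      this.timer = setTimeout(() => void this.flush(), this.maxWaitMs);
    }
  }

  // Processes whatever is buffered. Batches run one after another, and the
  // returned promise settles once every batch queued so far has finished.
  flush(): Promise<void> {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    const batch = this.buffer;
    this.buffer = [];
    if (batch.length === 0) {
      return this.inFlight;
    }
    this.inFlight = this.inFlight.then(async () => {
      try {
        await this.handler(batch);
        batch.forEach((m) => m.ack()); // ack only after successful processing
      } catch {
        batch.forEach((m) => m.nack()); // ask for redelivery on failure
      }
    });
    return this.inFlight;
  }
}
```

With the high-level client, the wiring would look roughly like subscription.on('message', (m) => batcher.add(m)), with the subscription created with a flowControl.maxMessages value at least as large as maxBatchSize. Otherwise the library stops delivering before a batch can fill, and only the timer will ever trigger a flush. Because messages are acked only after the handler resolves, a failed job leaves them unacked and eligible for redelivery, which addresses the concern about losing data.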