
Pub/Sub Python Client - Gracefully shut down a subscriber


I am using the Google Pub/Sub client v2.2.0 with Python 3.6 as a subscriber.

I want my application to shut down gracefully after acking all the messages it has already received.

Here is the sample subscriber code from Google's guide, with minor changes that demonstrate my issue:

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from time import sleep

# TODO(developer): replace with your own values
project_id = "your-project-id"
subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Received {message}.")
    sleep(30)
    message.ack()
    print("Acked")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    sleep(10)
    streaming_pull_future.cancel()
    streaming_pull_future.result()

From https://cloud.google.com/pubsub/docs/pull

I expect this code to stop pulling messages, finish processing the messages that are already running, and then exit.

In practice, the code stops pulling messages and finishes executing the running callbacks, but the messages are never acked: the .ack() call is executed, yet the server does not receive the ACK, so on the next run the same messages are delivered again.

1. Why doesn't the server receive the ACK?

2. How do I gracefully shut down the subscriber?

3. What is the expected behavior of .cancel()?


Solution

  • Update (v2.4.0+)

    Version 2.4.0 of the client added a new optional parameter, await_msg_callbacks, to the streaming pull future's cancel() method. If it is set to True, the method blocks until all currently executing message callbacks are done and the background message stream has been shut down (the default is False).

    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel(await_msg_callbacks=True)  # blocks until done
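Conceptually, "block until all currently executing message callbacks are done" means tracking which callbacks are in flight and waiting for that count to drop to zero. The sketch below is a toy, standard-library-only illustration of that idea, not the client's actual implementation; InFlightTracker, wrap() and wait_idle() are hypothetical names, and plain threads stand in for the client's callback thread pool. Separately, if I recall the changelog correctly, later releases (2.4.1+) removed the flag from cancel() again and added an await_callbacks_on_shutdown argument to SubscriberClient.subscribe() instead, with cancel() only triggering the shutdown and result() doing the waiting; check the release notes for the version you have installed.

```python
import threading
import time


class InFlightTracker:
    """Hypothetical helper that counts message callbacks while they run."""

    def __init__(self):
        self._cond = threading.Condition()
        self._in_flight = 0

    def wrap(self, callback):
        """Return a callback that is counted as in flight while it executes."""
        def wrapper(message):
            with self._cond:
                self._in_flight += 1
            try:
                return callback(message)
            finally:
                with self._cond:
                    self._in_flight -= 1
                    self._cond.notify_all()  # wake up anyone blocked in wait_idle()
        return wrapper

    def wait_idle(self, timeout=None):
        """Block until no callback is running; return False if the timeout expires."""
        with self._cond:
            return self._cond.wait_for(lambda: self._in_flight == 0, timeout)


# Demo: plain threads stand in for the client's callback thread pool.
acked = []
started = threading.Semaphore(0)


def slow_callback(message):
    started.release()      # signal that this callback has begun executing
    time.sleep(0.2)        # scaled-down version of sleep(30) in the question
    acked.append(message)  # stands in for message.ack()


tracker = InFlightTracker()
wrapped = tracker.wrap(slow_callback)
threads = [threading.Thread(target=wrapped, args=(n,)) for n in range(3)]
for t in threads:
    t.start()
for _ in threads:
    started.acquire()      # only callbacks that have started count as "currently executing"
tracker.wait_idle()        # returns once all three callbacks have finished
print(sorted(acked))       # [0, 1, 2]
```

A condition variable is used rather than polling, so wait_idle() wakes up as soon as the last running callback finishes.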
    

    A couple of release notes:

    Original answer (v2.3.0 and below)

    The streaming pull is managed in the background by a streaming pull manager. When the streaming pull future is canceled, it invokes the manager's close() method, which gracefully shuts down the background helper threads.

    One of the components that gets shut down is the scheduler, a thread pool used to asynchronously dispatch received messages to the user callback. The key thing to note is that scheduler.shutdown() does not wait for the user callbacks to complete, as that could potentially block "forever"; instead, it empties the executor's work queue and then shuts the executor down:

    def shutdown(self):
        """Shuts down the scheduler and immediately end all pending callbacks.
        """
        # Drop all pending item from the executor. Without this, the executor
        # will block until all pending items are complete, which is
        # undesirable.
        try:
            while True:
                self._executor._work_queue.get(block=False)
        except queue.Empty:
            pass
        self._executor.shutdown()
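The queue-draining step can be reproduced with the standard library alone. In this sketch (a plain ThreadPoolExecutor, no Pub/Sub involved), the work items still sitting in the queue are cancelled and never run, while the callback that is already executing is left alone. It uses shutdown(cancel_futures=True), a public API with a similar effect to emptying _work_queue by hand, which requires Python 3.9+; wait=False is used only so the demo can release the running callback afterwards.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()
ran = []


def callback(n):
    started.set()   # tell the main thread a callback is now executing
    release.wait()  # keep it busy until the main thread lets it finish
    ran.append(n)


executor = ThreadPoolExecutor(max_workers=1)  # one worker, so only item 0 can start
futures = [executor.submit(callback, n) for n in range(4)]
started.wait()  # make sure item 0 has been taken off the queue

# Drop everything still queued (items 1-3) and return without joining the worker.
executor.shutdown(wait=False, cancel_futures=True)

release.set()
futures[0].result()  # the already-running callback still completes normally
print(ran)                               # [0]
print([f.cancelled() for f in futures])  # [False, True, True, True]
```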
    

    This explains why the ACKs are not sent in the provided code sample: each callback sleeps for 30 seconds, but the streaming pull future is canceled after only about 10 seconds, so by the time message.ack() is called the streaming pull has already been shut down and the ACKs never reach the server.
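The timing argument can be replayed with a toy model that uses only the standard library. ToyAckChannel is a hypothetical stand-in for whatever carries ACKs to the server, not the client's real internals: an ACK sent while it is open counts as delivered, and one sent after close() is dropped. The delays are scaled down from 30 s and 10 s to 0.5 s and 0.1 s.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait


class ToyAckChannel:
    """Hypothetical stand-in for the stream that carries ACKs to the server."""

    def __init__(self):
        self._lock = threading.Lock()
        self._open = True
        self.delivered = []
        self.dropped = []

    def ack(self, ack_id):
        with self._lock:
            (self.delivered if self._open else self.dropped).append(ack_id)

    def close(self):
        with self._lock:
            self._open = False


def simulate(wait_for_callbacks):
    """Run three slow callbacks, 'cancel' after 0.1 s, and report where the ACKs went."""
    channel = ToyAckChannel()
    executor = ThreadPoolExecutor(max_workers=3)

    def callback(ack_id):
        time.sleep(0.5)      # scaled-down sleep(30)
        channel.ack(ack_id)  # scaled-down message.ack()

    futures = [executor.submit(callback, n) for n in range(3)]
    time.sleep(0.1)          # scaled-down sleep(10) before cancel()
    executor.shutdown(wait=wait_for_callbacks)
    channel.close()          # the background stream is gone
    wait(futures)            # let any stragglers finish so the counts are stable
    return channel


without_waiting = simulate(False)
with_waiting = simulate(True)
print(len(without_waiting.delivered), len(without_waiting.dropped))  # 0 3
print(len(with_waiting.delivered), len(with_waiting.dropped))        # 3 0
```

Passing True plays the role of waiting for in-flight callbacks before the stream is torn down, which is the behavior the v2.4.0 await_msg_callbacks option is described as providing.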

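    Misc. remarks

    To keep the subscriber running until the process is interrupted (e.g. with Ctrl+C), block on the streaming pull future and cancel it in the exception handler: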

    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
    

    Or after a pre-set timeout:

    import concurrent.futures

    try:
        streaming_pull_future.result(timeout=123)
    except concurrent.futures.TimeoutError:
        streaming_pull_future.cancel()
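Both patterns can be folded into one helper. The sketch below is hypothetical: run_subscriber() is not part of the client, and FakeStreamingPullFuture is a minimal stand-in whose result() blocks until cancel() is called, so the snippet runs without Pub/Sub credentials. With the real client you would pass the future returned by subscriber.subscribe(). The extra result() call after cancel() follows the pattern in Google's current samples; as far as I know, on newer client versions, where cancel() only triggers the shutdown, it blocks until the shutdown has completed.

```python
import concurrent.futures
import threading


class FakeStreamingPullFuture:
    """Hypothetical stand-in: result() blocks until cancel() is called."""

    def __init__(self):
        self._shutdown_done = threading.Event()
        self.cancel_calls = 0

    def result(self, timeout=None):
        if not self._shutdown_done.wait(timeout):
            raise concurrent.futures.TimeoutError()
        return True

    def cancel(self):
        self.cancel_calls += 1
        self._shutdown_done.set()
        return True


def run_subscriber(future, timeout=None):
    """Block on a streaming pull future; on timeout or Ctrl+C, shut it down cleanly."""
    try:
        future.result(timeout=timeout)
    except (concurrent.futures.TimeoutError, KeyboardInterrupt):
        future.cancel()  # trigger the shutdown
        future.result()  # block until the shutdown has completed


future = FakeStreamingPullFuture()
run_subscriber(future, timeout=0.1)  # nothing cancels it first, so the timeout fires
print(future.cancel_calls)           # 1
```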