How to get all messages from GCloud Pubsub?


I am writing a scheduled Cloud Function that subscribes to a Pub/Sub topic and pulls messages until the queue is empty. I want to ack some messages but nack the others, so that they remain in the queue and are pulled the next time the function runs.

This code keeps waiting for messages; it should stop once the queue is empty.

import json
import logging

from google.cloud import pubsub_v1

logger = logging.getLogger(__name__)


def pull_messages():
    subscription_name = ""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path("", subscription_name)
    
    def callback(message):
        try:
            data = json.loads(message.data.decode('utf-8'))
            print(f"Received message: {data.get('order_id')}")
            print(f"Received message: {data.get('created_at')}")
            message.ack()
            logger.info("Message acknowledged")
        except Exception as ex:
            logger.error(f"Error processing message: {ex}")

    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    logger.info(f"Listening for messages on {subscription_name}...\n")
    try:
        streaming_pull_future.result()
    except Exception as ex:
        streaming_pull_future.cancel()
        logger.error(f"Error in subscription: {ex}")

Solution

  • Pulling all messages until the queue is empty isn't a typical pattern for Cloud Pub/Sub. Usually, you keep your subscriber up and running all of the time. If you nack messages, the queue is never going to be empty: nacked messages are likely to be redelivered immediately or, if the subscription uses a retry policy, as soon as the backoff time has passed.

    If you must shut down your subscribers and re-run them on a schedule, the simplest way to approximate "the queue is empty" is to keep track of the last time a message was received in your callback handler and to shut down the subscriber once a sufficient amount of time has passed without one. For example:

    import datetime
    import time

    from google.cloud import pubsub_v1

    project_id = "<my-project>"
    subscription_id = "<my-subscription>"
    # Seconds without receiving a message after which the subscriber shuts down
    timeout = 300.0
    # Interval, in seconds, at which to check whether messages have been received
    check_interval = 10.0

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    last_received = datetime.datetime.now()


    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        # Runs on a background thread: record the arrival time, then ack.
        global last_received
        print(f"Received {message}.")
        last_received = datetime.datetime.now()
        message.ack()


    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print(f"Listening for messages on {subscription_path}..\n")

    with subscriber:
        # Poll until no message has arrived for `timeout` seconds.
        while datetime.datetime.now() - last_received < datetime.timedelta(seconds=timeout):
            time.sleep(check_interval)
        streaming_pull_future.cancel()  # Trigger the shutdown.
        print("Stopping after no messages received.")
        streaming_pull_future.result()  # Block until the shutdown is complete.
    

    The right timeout depends on how much it matters if a message misses the current run, because delivery had not finished, and has to wait for the next one. If you plan to nack only a relatively small number of messages, you could keep track of them and not reset last_received when one of them is redelivered; if you nack many messages, this may not be feasible.
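
The duplicate-tracking idea above could be sketched roughly as follows. This is only an illustration under some assumptions: `IdleTracker`, `make_callback`, and the `should_defer` predicate are hypothetical names, not part of the Pub/Sub client library. The only client features relied on are `message.message_id`, `message.ack()`, and `message.nack()`. The set of nacked IDs lives in memory for a single run, so it only suits a small number of nacked messages.

```python
import threading
import time


class IdleTracker:
    """Remembers when a *new* message last arrived, ignoring redeliveries
    of messages that were already nacked during this run (hypothetical helper)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._lock = threading.Lock()  # callbacks run on background threads
        self._nacked_ids = set()
        self._last_new = clock()

    def record(self, message_id, will_nack):
        """Register a delivery; return True if it reset the idle timer."""
        with self._lock:
            if message_id in self._nacked_ids:
                # Redelivery of a message we nacked earlier: not new activity.
                return False
            if will_nack:
                self._nacked_ids.add(message_id)
            self._last_new = self._clock()
            return True

    def idle_seconds(self):
        """Seconds elapsed since the last delivery that counted as new."""
        with self._lock:
            return self._clock() - self._last_new


def make_callback(tracker, should_defer):
    """Build a subscriber callback. `should_defer` is an assumed predicate
    that decides whether a message is left for the next scheduled run."""

    def callback(message):
        defer = should_defer(message)
        tracker.record(message.message_id, will_nack=defer)
        if defer:
            message.nack()  # stays in the subscription and is redelivered
        else:
            message.ack()

    return callback
```

With this in place, the polling loop would compare `tracker.idle_seconds()` against `timeout` instead of reading `last_received`. Nacked messages keep being redelivered until the subscriber shuts down, so a retry policy with backoff on the subscription may help limit that churn.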