node.js google-cloud-platform architecture microservices google-cloud-pubsub

How to design an efficient Pub/Sub pull subscriber for both maximum throughput and fault-tolerance?


I am using Google's Pub/Sub to move processing of some tasks (represented by messages on a Pub/Sub topic) to the background. Some tasks are expected to fail periodically due to known transient errors from an internal service that enforces a measure/minute-type limit. The measure is not known or correlated to individual tasks, otherwise this would be much more straightforward to manage. These transient errors are retry-able. Google's documentation indicates that pull subscribers have more flow control than push subscribers.

How do I process such a queue with maximum throughput while staying resilient to transient failures? For example, I do not want to create my own bottleneck by processing 1 task at a time instead of 100 tasks at once just because I know that processing 1 task at a time will, for the most part, never trigger a transient error.

This is what I'm currently trying:

  1. List the known transient errors, let's say {A, B}.
  2. Pull the next task T1 and attempt to process it.
  3. If task T1 returns error A, stop processing the queue of tasks, since I know the others will fail with the same timeout-based error. If task T1 returns an error that is not in {A, B}, send it somewhere else for manual triage. If task T1 succeeds, pull the next task.
  4. Force a time-delay on task T1.
  5. Try to process T1 after the time-delay and, if it succeeds, continue processing the queue (a rough sketch of this loop follows).
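
Roughly, the loop I have in mind looks like the sketch below (pure pseudocode in Python; pull_next_task, process, send_to_manual_triage and backoff_seconds are stand-ins for my actual code, not real APIs):

    import time

    backoff_seconds = 60

    class TransientError(Exception):
        """Stand-in for the known retry-able errors {A, B}."""

    def pull_next_task():
        ...  # stand-in: pull the next message from the subscription

    def process(task):
        ...  # stand-in: the actual work; may raise TransientError

    def send_to_manual_triage(task):
        ...  # stand-in: route unknown failures elsewhere

    while True:
        task = pull_next_task()            # step 2: pull the next task
        try:
            process(task)                  # attempt to process it
        except TransientError:             # step 3: known error from {A, B}
            time.sleep(backoff_seconds)    # step 4: forced time-delay
            process(task)                  # step 5: retry, then keep going
        except Exception:
            send_to_manual_triage(task)    # step 3: unknown error -> triage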

I think at step 2, I can optimize by pulling/dequeueing more tasks instead of 1 at a time. It should be possible to maintain a queue in the code and push tasks back onto it on failure, etc.

I am also thinking about how multiple copies of such a microservice can maintain shared state: when copies M1, M2, and M3 have been spawned and are running, and M1 receives a timeout-based error, M2 and M3 should also stop processing, since all three copies share the internal service that enforces the measure/minute limit and share a single access token.


Solution

  • For such a specific use case I would suggest the following:

    1. Set up a Redis instance (or cluster) to maintain shared state. You might want to use Redis for shared locks as well (a small sketch follows this list).
    2. Use batch processing for higher throughput and better control over processing. You can achieve this with the max_messages flow-control setting when subscribing (or the max_messages parameter of a synchronous pull).
    3. Finally, setting up a Dead Letter Queue is highly recommended if you want to triage/retry failed tasks later (see the configuration example after the code).
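
    As a minimal sketch of item 1 (assuming a local Redis and placeholder key names 'rate_limit' and 'probe_lock'): every copy of the microservice checks a shared flag before processing, and the SET NX EX pattern can act as a simple distributed lock so that only one copy probes the internal service after the cooldown.

    import redis

    cache = redis.Redis(host='localhost', port=6379)

    def rate_limited():
        # Shared flag: any copy that hits the limit sets it, every copy checks it.
        return cache.get('rate_limit') is not None

    def report_rate_limit(backoff_seconds=60):
        # The key expires on its own, so processing resumes automatically.
        cache.set('rate_limit', 'hit', ex=backoff_seconds)

    def try_acquire_lock(name='probe_lock', ttl_seconds=30):
        # SET NX EX: True only for the single copy that acquired the lock.
        return cache.set(name, 'locked', nx=True, ex=ttl_seconds) is True

    def release_lock(name='probe_lock'):
        cache.delete(name)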

    Here is an example of what the consumer code might look like (mostly pseudocode to explain the idea):

    from concurrent.futures import TimeoutError

    import redis
    import time
    from google.cloud import pubsub_v1

    error_backoff_seconds = 60
    rate_limit_cooldown_seconds = 5
    messages_per_batch = 10
    pull_timeout_seconds = 300  # how long to keep the streaming pull open

    # Placeholders -- use your own project and subscription here.
    project_id = "my-project"
    subscription_id = "my-subscription"

    cache = redis.Redis(host='localhost', port=6379)


    class TransientError(Exception):
        """Raised by the processing code on a known retry-able (rate-limit) error."""
    
    
    def process_task(task):
        try:
            # TODO: call the actual task processing code here
            pass
        except TransientError:
            # Record the rate-limit hit in shared storage so that every copy
            # of the consumer backs off, then re-raise for the caller.
            cache.set('rate_limit', 'hit', ex=error_backoff_seconds)
            raise
        # Any other exception propagates to the callback, which nacks the
        # message so it can end up in the Dead Letter Queue for triage.
    
    
    def callback(message):
        if cache.get('rate_limit'):
            # A rate-limit error was seen recently (possibly by another copy):
            # slow down and hand the message back to Pub/Sub for redelivery.
            time.sleep(rate_limit_cooldown_seconds)
            message.nack()
            return

        try:
            process_task(message)
            message.ack()
        except TransientError:
            time.sleep(10)  # ← you might want to make this exponential
            message.nack()  # redeliver later instead of dropping the message
        except Exception:
            # Unknown error: nack it so the Dead Letter Queue eventually
            # collects it for manual triage.
            message.nack()
    
    
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    flow_control = pubsub_v1.types.FlowControl(max_messages=messages_per_batch)
    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback, flow_control=flow_control
    )
    
    with subscriber:
        try:
            streaming_pull_future.result(timeout=pull_timeout_seconds)
        except TimeoutError:
            streaming_pull_future.cancel()
            streaming_pull_future.result()
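
    As a sketch of item 3 (the dead-letter setup), assuming the dead-letter topic already exists and a recent google-cloud-pubsub client is used, attaching it to the subscription might look roughly like this; dead_letter_topic_id and max_delivery_attempts=10 are placeholder values, and project_id/subscription_id are the same placeholders as above:

    from google.cloud import pubsub_v1
    from google.protobuf import field_mask_pb2

    dead_letter_topic_id = "my-dead-letter-topic"  # placeholder

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

    # Messages nacked more than max_delivery_attempts times are forwarded
    # to the dead-letter topic for later triage/retry.
    update = pubsub_v1.types.Subscription(
        name=subscription_path,
        dead_letter_policy=pubsub_v1.types.DeadLetterPolicy(
            dead_letter_topic=dead_letter_topic_path,
            max_delivery_attempts=10,
        ),
    )
    update_mask = field_mask_pb2.FieldMask(paths=["dead_letter_policy"])

    with subscriber:
        subscriber.update_subscription(
            request={"subscription": update, "update_mask": update_mask}
        )

    Note that the Pub/Sub service account needs publisher permission on the dead-letter topic (and subscriber permission on the source subscription) for the forwarding to work.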