I am using Google's Pub/Sub to move the processing of some tasks (represented by messages on a Pub/Sub topic) to the background. Some tasks are expected to fail periodically due to known transient errors from an internal service that enforces a measure/minute type limit. This measure is not known or correlated to tasks, otherwise it would be much more straightforward to manage. These transient errors are retry-able. Google's documentation seems to indicate pull subscribers have more flow control than push subscribers.
How do I process such a queue with maximum throughput while staying resilient to transient failures? For example, I do not want to create my own bottleneck by processing 1 task at a time instead of 100 tasks at once, just because I know that doing 1 task at a time will for the most part never trigger a transient error.
This is what I'm currently trying:
1. I keep a set of known transient errors {A, B}.
2. Pull a task T1 and attempt to process it.
3. If T1 returns error A, I stop processing the queue of tasks, since I know the others will fail with the same timeout-based error.
4. If task T1 returns an error that is not in {A, B}, send it somewhere else for manual triage.
5. If task T1 succeeds, pull the next task.
6. Retry T1 after a time-delay and, if it works, continue processing the queue.

I think at step 2 I can optimize by pulling/dequeueing more tasks instead of 1 at a time. It should be possible to maintain a queue in the code and push tasks back on failure, etc.
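The per-task decision in the steps above can be sketched as a small pure function, independent of Pub/Sub. The names `KNOWN_TRANSIENT` and `decide` are mine, purely for illustration:

```python
# Sketch of the per-task decision from the steps above; names are illustrative.
KNOWN_TRANSIENT = {"A", "B"}  # the set of known retry-able errors

def decide(error):
    """Map a task's outcome (None on success) to the consumer's next action."""
    if error is None:
        return "pull_next"        # task succeeded: keep draining the queue
    if error in KNOWN_TRANSIENT:
        return "halt_and_retry"   # others will hit the same limit: back off, retry later
    return "manual_triage"        # unknown error: route the task elsewhere

print(decide(None))  # → pull_next
print(decide("A"))   # → halt_and_retry
print(decide("C"))   # → manual_triage
```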
I am also thinking about how multiple copies of such a microservice can maintain shared state: e.g., when copies M1, M2, M3 have been spawned and are running, and M1 receives a timeout-based error, M2 and M3 should also stop processing, since all three copies share the internal service that enforces the measure/minute limit, and all three share a single access token.
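What I have in mind is a flag with a TTL that every copy checks before processing. Below is a single-process stand-in for that flag, just to illustrate the semantics; in a real deployment across M1/M2/M3 the flag would live in a shared store (e.g. a key with an expiry in something like Redis), and the class name here is mine:

```python
import time

class CooldownGate:
    """In-memory stand-in for a shared rate-limit flag with a TTL.
    Across multiple service copies this state would live in a shared
    store so that every copy sees the same back-off window."""

    def __init__(self):
        self._until = 0.0  # monotonic timestamp until which processing is blocked

    def trip(self, backoff_seconds):
        # Called by whichever copy hits the timeout-based error.
        self._until = time.monotonic() + backoff_seconds

    def blocked(self):
        # Every copy checks this before processing a task.
        return time.monotonic() < self._until

gate = CooldownGate()
print(gate.blocked())  # → False
gate.trip(60)
print(gate.blocked())  # → True
```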
For such a specific use case I would suggest the following:

1. Keep a shared flag in a cache such as Redis, so that every consumer backs off once any one of them hits the transient rate-limit error.
2. Limit how many messages are processed at once with the max_messages parameter in the flow control of the subscriber.

Here is an example of how the consumer code might look (mostly pseudocode to explain the idea):
import time
from concurrent.futures import TimeoutError

import redis
from google.cloud import pubsub_v1

error_backoff_seconds = 60
rate_limit_cooldown_seconds = 5
messages_per_batch = 10

project_id = "your-project-id"           # ← fill in with your values
subscription_id = "your-subscription-id"
timeout = None  # seconds to listen for; None means run until cancelled

cache = redis.Redis(host='localhost', port=6379)

class TransientError(Exception):
    """Raised by the task code when the internal service hits its measure/minute limit."""

def process_task(task):
    try:
        # TODO: call task processing code here
        pass
    except TransientError:
        cache.set('rate_limit', 'hit', ex=error_backoff_seconds)  # ← set up a back off shared by all consumers
        raise
    except Exception:
        # TODO: unknown error, send the task somewhere else for manual triage
        pass

def callback(message):
    if cache.get('rate_limit'):
        time.sleep(rate_limit_cooldown_seconds)  # ← delay execution after an error
        message.nack()  # ← hand the message back so it is redelivered later
        return
    try:
        process_task(message)
        message.ack()
    except TransientError:
        time.sleep(10)  # ← you might want to make this exponential
        message.nack()  # ← let Pub/Sub redeliver the task after the back off

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
flow_control = pubsub_v1.types.FlowControl(max_messages=messages_per_batch)
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)

with subscriber:
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()
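For the "make it exponential" comment above, a capped exponential backoff with jitter is one common shape; the helper below is a sketch (the base and cap values are just examples, not tuned for your service's limit):

```python
import random

def backoff_delay(attempt, base=1.0, cap=60.0):
    """Capped exponential backoff with full jitter.

    attempt 0 → up to 1s, attempt 1 → up to 2s, attempt 2 → up to 4s, ...
    never more than `cap` seconds. Jitter spreads out retries so that
    multiple consumers do not all hammer the internal service at once.
    """
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

You would call `time.sleep(backoff_delay(attempt))` in the `TransientError` handler, incrementing `attempt` per retry of the same task.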