How does Benthos handle the acknowledgement of pubsub messages? How can we manage ack/unack based on custom if-else conditions?
Here is the scenario I'm trying to achieve: I have written a Benthos job that pulls messages from a Pub/Sub subscription, calls two APIs using that data, and sends a Slack alert for each failure. Finally, it should leave the message unacknowledged if any error is encountered while calling the two APIs.
The problem is that even after explicitly throwing an error using throw("Intentional error"), this job still acknowledges the pulled message. Ideally, it should leave the message unacknowledged so that Pub/Sub's redelivery handles the retry. Am I missing something here?
PS: I had to throw the error explicitly because the 2nd API (event_service) sends an event describing the status of the 1st API call (update_api_call), so the first failure has to be caught rather than allowed to stop the pipeline.
Here is my current config which is giving this issue:
input:
  gcp_pubsub:
    project: project-name
    subscription: subscription-name
    sync: true
pipeline:
  threads: 0
  processors:
    - branch:
        request_map: |
          root = {
            "A": this.A.number(),
          }
        processors:
          - try:
              - resource: update_api_call
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message update_api_call failure"
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.update_api_call = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }
    - branch:
        request_map: |
          root = {
            "i": uuid_v4(),
            "data": {
              "event": "event_name",
              "properties": {
                "A": this.A,
                "status": if this.responses.update_api_call.error == null { true } else { false },
                "failureReason": this.responses.update_api_call.error
              }
            }
          }
        processors:
          - try:
              - resource: event_service
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message send event failure",
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.event_service = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }
    # New processor to fail the message if either branch recorded an error.
    - bloblang: |
        if this.responses.update_api_call.error != null || this.responses.event_service.error != null {
          throw("Intentional error") # for leaving message unacked
        }
output:
  label: responses
  stdout:
    codec: lines
You'll have to use the reject or the reject_errored output explicitly to get the behaviour you're looking for. You can leverage the switch output for this purpose. The throw function only sets the error flag on the message; the message still reaches the output, and once the output writes it successfully it is acknowledged. Also, if you use the delete() function, the message will be acked instead of being rejected (nacked).
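As a minimal sketch of that suggestion, the output section from the question could be replaced with a switch that routes errored messages to reject and everything else to stdout. This assumes a Benthos 4.x release where the reject output and the errored() Bloblang function are available; the rejection text is only an illustration.

```yaml
output:
  label: responses
  switch:
    cases:
      # Messages still carrying the error flag (for example from throw)
      # are rejected, which nacks them back to the gcp_pubsub input so
      # Pub/Sub can redeliver them.
      - check: errored()
        output:
          reject: "Message failed: ${! error() }"
      # Everything else is written to stdout and acknowledged as before.
      - output:
          stdout:
            codec: lines
```

If your version includes the reject_errored output, wrapping the stdout output in it should give the same result with less configuration.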