
Leaving message unacknowledged in Benthos job with gcp_pubsub input


How does Benthos handle the acknowledgement of pubsub messages? How can we manage ack/unack based on custom if-else conditions?

Here is the scenario I'm trying to achieve: I have written a Benthos job that pulls messages from a Pub/Sub subscription, calls two APIs using that data, and sends a Slack alert for each failure. Finally, it should leave the message unacknowledged if any error is encountered while calling the two APIs.

The problem is that even after explicitly throwing an error using throw("Intentional error"), this job still acknowledges the pulled message. Ideally, it should leave the message unacknowledged so that Pub/Sub's redelivery handles the retry. Am I missing something here?

PS: I had to throw the error explicitly because the second API (event_service) sends an event reporting the status of the first API call (update_api_call).

Here is my current config, which exhibits this issue:

input:
  gcp_pubsub:
    project: project-name
    subscription: subscription-name
    sync: true

pipeline:
  threads: 0
  processors:
    - branch:
        request_map: |
          root = {
            "A": this.A.number(),
          }
        processors:
          - try:
              - resource: update_api_call
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message update_api_call failure"
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.update_api_call = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }

    - branch:
        request_map: |
          root = {
            "i": uuid_v4(),
            "data": {
              "event": "event_name",
              "properties": {
                "A": this.A,
                "status": if this.responses.update_api_call.error == null { true } else { false },
                "failureReason": this.responses.update_api_call.error
              }
            }
          }
        processors:
          - try:
              - resource: event_service
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message send event failure",
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.event_service = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }

    # New processor to fail the message if either branch recorded an error.
    - bloblang: |
        if this.responses.update_api_call.error != null || this.responses.event_service.error != null {
          throw("Intentional error") # for leaving message unacked
        }

output:
  label: responses
  stdout:
    codec: lines

Solution

  • The throw function only sets the error flag on the message; the output still processes it, and the message is acknowledged once the output succeeds. To get the behaviour you're looking for, you have to reject errored messages explicitly with the reject or the reject_errored output, and you can leverage the switch output for this purpose. Also, if you use the delete() function, the message will be acked instead of being rejected (nacked).
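
As a rough sketch of that suggestion, the output section of the config in the question could be wrapped in a switch so that errored messages are routed to reject while everything else still goes to stdout. This assumes a Benthos version that provides the reject output and the errored() Bloblang function; the rejection text is only illustrative, and the rest of the config (input and pipeline) is assumed to stay unchanged.

```yaml
output:
  switch:
    # Do not retry failed cases in a loop; let the nack reach the input instead.
    retry_until_success: false
    cases:
      # Messages flagged by throw() in the final processor land here and are
      # rejected, so gcp_pubsub leaves them unacknowledged for redelivery.
      - check: errored()
        output:
          reject: "processing failed due to: ${! error() }"
      # Everything else goes to the original stdout output and is acked.
      - output:
          label: responses
          stdout:
            codec: lines
```

If the routing logic is not needed, wrapping the stdout output in reject_errored should give the same nack behaviour with less configuration, provided the installed version includes that output.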