httpscatch-blockbenthos

Benthos yaml, catch http error, send to error topic


For Benthos yaml / pipeline what we developed, we will call http API, to handle the http exception, we can catch the error thru error(), then send to error topic, but it seems that the detailed http error msg can not be saved as meta variable or directly save to root. It always roll back to null at output code.

Here it is the sample code

http:
  address: 0.0.0.0:4190
  root_path: /benthos
input:
  kafka_franz:
    seed_broker:
      - seed-broker-server:15555
    consumer_group: "consumer_group1"
    topic: in_topic

pipeline:
  processors:
    - branch:
        processors:
          - mapping: |
              root = { "input_param1": "value1" }.key_values()
          - http:
              url: https://api.sampleserver.com/firstapi
              verb: POST
          - catch:
              - mapping:
                  # meta errorMsg = error() 
                  root = { "error": error() }
                  root = deleted()
         ......
         # other work flow code

output:
  switch:
    cases:
      - check: errored()
        output:
          kafka_franz:
            seed_brokers: out_broker_server:16666
            topic: "error_topic"
            key: $(! meta("kafka_key")
          processors:
            - log:
                level: INFO
                message: "Catch and send to error topic: $(! json() }"
    - output:
        .....
        # other code to out topic

Since the above catch error() did not work, anyone has experience or suggested solution to keep the error() code, and send it to error topic.


Solution

  • I'd need to see the entire config, but, since the error is happening inside a branch processor, then you likely want to use the result_map field of that processor to remap the data onto the origin message if you want to use catch which clears the error. Note that if you clear the error, then the errored() function returns false. Another issue there is that you're calling root = deleted() inside the branch processor which is problematic. Normally, the branch processor needs to preserve the number of messages it receives (one or more if you use batching). You can do root = deleted() outside of the branch but then that prevents the message from reaching the output.