For Benthos yaml / pipeline what we developed, we will call http API, to handle the http exception, we can catch the error thru error(), then send to error topic, but it seems that the detailed http error msg can not be saved as meta variable or directly save to root. It always roll back to null at output code.
Here it is the sample code
http:
address: 0.0.0.0:4190
root_path: /benthos
input:
kafka_franz:
seed_broker:
- seed-broker-server:15555
consumer_group: "consumer_group1"
topic: in_topic
pipeline:
processors:
- branch:
processors:
- mapping: |
root = { "input_param1": "value1" }.key_values()
- http:
url: https://api.sampleserver.com/firstapi
verb: POST
- catch:
- mapping:
# meta errorMsg = error()
root = { "error": error() }
root = deleted()
......
# other work flow code
output:
switch:
cases:
- check: errored()
output:
kafka_franz:
seed_brokers: out_broker_server:16666
topic: "error_topic"
key: $(! meta("kafka_key")
processors:
- log:
level: INFO
message: "Catch and send to error topic: $(! json() }"
- output:
.....
# other code to out topic
Since the above catch error() did not work, anyone has experience or suggested solution to keep the error() code, and send it to error topic.
I'd need to see the entire config, but, since the error is happening inside a branch
processor, then you likely want to use the result_map
field of that processor to remap the data onto the origin message if you want to use catch
which clears the error. Note that if you clear the error, then the errored()
function returns false
. Another issue there is that you're calling root = deleted()
inside the branch
processor which is problematic. Normally, the branch
processor needs to preserve the number of messages it receives (one or more if you use batching). You can do root = deleted()
outside of the branch
but then that prevents the message from reaching the output.