How can we generate multiple output files in Benthos?


Input Data:

{ "name": "Coffee", "price": "$1.00" } 
{ "name": "Tea", "price": "$2.00" } 
{ "name": "Coke", "price": "$3.00" } 
{ "name": "Water", "price": "$4.00" }

extension.yaml

input:
  label: ""
  file:
    paths: [./input/*]
    codec: lines
    max_buffer: 1000000
    delete_on_finish: false
pipeline:
  processors:  
    - bloblang: |
        root.name = this.name.uppercase()
        root.price = this.price
output:
  file:
    path: "result/file1.log"
    codec: lines

I want to create more than one output file from the same input, with each file shaped by a processor defined for our needs.


Solution

  • extension.yaml

    input:
      label: ""
      file:
        paths: [./input/*]
        codec: lines
        max_buffer: 1000000
        delete_on_finish: false
    pipeline:
      processors:  
        - bloblang: |
            root.name = this.name.uppercase()
            root.price = this.price
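    output:
      broker:
        # fan_out sends every message to every child output
        pattern: fan_out
        outputs:
        # First output: messages as they leave the pipeline
        - file:
            path: "result/file1.log"
            codec: lines
        # Second output: the processor below runs for this output only
        - file:
            path: "result/file2.log"
            codec: lines
          processors:
            - bloblang: |
                # root starts empty, so only name is written
                root.name = this.name.lowercase()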
    

    It will generate two output files, "file1.log" and "file2.log".

    file1.log

    {"name":"COFFEE","price":"$1.00"}
    {"name":"TEA","price":"$2.00"}
    {"name":"COKE","price":"$3.00"}
    {"name":"WATER","price":"$4.00"}
    

    file2.log

    {"name":"coffee"}
    {"name":"tea"}
    {"name":"coke"}
    {"name":"water"}
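
    Note that file2.log has no price field: the bloblang processor on the second output builds a new document, and only name is assigned to it. If you also want price in file2.log, one option is to copy the incoming document first. This is a minimal sketch of the output section only, assuming the same input and pipeline as above:

    ```yaml
    output:
      broker:
        pattern: fan_out
        outputs:
          - file:
              path: "result/file1.log"
              codec: lines
          - file:
              path: "result/file2.log"
              codec: lines
            processors:
              - bloblang: |
                  # Copy the whole incoming document so price is kept
                  root = this
                  # Then override just the name field
                  root.name = this.name.lowercase()
    ```

    With this change, each line of file2.log should carry both the lowercased name and the original price.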
    

    This uses the 'broker' output in Benthos, as shown in the example above.

    A broker routes messages to multiple child outputs using one of several brokering patterns. With fan_out, every message is sent to every child output.

    For more information, see the Broker page in the Benthos documentation.
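
    To show how the pattern changes the routing, here is a variation on the example above using round_robin instead of fan_out. It is only a sketch, assuming the same input, pipeline and file paths as above:

    ```yaml
    output:
      broker:
        # round_robin hands each message to exactly one child output,
        # taking turns, instead of copying it to all of them
        pattern: round_robin
        outputs:
          - file:
              path: "result/file1.log"
              codec: lines
          - file:
              path: "result/file2.log"
              codec: lines
    ```

    With round_robin the input lines are split between the two files instead of being duplicated, so use fan_out when every file needs every message.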