Tags: go, apache-kafka, confluent-schema-registry, benthos

How can I read and decode AVRO messages from Kafka, along with their associated Kafka keys, using Benthos?


I am using Benthos to read AVRO-encoded messages from Kafka whose keys, exposed through the kafka_key metadata field, are also AVRO-encoded payloads. The schemas of both payloads are stored in Schema Registry, and Benthos provides a schema_registry_decode processor for decoding them. For each Kafka message I want to produce a JSON output message with two fields: one called content, containing the decoded AVRO message, and one called metadata, containing the various metadata fields collected by Benthos, including the decoded kafka_key payload.
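
For example, given a hypothetical value schema with a single name field and a hypothetical key schema with a single id field, the desired output for one Kafka message would look roughly like this (illustration only; the exact set of metadata fields, such as kafka_lag and kafka_timestamp_unix, depends on the Benthos version):

    {
      "content": {
        "name": "foo"
      },
      "metadata": {
        "kafka_key": {
          "id": 1
        },
        "kafka_offset": 0,
        "kafka_partition": 0,
        "kafka_topic": "benthos_input"
      }
    }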


Solution

  • It turns out that one can achieve this using a branch processor like so:

    input:
      kafka:
        addresses:
          - localhost:9092
        consumer_group: benthos_consumer_group
        topics:
          - benthos_input
    
    pipeline:
      processors:
        # Decode the message
        - schema_registry_decode:
            url: http://localhost:8081
    
        # Populate output content field
        - bloblang: |
            root.content = this
    
        # Decode kafka_key metadata payload and populate output metadata field
        - branch:
            request_map: |
              root = meta("kafka_key")
    
            processors:
              - schema_registry_decode:
                  url: http://localhost:8081
    
            result_map: |
              root.metadata = meta()
              root.metadata.kafka_key = this
    
    output:
      stdout: {}
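
  • How this works: the first schema_registry_decode processor only touches the message payload, so the raw AVRO-encoded key survives in the kafka_key metadata field. The branch processor's request_map then spawns a child message whose content is that raw key, the nested schema_registry_decode decodes it, and the result_map copies the collected metadata plus the decoded key back onto the original message, leaving the content field set earlier intact.

  • The stdout output is just for demonstration. In a real deployment one would swap it for a proper sink; a minimal sketch writing the combined JSON documents to another Kafka topic (the benthos_output topic name is hypothetical):

    output:
      kafka:
        addresses:
          - localhost:9092
        # Hypothetical destination topic for the combined documents
        topic: benthos_output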