apache-kafka camunda zeebe

Unable to set topic with camelCase name via Env variable


I'm deploying Zeebe using Helm. With the extraInitContainers directive I managed to include kafka-exporter 3.1.1, and it loads correctly. In the YAML file I set a series of environment variables; an extract is shown below:

env:
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_PRODUCER_SERVERS
      value: acme-infrastructure-kafka:9092
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_PRODUCER_CONFIG
      value: "linger.ms=5\nbuffer.memory=8388608\nbatch.size=32768\nmax.block.ms=5000"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_CLASSNAME
      value: io.zeebe.exporters.kafka.KafkaExporter
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_JARPATH
      value: /usr/local/zeebe/exporters/zeebe-kafka-exporter.jar
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_DEFAULTS_TYPE
      value: ""
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_DEFAULTS_TOPIC
      value: "zeebe"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_JOB_TYPE
      value: "event"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_JOB_TOPIC
      value: "zeebe-job"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_PROCESSINSTANCE_TYPE
      value: "event"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_PROCESSINSTANCE_TOPIC
      value: "zeebe-process-instance"

With this configuration I expect no records on the default zeebe topic, only records of type EVENT on the zeebe-job and zeebe-process-instance topics, as the documentation explains. However, only the job topic works. I think the process instance topic doesn't work because the exporter expects the key processInstance in the JSON config but receives processinstance instead. In the Zeebe logs at boot I can see the configuration, and the key is all lowercase:

io.camunda.zeebe.broker.exporter - Configured Kafka exporter: Config{producer=ProducerConfig{clientId='zeebe', closeTimeout=PT20S, config={batch.size=32768, max.block.ms=5000, buffer.memory=8388608, linger.ms=5}, requestTimeout=PT5S, maxBlockingTimeout=PT2S, servers=[datamanent-infrastructure-kafka:9092]}, records=RecordsConfig{typeMap={JOB=RecordConfig{allowedTypes=[EVENT], topic='zeebe-job'}, INCIDENT=RecordConfig{allowedTypes=[EVENT], topic='zeebe-incident'}, ERROR=RecordConfig{allowedTypes=[EVENT], topic='zeebe-error'}, PROCESS=RecordConfig{allowedTypes=[EVENT], topic='zeebe-process'}}, defaults=RecordConfig{allowedTypes=[], topic='zeebe'}}, maxBatchSize=100, commitInterval=PT1S}
2022-03-21 14:16:29.038 [] [main] INFO
      io.camunda.zeebe.broker.system - Version: 1.3.4
2022-03-21 14:16:29.068 [] [main] INFO
      io.camunda.zeebe.broker.system - Starting broker 0 with configuration {

 "kafka" : {
      "jarPath" : "/usr/local/zeebe/exporters/zeebe-kafka-exporter.jar",
      "className" : "io.zeebe.exporters.kafka.KafkaExporter",
      "args" : {
        "maxbatchsize" : "100",
        "producer" : {
          "clientid" : "zeebe",
          "servers" : "acme-infrastructure-kafka:9092",
          "config" : "linger.ms=5\nbuffer.memory=8388608\nbatch.size=32768\nmax.block.ms=5000"
        },
        "records" : {
          "job" : {
            "topic" : "zeebe-job",
            "type" : "event"
          },
          "process" : {
            "type" : "event",
            "topic" : "zeebe-process"
          },
          "defaults" : {
            "type" : "",
            "topic" : "zeebe"
          },
          "error" : {
            "topic" : "zeebe-error",
            "type" : "event"
          },
          "incident" : {
            "type" : "event",
            "topic" : "zeebe-incident"
          },
          "processinstance" : {
            "topic" : "zeebe-process-instance",
            "type" : "event"
          }
        },
        "maxblockingtimeoutms" : "1000",
        "flushintervalms" : "1000"
      },
      "external" : true
    }

Here is the link to the community project: https://github.com/camunda-community-hub/zeebe-kafka-exporter


Solution

  • This is a known limitation (https://github.com/camunda/zeebe/issues/4724). We hope to fix it by using Spring's ApplicationContext to instantiate exporter arguments; see https://github.com/camunda/zeebe/issues/7628

    As a workaround, you can either set the property through the JAVA_OPTS environment variable, which keeps the camelCase key intact:

    For example, instead of ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_PROCESSINSTANCE_TYPE=event, write JAVA_OPTS="${JAVA_OPTS} -Dzeebe.broker.exporters.kafka.args.records.processInstance.type=event"
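
    In a Helm values file like the one in the question, this could look roughly as follows. It is only a sketch: it assumes the chart does not already set JAVA_OPTS itself (if it does, the flags would need to be merged into that value) and that the container's start script passes JAVA_OPTS to the JVM. Kubernetes does not shell-expand env values, so ${JAVA_OPTS} is left out and the flags are given directly:

    ```yaml
    env:
        # Assumption: JAVA_OPTS is not already defined elsewhere by the chart.
        - name: JAVA_OPTS
          # The folded scalar (>-) joins both lines with a single space.
          value: >-
            -Dzeebe.broker.exporters.kafka.args.records.processInstance.type=event
            -Dzeebe.broker.exporters.kafka.args.records.processInstance.topic=zeebe-process-instance
    ```

    The two ..._RECORDS_PROCESSINSTANCE_... environment variables from the question should then no longer be needed, since they only produce the lowercase key.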

    Or configure it in application.yaml (see the template at https://github.com/camunda/zeebe/blob/main/dist/src/main/config/broker.standalone.yaml.template), which you can mount into the container from a ConfigMap.
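
    A minimal sketch of such a ConfigMap, limited to the exporter section. The ConfigMap name is hypothetical; the className, jarPath, and topic values are taken from the question. Where to mount the file (presumably the broker's config directory under /usr/local/zeebe) is an assumption to verify against your image:

    ```yaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: zeebe-exporter-config   # hypothetical name
    data:
      application.yaml: |
        zeebe:
          broker:
            exporters:
              kafka:
                className: io.zeebe.exporters.kafka.KafkaExporter
                jarPath: /usr/local/zeebe/exporters/zeebe-kafka-exporter.jar
                args:
                  records:
                    # camelCase key is preserved when read from YAML
                    processInstance:
                      type: event
                      topic: zeebe-process-instance
    ```

    If the mounted file replaces the image's default application.yaml, the rest of the broker configuration has to be included in it as well.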