apache-nifi apache-minifi

Kafka processor does not preserve the FlowFile's attributes


I update a few attributes of the FlowFile and publish it to Kafka, but when I consume it with the ConsumeKafka_2_0 processor those attributes are lost. Is this not supported? Do I need to customize this processor?

When I looked at the processor's source code below, I saw that it already reads attributes from the record and writes them to the FlowFile, so why are they not available on the FlowFile?

private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
        FlowFile flowFile = session.create();
        final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
        tracker.incrementRecordCount(1);
        final byte[] value = record.value();
        if (value != null) {
            flowFile = session.write(flowFile, out -> {
                out.write(value);
            });
        }
        flowFile = session.putAllAttributes(flowFile, getAttributes(record));
        tracker.updateFlowFile(flowFile);
        populateAttributes(tracker);
        session.transfer(tracker.flowFile, REL_SUCCESS);
    }
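The code above only copies Kafka-record metadata (and, as described below, matching headers) onto the new FlowFile; the attributes you set before publishing are never part of the transmitted message. A minimal sketch of that mechanism, using hypothetical stand-in types rather than the real NiFi or Kafka classes:

```java
import java.util.Map;

public class RoundTripDemo {
    // Hypothetical stand-ins: a FlowFile is content plus attributes, while a
    // Kafka record WITHOUT headers carries only a byte[] value.
    record FlowFile(byte[] content, Map<String, String> attributes) {}
    record KafkaRecord(byte[] value) {}

    // Simulates publish-then-consume with no header regex configured:
    // only the content crosses the broker, so the rebuilt FlowFile
    // starts with empty attributes.
    static FlowFile publishThenConsume(FlowFile outgoing) {
        KafkaRecord record = new KafkaRecord(outgoing.content());
        return new FlowFile(record.value(), Map.of());
    }

    public static void main(String[] args) {
        FlowFile outgoing = new FlowFile("hello".getBytes(),
                Map.of("myapp.correlationId", "abc-123"));
        FlowFile incoming = publishThenConsume(outgoing);
        // The original attribute was never transmitted, so it cannot be restored.
        System.out.println(incoming.attributes().containsKey("myapp.correlationId"));
    }
}
```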

Solution

  • To pass attributes you must use Kafka headers; otherwise there is no way to carry them across, because attributes are not part of the FlowFile's body, and only the body becomes the body of the Kafka message.

    On the publish side, PublishKafka_2_0 has the following property to specify which attributes to send as headers:

    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
            .name("attribute-name-regex")
            .displayName("Attributes to Send as Headers (Regex)")
            .description("A Regular Expression that is matched against all FlowFile attribute names. "
                + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
                + "If not specified, no FlowFile attributes will be added as headers.")
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();
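To illustrate the selection this property implies (a plain-Java sketch, not NiFi internals; the helper name and sample attributes are hypothetical): the regex is matched against attribute *names*, so a value like `myapp\..*` would send `myapp.correlationId` as a header while skipping core attributes such as `filename`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class AttributeRegexDemo {
    // Hypothetical helper mirroring "Attributes to Send as Headers (Regex)":
    // keep only the attributes whose NAME matches the pattern.
    static Map<String, String> selectHeaders(Map<String, String> attributes, String regex) {
        Pattern p = Pattern.compile(regex);
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : attributes.entrySet()) {
            if (p.matcher(e.getKey()).matches()) {
                headers.put(e.getKey(), e.getValue());
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("myapp.correlationId", "abc-123");
        attrs.put("myapp.source", "orders");
        attrs.put("filename", "order-1.json"); // does not match, stays attribute-only

        System.out.println(selectHeaders(attrs, "myapp\\..*").keySet());
    }
}
```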
    

    On the consume side, ConsumeKafka_2_0 has the following property to specify which header fields to add as attributes:

    static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
            .name("header-name-regex")
            .displayName("Headers to Add as Attributes (Regex)")
            .description("A Regular Expression that is matched against all message headers. "
                + "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. "
                + "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by "
                + "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like "
                + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
                + "the messages together efficiently.")
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();
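The bundling caveat in that description can be sketched in plain Java (hypothetical types and method, not NiFi internals): messages can only be merged into one FlowFile when every *selected* header value agrees, so selecting a per-message header such as an identifier forces one FlowFile per message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class HeaderBundlingDemo {
    // Hypothetical model of a consumed Kafka message: body plus headers.
    record Message(String body, Map<String, String> headers) {}

    // Group messages by the headers the regex selects; each group corresponds
    // to one FlowFile the consumer could produce.
    static Map<Map<String, String>, List<String>> bundle(List<Message> messages, String regex) {
        Pattern p = Pattern.compile(regex);
        Map<Map<String, String>, List<String>> groups = new LinkedHashMap<>();
        for (Message m : messages) {
            Map<String, String> key = new TreeMap<>();
            m.headers().forEach((k, v) -> {
                if (p.matcher(k).matches()) {
                    key.put(k, v);
                }
            });
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(m.body());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Message> msgs = List.of(
                new Message("a", Map.of("tenant", "t1", "msg-id", "1")),
                new Message("b", Map.of("tenant", "t1", "msg-id", "2")));

        // Selecting only the shared header lets both messages bundle together,
        System.out.println(bundle(msgs, "tenant").size());
        // while ".*" also selects the unique msg-id and splits them apart.
        System.out.println(bundle(msgs, ".*").size());
    }
}
```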