java, apache-kafka, apache-kafka-streams

Kafka-streams filter messages using headers


We're trying to use Kafka Streams in our project to read data from one topic and write to another, and we have a use case where we want to use Kafka record headers as a mechanism to filter out certain records.

For example, the input topic contains data for all the students belonging to a school, and the output topic should only contain information about a subset of those students, based on their class.

Record:

student_name | student_id | student_class

Initially we thought we would use the student object to do this, but that means we have to deserialize the object before we can filter. Instead, we want to pass a header with each record; this header will carry the student's class.

Header:

class: v
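
For illustration, a record with such a header might be produced roughly like this; the bootstrap server, topic name, key, and placeholder value below are just assumptions for the sketch, not our actual setup:

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
        byte[] studentBytes = new byte[0]; // placeholder for the serialized student record
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>("students-input", "student_id-1", studentBytes);
        // The class travels in a header, so consumers can filter without deserializing the value
        record.headers().add("class", "v".getBytes(StandardCharsets.UTF_8));
        producer.send(record);
    }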

We were wondering if there's a way to do this using Kafka Streams. We thought we could use the header in the filter function, but the filter function doesn't expose header information.

    kstream.filter((k, v) -> {
        // howToAccessHeaders?
    });

We also tried using the process function, but again, it's not clear to us how to filter out the record there.

    kstream.process(CustomProcessor::new)

CustomProcessor:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.Record;

    class CustomProcessor implements Processor<String, byte[], String, byte[]> {

        @Override
        public void process(Record<String, byte[]> record) {
            // Compare the header value as text; == on byte[].toString() never matches "v"
            String studentClass = new String(record.headers().lastHeader("class").value(), StandardCharsets.UTF_8);
            if ("v".equals(studentClass)) {
                // Without a return value, how does the record get filtered?
            }
        }
    }

Is there something else we can do to filter the records using the headers? Or is that not possible using Kafka Streams?

PS: We tried using the transform and transformValues functions, but they are now deprecated.


Solution

  • Writing a custom Processor is the right way to go. To forward records downstream, i.e., the records you want to keep, you use ProcessorContext#forward(...). The corresponding ProcessorContext object is passed into the Processor via the Processor#init(...) method, which you need to override. If you don't call forward() on an input record, it's dropped on the floor and thus filtered out. A sketch of this is shown after the link below.

    Cf https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#accessing-processor-context
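
    A minimal sketch of what this could look like (the topic names, serdes, and the "v" class value are assumptions taken from the question; this targets the current Processor API, i.e., Kafka Streams 3.3+ where process() returns a KStream):

        import java.nio.charset.StandardCharsets;
        import org.apache.kafka.common.header.Header;
        import org.apache.kafka.streams.processor.api.Processor;
        import org.apache.kafka.streams.processor.api.ProcessorContext;
        import org.apache.kafka.streams.processor.api.Record;

        class HeaderFilterProcessor implements Processor<String, byte[], String, byte[]> {

            private ProcessorContext<String, byte[]> context;

            @Override
            public void init(ProcessorContext<String, byte[]> context) {
                // Keep the context; it is needed to forward records downstream
                this.context = context;
            }

            @Override
            public void process(Record<String, byte[]> record) {
                Header classHeader = record.headers().lastHeader("class");
                // Forward only records whose "class" header is "v"; everything else is dropped
                if (classHeader != null
                        && "v".equals(new String(classHeader.value(), StandardCharsets.UTF_8))) {
                    context.forward(record);
                }
            }
        }

    Wiring it into the topology could then look like this (topic names again hypothetical):

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.kstream.Consumed;
        import org.apache.kafka.streams.kstream.Produced;

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("students-input", Consumed.with(Serdes.String(), Serdes.ByteArray()))
               .process(HeaderFilterProcessor::new)
               .to("students-output", Produced.with(Serdes.String(), Serdes.ByteArray()));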