
Kafka Streams join by key with complex condition


I'm trying to join a KStream with a GlobalKTable by key, but with specific fallback logic.

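    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
    GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"

    // desired: join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
    KStream<String, String> joined = stream.join(table,
            (key, value) -> key, // key selector: which table key to look up for this record
            // here I also need to identify by which condition the join was performed
            (valueLeft, valueRight) -> valueLeft + "," + valueRight);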

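For example, if the key = "ABC", then:

  • first try to join on the full key, "ABC" = "ABC";
  • if there is no match, try the first two letters, "AB" = "AB";
  • if there is still no match, try the first letter, "A" = "A".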

Additionally, I need to know which condition the join was performed on, e.g., by 3 letters, by 2 letters, or by 1 letter.
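To pin down the intended semantics independently of Kafka Streams, here is a minimal plain-Java sketch. `PrefixLookup` and `Match` are hypothetical names, not part of any Kafka API, and the sketch assumes the table contents are available as an in-memory `Map`. It tries the full key first, then progressively shorter prefixes, and reports how many letters matched.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper modeling the desired join condition:
// try the full key, then shorter and shorter prefixes.
class PrefixLookup {

    // A successful lookup: the table value plus how many leading letters matched.
    static final class Match {
        final Integer value;
        final int matchedLength;

        Match(Integer value, int matchedLength) {
            this.value = value;
            this.matchedLength = matchedLength;
        }
    }

    // Returns the match for the longest prefix of `key` present in `table`, if any.
    static Optional<Match> lookup(String key, Map<String, Integer> table) {
        for (int len = key.length(); len >= 1; len--) {
            Integer value = table.get(key.substring(0, len));
            if (value != null) {
                return Optional.of(new Match(value, len));
            }
        }
        return Optional.empty(); // no prefix of the key is in the table
    }

    public static void main(String[] args) {
        Map<String, Integer> table = Map.of("AB", 20, "A", 10);
        // "ABC" is not in the table, so the lookup falls back to "AB".
        lookup("ABC", table).ifPresent(m ->
                System.out.println("value " + m.value + ", matched on " + m.matchedLength + " letters"));
        // prints "value 20, matched on 2 letters"
    }
}
```

The matched length travels with the result, which covers the "which condition" requirement without a separate lookup.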

The question is: is this possible at all, or should I look for a workaround? For example, I could make copies of the GlobalKTable with the corresponding keys (one table keyed by "ABC", one by "AB", and one by "A") and perform 3 separate joins. Or are there any other suggestions?

Thanks in advance!


Solution

  • Using a series of left-joins against multiple tables is possible, provided you know in advance the maximum number of join attempts (here, three: by 3, 2, and 1 letters). If a join succeeds, you skip the remaining joins. Combining leftJoin() and branch() lets you split the stream after each join into a "joined" stream and a "retry" stream, where the "retry" stream feeds the next join. At the end, you can merge() the different result streams together if you want.
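Before building the topology, the routing logic of this cascade can be checked in plain Java. This is a model, not Kafka Streams code, and `JoinCascade`, `Joined` and `tablesByLength` are hypothetical names. Each `Map` in `tablesByLength` stands in for one GlobalKTable copy keyed by prefixes of a given length (the copies proposed in the question). A map lookup stands in for `KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)` with a key selector such as `(k, v) -> k.substring(0, n)`, where a miss hands `null` to the joiner. The matched/retry split stands in for `branch()` (deprecated since Kafka 2.8 in favor of `split()`), and the final list stands in for `merge()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of the leftJoin -> branch -> merge cascade; no Kafka involved.
class JoinCascade {

    // One output record; tableValue is null and matchedLength is 0 when no table matched.
    static final class Joined {
        final String key;
        final Integer streamValue;
        final Integer tableValue;
        final int matchedLength;

        Joined(String key, Integer streamValue, Integer tableValue, int matchedLength) {
            this.key = key;
            this.streamValue = streamValue;
            this.tableValue = tableValue;
            this.matchedLength = matchedLength;
        }
    }

    // tablesByLength.get(n) stands in for the table copy keyed by n-letter prefixes.
    static List<Joined> run(List<Map.Entry<String, Integer>> input,
                            Map<Integer, Map<String, Integer>> tablesByLength,
                            int maxLength) {
        List<Joined> merged = new ArrayList<>();        // stands in for merge()
        List<Map.Entry<String, Integer>> retry = input; // records still looking for a match

        for (int len = maxLength; len >= 1; len--) {
            Map<String, Integer> table = tablesByLength.getOrDefault(len, Map.of());
            List<Map.Entry<String, Integer>> nextRetry = new ArrayList<>();
            for (Map.Entry<String, Integer> record : retry) {
                String key = record.getKey();
                // "leftJoin": look up the prefix; a miss (or a too-short key) yields null
                Integer tableValue = key.length() >= len ? table.get(key.substring(0, len)) : null;
                // "branch": matched records leave the cascade, misses go to the next stage
                if (tableValue != null) {
                    merged.add(new Joined(key, record.getValue(), tableValue, len));
                } else {
                    nextRetry.add(record);
                }
            }
            retry = nextRetry;
        }
        // Records that matched no table are kept with a null table value (left-join semantics).
        for (Map.Entry<String, Integer> record : retry) {
            merged.add(new Joined(record.getKey(), record.getValue(), null, 0));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, Integer>> tables = Map.of(
                3, Map.of("ABC", 30),
                2, Map.of("XY", 20),
                1, Map.of("Q", 10));
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("ABC", 1), Map.entry("XYZ", 2), Map.entry("QRS", 3), Map.entry("MNO", 4));
        for (Joined j : run(input, tables, 3)) {
            System.out.println(j.key + " -> " + j.tableValue + " (length " + j.matchedLength + ")");
        }
        // Output lines, in order:
        // ABC -> 30 (length 3), XYZ -> 20 (length 2), QRS -> 10 (length 1), MNO -> null (length 0)
    }
}
```

Carrying the prefix length in each joined record is one way to report which condition matched. Because Kafka Streams' `merge()` gives no ordering guarantee between the merged streams, the concatenated order here is only a convenience of the model.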