apache-kafka-streams

Kafka Streams KTable-KTable FK join on partitioned topics - partition routing issue?


Given two entities Agency and Configuration:

class Agency {
  Long id; // PK
  UUID configurationId; // FK -> Configuration.id
  // ...
}
class Configuration {
  UUID id; // PK
  // ...
}

The respective topics are spread over multiple partitions.

Now assume the following topology defining a KTable-KTable FK left join from Agency.configurationId into Configuration.id:

    @Produces
    public Topology buildTopology() {

        final StreamsBuilder streamsBuilder = new StreamsBuilder();

        try (

                final Serde<Long> longKeySerde = AggregatorSerdes.debeziumKeySerdeFromFieldId(Long.class); // This is basically a JSON Serde reading from a specific property
                final Serde<UUID> uuidKeySerde = AggregatorSerdes.debeziumKeySerdeFromFieldId(UUID.class); // Same here, but for UUID

                final Serde<Agency> agencySerde = AggregatorSerdes.debeziumValueSerdeFromFieldAfter(Agency.class); //
                final Serde<Configuration> configurationSerde = AggregatorSerdes.debeziumValueSerdeFromFieldAfter(
                        Configuration.class); //

                final Serde<AggregateAgency> aggregateAgencySerde = AggregatorSerdes.debeziumSerdeWithoutConfiguration(AggregateAgency.class) // This is a plain JSON Serde

        ) {

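            // Configuration is keyed by its UUID primary key, so the table must use the UUID key serde;
            // the FK extractor Agency::configurationId returns a UUID and must match this key type.
            final KTable<UUID, Configuration> configurations = streamsBuilder
                    .table( //
                            configurationTopicName, //
                            Consumed.with( //
                                    uuidKeySerde, //
                                    configurationSerde));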

            streamsBuilder //
                    .table( //
                            agencyTopicName, //
                            Consumed.with( //
                                    longKeySerde, //
                                    agencySerde)) //
                    .leftJoin( //
                            configurations, //
                            Agency::configurationId, //
                            AggregateAgency::new, //
                            TableJoined.as("agency-to-configuration")) //
                    .toStream(Named.as("agency-to-configuration-tostream")) //
                    .to(aggregateTopic, //
                            Produced.<Long, AggregateAgency>as("agency-to-configuration-producer") //
                                    .withKeySerde(longKeySerde) //
                                    .withValueSerde(aggregateAgencySerde));

        }

        return streamsBuilder.build(properties); // Defines topology.optimizations=all

    }

This topology does not produce the expected results: very often the referenced Configuration is missing from the aggregate object even though it exists in the source topic. Is this a consequence of the partitioned source topics, or rather a misunderstanding or wrong implementation of FK joins?


Additional Observations:

As per Kafka UI, an example Agency with ID 0 is located in partition 4, while its referenced Configuration is located in partition 8. The Repartitioning mechanism of FK joins should co-locate that Agency with a new composite key to partition 8 as well in order to allow for the join operation. However, looking for the FK in the repartition topic reveals that it lands in partition 0, thus effectively ruling out the join.

Could this be caused by a difference between the partition routing used by Kafka Streams and that used by the initial producer of the input topics? The input topics are produced by Debezium.


Solution

  • Not sure if I fully understand what you mean by

    The Repartitioning mechanism of FK joins should co-locate that Agency with a new composite key to partition 8 as well in order to allow for the join operation.

    Are you referring to the input topics? Input topics do not need to be co-partitioned. Agency-0 is the PK of the agency, and it's totally OK if the Agency is in a different input topic partition than the corresponding Configuration.

    FK joins are implemented using internal "subscription" and "response" topics. The "subscription" topic is co-partitioned with the right-hand (Configuration) table topic, and the configuration ID is used as the key when writing into this topic. Thus, Agency-0 (which is in input topic partition 4), referencing a Configuration in partition 8, should be written into "subscription" partition 8 to co-locate with that Configuration from the right table. If a join match is found, a "response" is sent back to the left-hand side via the "response" topic, which is co-partitioned with the left input table (Agency), and thus the join match would be written into "response" partition 4 to map it back to the original input Agency.

    See https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/ for more details.

    For the hashing into the internal subscription and response topics, Kafka Streams uses the default murmur2 hashing. How is your input data partitioned? If it's partitioned differently, you would need to pass a custom partitioner into the join (supported since the 3.1 release by passing a TableJoined object into join(...)).
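To check whether the producer and Kafka Streams agree on a partition, it can help to hash the serialized key bytes by hand. The following is a minimal sketch, assuming the usual default formula `toPositive(murmur2(keyBytes)) % numPartitions`. The class name `PartitionCheck`, the helper `partitionFor`, the partition count of 12 and the three key encodings are all hypothetical and only serve as illustration; the real bytes depend on the converter and Serde in use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class PartitionCheck {

    // Port of the 32-bit murmur2 variant (seed 0x9747b28c) used for default key partitioning.
    public static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length;

        // Mix four bytes at a time (little-endian) into the hash.
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Handle the trailing 1 to 3 bytes; the fall-through is intentional.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit, then takes the remainder by the partition count.
    public static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 12; // assumption: adjust to the real topic

        // Three hypothetical serialized forms of the same logical key "id = 0".
        byte[] jsonKey = "{\"id\":0}".getBytes(StandardCharsets.UTF_8);
        byte[] longKey = ByteBuffer.allocate(Long.BYTES).putLong(0L).array();
        byte[] stringKey = "0".getBytes(StandardCharsets.UTF_8);

        System.out.println("JSON key   -> partition " + partitionFor(jsonKey, numPartitions));
        System.out.println("long key   -> partition " + partitionFor(longKey, numPartitions));
        System.out.println("string key -> partition " + partitionFor(stringKey, numPartitions));
    }
}
```

Because the hash is taken over the serialized bytes, a Serde that extracts a field and re-serializes it may produce different bytes than the original producer wrote, and the record may then be routed to a different partition than the one it lives in on the input topic.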

    Another known issue is the usage of schema-registry key formats. The join is computed on the serialized bytes, and thus, if the schema ID (which is part of the serialized key) does not match, the join might fail even if the actual data is the same.
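The schema ID point can be illustrated with a small sketch. It assumes the Confluent wire format (one magic byte `0`, a 4-byte big-endian schema ID, then the payload); the class `SchemaIdKeyDemo`, the helper `frame`, the schema IDs 1 and 2 and the UUID value are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class SchemaIdKeyDemo {

    // Prefixes the payload with the magic byte 0 and a 4-byte big-endian schema ID.
    public static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(1 + Integer.BYTES + payload.length)
                .put((byte) 0)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        // Hypothetical configuration ID, identical on both sides of the join.
        byte[] payload = "123e4567-e89b-12d3-a456-426614174000".getBytes(StandardCharsets.UTF_8);

        byte[] leftKey = frame(1, payload);  // FK serialized with schema ID 1 (assumption)
        byte[] rightKey = frame(2, payload); // PK serialized with schema ID 2 (assumption)

        // A byte-wise comparison sees two different keys although the logical ID is the same.
        System.out.println("keys equal: " + Arrays.equals(leftKey, rightKey)); // prints "keys equal: false"
    }
}
```

In such a case the two keys differ only in the schema ID prefix, which is enough for a byte-based lookup to miss.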