java apache-kafka apache-kafka-streams ktable

Metadata for a key is wrong even though the key is present in the local KTable in Kafka Streams when running two instances


I am facing a weird issue when aggregating records into a KTable. I have the following scenario in my system.

  1. There are two Kafka Streams applications running on different nodes (with the same application ID but different application.server configs).

  2. Both of these applications listen to the same topic pattern, where the records are partitioned by a key (a string value).

  3. Whenever both applications are running, some partitions are consumed by app-1 and some by app-2, which is normal. Each then builds its own local state store.

  4. I have a GraphQL query system which lets you query a key and get its value, whether it is in the local table or on the other, remote instance.

  5. The problem is that when I query for a key's metadata, it gives me the wrong hostInfo (even if the key was processed by instance-1, the metadata shows the hostInfo of instance-2). But if I query the key's value in instance-1's local state store, I can see that the key is indeed present. (It is just the metadata for the key that is wrong.)

  6. This behaviour is random for keys on both instances (some keys point to the correct metadata while some don't).

  7. I have registered a state listener which logs whether a rebalance is happening. While the records are streaming and when I am querying, I have made sure that no rebalancing is happening.

  8. The issue I face is similar to this one: Method of metadataForKey in Kafka Streams gives wrong information for multiple instances of application connected to the same group

  9. Also, when I query for all keys in the local state store, I can see that the key is present.

Does anyone have an idea of what could be causing this issue?


Solution

  • The problem was that I was producing records to the Kafka topic using my own custom partitioning logic rather than the default implementation that Kafka provides. On the Streams side, the metadata for a key was being computed with the default partitioning logic, which resulted in wrong metadata. So all I had to do was implement a custom partitioner with the same logic I use on the producer side and use that partitioner to compute the metadata.
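
To illustrate why the lookup went wrong, here is a minimal plain-Java sketch of the mismatch. It does not use the Kafka API. The `KeyPartitioner` interface, the two partitioner implementations, the partition count, and the partition-to-host assignment are all hypothetical stand-ins chosen for the example; in particular, the "default-like" partitioner uses `String.hashCode()` and not Kafka's real murmur2-based hash.

```java
import java.util.Map;

final class PartitionerMismatchDemo {

    /** Hypothetical partitioning contract: maps a key to a partition number. */
    interface KeyPartitioner {
        int partition(String key, int numPartitions);
    }

    // Assumed topic size for the example.
    static final int NUM_PARTITIONS = 4;

    // Assumed assignment: instance-1 owns partitions 0 and 1, instance-2 owns 2 and 3.
    static final Map<Integer, String> PARTITION_OWNER = Map.of(
            0, "instance-1", 1, "instance-1",
            2, "instance-2", 3, "instance-2");

    // Stand-in for a default hash-based partitioner (NOT Kafka's actual murmur2 hash).
    static final KeyPartitioner DEFAULT_LIKE =
            (key, n) -> Math.floorMod(key.hashCode(), n);

    // Stand-in for the producer's custom logic; here it simply routes by key length.
    static final KeyPartitioner CUSTOM =
            (key, n) -> Math.floorMod(key.length(), n);

    /** Resolves which instance a key is expected on, according to the given partitioner. */
    static String hostFor(String key, KeyPartitioner partitioner) {
        return PARTITION_OWNER.get(partitioner.partition(key, NUM_PARTITIONS));
    }

    public static void main(String[] args) {
        String key = "ab";
        // Where the record really lives: the producer used the custom logic.
        System.out.println("actual host:            " + hostFor(key, CUSTOM));
        // Metadata lookup with the default-like logic points at a different instance.
        System.out.println("lookup (default logic): " + hostFor(key, DEFAULT_LIKE));
        // Metadata lookup with the same custom logic agrees with the producer.
        System.out.println("lookup (custom logic):  " + hostFor(key, CUSTOM));
    }
}
```

In Kafka Streams itself, the corresponding fix is to pass a `StreamPartitioner` that mirrors the producer's logic to the overload of `KafkaStreams#queryMetadataForKey` (or the older `metadataForKey`) that accepts a partitioner, instead of the overload that takes only a key serializer. The exact `StreamPartitioner` method to implement depends on the Kafka version in use.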