Confluent Kafka: How to replicate the partitioner's hashing function so that the consumer can consume the correct partition?


I have set up a Kafka server with one topic that has 1000 partitions. The data stored in the cluster are status updates of objects. There are about 17000 of these objects, and I produce roughly 5 status updates per second. Every object has a uuid as its unique identifier, and that uuid is also used as the message key for its status updates, so that the partitioner puts all messages with the same key in the same partition.

I also have a ReactJS frontend that shows those objects and their current status on a map. The frontend communicates with Kafka through a .NET 7.0 service that acts as a consumer client; the Kafka client library is Confluent.Kafka version 2.1.1. The frontend and the consumer service are connected through a websocket. Every 15 seconds, the frontend sends the uuids of the objects that are 'inBound' on the map to the consumer service over the websocket.

The consumer then reads the topic and matches each uuid received from the frontend against the latest status update for that key. Since the topic holds thousands of records, I figured this would be quite slow, and that it would be better to consume only the partition that holds the status updates keyed by that uuid. So my idea was to hash the uuid the same way the partitioner does, get the partition ID, and assign that partition to the consumer.

After some googling, I found that the default partitioner takes the Murmur2 hash of the key modulo the number of partitions (murmur2hash(key) % number_of_partitions). I tried to recreate that, first by writing a Murmur2 hashing function:

using System;

public static class MurmurHash2
{
    public static uint Hash(byte[] data)
    {
        const uint m = 0x5bd1e995;
        const int r = 24;
        var seed = 0x9747b28c ^ (uint)data.Length;
        var length = data.Length;
        var currentIndex = 0;

        uint h = seed ^ (uint)length;

        while (length >= 4)
        {
            uint k = BitConverter.ToUInt32(data, currentIndex);

            k *= m;
            k ^= k >> r;
            k *= m;

            h *= m;
            h ^= k;

            currentIndex += 4;
            length -= 4;
        }

        switch (length)
        {
            case 3:
                h ^= (ushort)(data[currentIndex++] | data[currentIndex++] << 8);
                h ^= (uint)(data[currentIndex] << 16);
                h *= m;
                break;
            case 2:
                h ^= (ushort)(data[currentIndex++] | data[currentIndex] << 8);
                h *= m;
                break;
            case 1:
                h ^= data[currentIndex];
                h *= m;
                break;
        }

        h ^= h >> 13;
        h *= m;
        h ^= h >> 15;

        return h;
    }
}

and then by calling it after producing each message, to check whether my code yields the same partition ID as the one in the produce result:

using System;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

public class Producer
{
    private string _topicName;
    private int _partitionCount;
    private IProducer<string, string> _producer;
    private ProducerConfig _config;

    public Producer(ProducerConfig config, string topicName)
    {
        this._topicName = topicName;
        this._partitionCount = 1000;
        this._config = config;
        _config.ApiVersionRequest = false;
        this._producer = new ProducerBuilder<string, string>(this._config).Build();
    }

    public async Task writeMessage(string uuid, string msiData)
    {
        var dr = await _producer.ProduceAsync(this._topicName, new Message<string, string>()
        {
            Key = uuid,
            Value = msiData
        });
        Console.WriteLine(uuid);
        Console.WriteLine("Actual partitionId: " + dr.Partition);
        Console.WriteLine("My partition id: " + GetPartitionForKey(uuid, _partitionCount));
    }

    public static int GetPartitionForKey(string key, int numPartitions)
    {
        byte[] keyBytes = Encoding.UTF8.GetBytes(key);
        uint hash = MurmurHash2.Hash(keyBytes);
        int partition = (int)(hash % (uint)numPartitions);
        return partition;
    }
}

But the logs show that the partition ID I compute differs from the partition ID in the produce result.

So my question is, what am I doing wrong and how can I achieve the desired behaviour?


Solution

  • I found the answer. You don't have to mess around with custom partitioners.

    With Confluent Kafka, you can create a TopicPartition object and pass it to the ProduceAsync method in place of the topic name; the message is then written to exactly that partition.

    var message = new Message<string, string>
    {
        Key = "<uuid>",
        Value = "<some-value>"
    };

    // Target partition 1 of "my-topic" explicitly; the producer's partitioner is not used.
    var topicPartition = new TopicPartition("my-topic", new Partition(1));
    var dr = await _producer.ProduceAsync(topicPartition, message);

    The TopicPartition constructor takes two arguments: a topic name and a partition number. Instead of a hard-coded partition number, you can pass the result of a function with your own key-to-partition logic.

    Then, in your consumer, apply that same logic to each uuid and assign the resulting partitions to the consumer (with Assign rather than Subscribe).
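As for why the two numbers in the question differ, two causes seem likely. Confluent.Kafka is built on librdkafka, whose default `partitioner` setting is `consistent_random`, a CRC32 hash of the key; the Java-compatible murmur2 hash is only used when `ProducerConfig.Partitioner` is set to `Partitioner.Murmur2` or `Partitioner.Murmur2Random`. In addition, the hand-written hash in the question differs from the Java client's murmur2: it XORs the key length into the seed twice, so `h` starts at the bare seed `0x9747b28c`, and `GetPartitionForKey` skips the Java client's sign-bit mask (`hash & 0x7fffffff`) before the modulo. The following is a minimal sketch of a Java-compatible version, assuming the key is serialized as UTF-8 (as the default string serializer does); `KafkaMurmur2` and `PartitionForKey` are illustrative names, not part of Confluent.Kafka.

```csharp
using System;
using System.Text;

// Illustrative helper (not a Confluent.Kafka API): murmur2 as computed by
// Kafka's Java producer, plus the partition formula built on top of it.
public static class KafkaMurmur2
{
    // Returns the hash as a signed int so it compares directly with Java's result.
    public static int Hash(byte[] data)
    {
        unchecked
        {
            const uint m = 0x5bd1e995;
            const int r = 24;
            const uint seed = 0x9747b28c;

            int length = data.Length;
            // The length is mixed into the seed exactly once.
            uint h = seed ^ (uint)length;

            int blocks = length / 4;
            for (int i = 0; i < blocks; i++)
            {
                int offset = i * 4;
                // Read each 4-byte block little-endian, regardless of machine endianness.
                uint k = (uint)(data[offset]
                    | data[offset + 1] << 8
                    | data[offset + 2] << 16
                    | data[offset + 3] << 24);
                k *= m;
                k ^= k >> r;
                k *= m;
                h *= m;
                h ^= k;
            }

            // Mix in the 1 to 3 trailing bytes, falling through like the Java switch.
            int tail = blocks * 4;
            switch (length % 4)
            {
                case 3:
                    h ^= (uint)data[tail + 2] << 16;
                    goto case 2;
                case 2:
                    h ^= (uint)data[tail + 1] << 8;
                    goto case 1;
                case 1:
                    h ^= data[tail];
                    h *= m;
                    break;
            }

            h ^= h >> 13;
            h *= m;
            h ^= h >> 15;
            return (int)h;
        }
    }

    // Java client formula: clear the sign bit, then take the remainder.
    public static int PartitionForKey(string key, int numPartitions)
    {
        int hash = Hash(Encoding.UTF8.GetBytes(key));
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```

For example, `KafkaMurmur2.Hash` of the UTF-8 bytes of `"21"` is -973932308, which maps to partition 340 of 1000. With `Partitioner = Partitioner.Murmur2Random` set on the producer's config, `PartitionForKey(uuid, 1000)` should agree with `dr.Partition` for these uuid keys, as long as the topic keeps 1000 partitions; adding partitions later changes the mapping for existing keys.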
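On the consumer side, the answer's last step could look roughly like the sketch below. It assumes the producer picked each partition with some function of the uuid, passed in here as the hypothetical `partitionFor` parameter, and that the supplied `ConsumerConfig` already sets `BootstrapServers` and `GroupId`. The consumer is given exactly those partitions with `Assign` (no group rebalancing, unlike `Subscribe`), reads each one from `Offset.Beginning`, and keeps the last value seen per requested uuid; `EnablePartitionEof` is turned on only so the loop can tell when each partition has been read to its current end. `LatestStatusReader`, `AssignmentFor` and `ReadLatest` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

// Illustrative sketch: read only the partitions that hold the requested uuids.
public static class LatestStatusReader
{
    // One entry per distinct partition holding any requested uuid,
    // each starting at the earliest retained offset.
    public static List<TopicPartitionOffset> AssignmentFor(
        string topic, IEnumerable<string> uuids, Func<string, int> partitionFor)
    {
        return uuids
            .Select(partitionFor)
            .Distinct()
            .OrderBy(p => p)
            .Select(p => new TopicPartitionOffset(topic, new Partition(p), Offset.Beginning))
            .ToList();
    }

    // Reads the assigned partitions to their current end and returns the last
    // value seen for each requested uuid. Note: sets EnablePartitionEof on config.
    public static Dictionary<string, string> ReadLatest(
        ConsumerConfig config, string topic, ISet<string> uuids, Func<string, int> partitionFor)
    {
        config.EnablePartitionEof = true;
        var assignment = AssignmentFor(topic, uuids, partitionFor);
        var pending = new HashSet<int>(assignment.Select(tpo => tpo.Partition.Value));
        var latest = new Dictionary<string, string>();

        using var consumer = new ConsumerBuilder<string, string>(config).Build();
        consumer.Assign(assignment);

        while (pending.Count > 0)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(5));
            if (result == null)
            {
                break; // nothing arrived within the timeout; return what was read so far
            }
            if (result.IsPartitionEOF)
            {
                pending.Remove(result.Partition.Value);
                continue;
            }
            if (uuids.Contains(result.Message.Key))
            {
                // Later offsets overwrite earlier ones, so the newest update wins.
                latest[result.Message.Key] = result.Message.Value;
            }
        }

        consumer.Close();
        return latest;
    }
}
```

Each call still scans the assigned partitions from their earliest retained offset, so its cost grows with the topic's retention rather than with the number of uuids requested; a consumer that stays assigned and keeps the latest value per key in memory may fit the 15-second polling pattern better.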