confluent-kafka-dotnet

Confluent Batch Consumer: consumer not working if a timeout is specified


I am trying to consume a maximum of 1000 messages from Kafka at a time (I need to batch-insert them into MSSQL). My understanding is that the client keeps an internal queue that pre-fetches messages from the brokers, and that when I call consumer.Consume() it simply checks whether there are any messages in that internal queue and returns one if it finds something; otherwise it blocks until the internal queue is updated or the timeout expires.

I tried to use the solution suggested here: https://github.com/confluentinc/confluent-kafka-dotnet/issues/1164#issuecomment-610308425

However, when I specify TimeSpan.Zero (or any other timespan up to 1000 ms), the consumer never consumes any messages. If I remove the timeout, it does consume messages, but then I am unable to exit the loop when there are no more messages left to read.

I also saw another question on Stack Overflow which suggested reading the offset of the last message sent to Kafka, consuming messages until that offset is reached, and then breaking out of the loop. I currently have only one consumer and 6 partitions for the topic. I haven't tried it yet, but I think managing offsets for each partition might make the code messy.
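For reference, here is a rough, untested sketch of what that per-partition check might look like with confluent-kafka-dotnet. The CaughtUp helper name and the 5-second timeout are just placeholders of mine, and Assignment is only populated after the group rebalance completes (i.e. after at least one successful Consume call):

static bool CaughtUp(IConsumer<Ignore, string> consumer)
{
    foreach (var partition in consumer.Assignment)
    {
        // High watermark = the offset the next produced message would get.
        var watermarks = consumer.QueryWatermarkOffsets(partition, TimeSpan.FromSeconds(5));

        // Position = the offset of the next message this consumer would read.
        var position = consumer.Position(partition);

        if (position == Offset.Unset || position.Value < watermarks.High.Value)
        {
            return false; // still lagging behind on this partition
        }
    }
    return true; // caught up on every assigned partition
}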

Can someone please tell me what to do?

static List<RealTime> getBatch()
{
    var config = new ConsumerConfig
    {
        BootstrapServers = ConfigurationManager.AppSettings["BootstrapServers"],
        GroupId = ConfigurationManager.AppSettings["ConsumerGroupID"],
        AutoOffsetReset = AutoOffsetReset.Earliest,
    };

    List<RealTime> results = new List<RealTime>();
    List<string> malformedJson = new List<string>();

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        consumer.Subscribe("RealTimeTopic");
        int count = 0;

        // batchSize (defined elsewhere) is 1000 in my case.
        while (count < batchSize)
        {
            // Wait up to 1000 ms for a message; returns null on timeout.
            var consumerResult = consumer.Consume(1000);

            if (consumerResult?.Message is null)
            {
                break;
            }
            Console.WriteLine("read");
            try
            {
                RealTime item = JsonSerializer.Deserialize<RealTime>(consumerResult.Message.Value);
                results.Add(item);
                count += 1;
            }
            catch (Exception)
            {
                Console.WriteLine("malformed");
                malformedJson.Add(consumerResult.Message.Value);
            }
        }

        consumer.Close();
    }
    Console.WriteLine(malformedJson.Count);

    return results;
}

Solution

  • I found a workaround. For some reason, the consumer first needs to be called without a timeout, which means it blocks until it receives at least one message. After that, calling Consume with a zero timeout fetches the rest of the messages one by one from the internal queue. This seems to work well.
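A minimal sketch of that workaround, reusing the consumer and batchSize from the question (the separate blocking first call followed by a zero-timeout drain is the only change):

// Block on the first Consume() until at least one message arrives,
// then drain the client's internal queue with a zero timeout.
var batch = new List<ConsumeResult<Ignore, string>>();

// No timeout: blocks until at least one message has been received.
batch.Add(consumer.Consume());

while (batch.Count < batchSize)
{
    // TimeSpan.Zero only checks messages already pre-fetched into the
    // internal queue, so this returns null as soon as that queue is empty.
    var next = consumer.Consume(TimeSpan.Zero);
    if (next?.Message is null)
    {
        break;
    }
    batch.Add(next);
}
// Deserialize and batch-insert the results as in getBatch() above.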