spring-bootspring-kafkakafka-consumer-apispring-boot-3

Spring Boot 3 + Kafka not joining/fetching records in the same poll timeout as on SB 2.7


So I have updated my application to Spring Boot 3, specifically 3.1.6 but this problem is present on all 3.x versions I've tried. Basically the old poll times are not enough to receive the records anymore, and it appears that for some reason rebalancing takes longer (or something?).

Basically, I can see it in manifested in 2 ways in 3 different services:

2023-09-04 13:20:10 [2023-09-04 10:20:10,145] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Empty state. Created a new member id consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,147] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Empty state. Created a new member id consumer-webstore-monitoring-service-2-316b3fc5-30b1-49fa-ad80-c5d506ede709 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,152] INFO [GroupCoordinator 1]: Preparing to rebalance group webstore-monitoring-service in state PreparingRebalance with old generation 0 (__consumer_offsets-0) (reason: Adding new member consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,158] INFO [GroupCoordinator 1]: Stabilized group webstore-monitoring-service generation 1 (__consumer_offsets-0) with 2 members (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,163] INFO [GroupCoordinator 1]: Assignment received from leader consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a for group webstore-monitoring-service for generation 1. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:25 [2023-09-04 10:20:25,325] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Stable state. Created a new member id consumer-webstore-monitoring-service-2-8fcb73c0-e15a-49fa-a57c-68b7136dfe12 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:25 [2023-09-04 10:20:25,325] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Stable state. Created a new member id consumer-webstore-monitoring-service-1-26a5e384-5b9e-4b25-b565-b9dd45989045 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:25 [2023-09-04 10:20:25,326] INFO [GroupCoordinator 1]: Preparing to rebalance group webstore-monitoring-service in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: Adding new member consumer-webstore-monitoring-service-2-8fcb73c0-e15a-49fa-a57c-68b7136dfe12 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,177] INFO [GroupCoordinator 1]: Member consumer-webstore-monitoring-service-2-316b3fc5-30b1-49fa-ad80-c5d506ede709 in group webstore-monitoring-service has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,179] INFO [GroupCoordinator 1]: Member consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a in group webstore-monitoring-service has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,181] INFO [GroupCoordinator 1]: Stabilized group webstore-monitoring-service generation 2 (__consumer_offsets-0) with 2 members (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,190] INFO [GroupCoordinator 1]: Assignment received from leader consumer-webstore-monitoring-service-2-8fcb73c0-e15a-49fa-a57c-68b7136dfe12 for group webstore-monitoring-service for generation 2. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator) 
        try (var consumer = factory.createConsumer()) {
            consumer.subscribe(singletonList(topic));
            // join
            consumer.poll(Duration.ZERO);
            // reset offset to the begging
            consumer.seekToBeginning(emptyList());
            
            var endOffsets = consumer.endOffsets(consumer.assignment());

            do {
                // actually poll for messages
                var consumerRecords = consumer.poll(Duration.ofSeconds(2));
                // ... some code omitted ...
                consumer.commitSync();
            }
            while (notReachedEndOffsets(consumer, endOffsets));
        }

The consumer used to join the group in consumer.poll(Duration.ZERO) and then successfully read all the messages from the topic. What happens now is that the 0 duration isn't enough, and in that second poll for 2 seconds no records are read.

Ignore the quality/logiс of the actual code for a bit, yes, it needs a rework, the point is, it worked and is symptomatic of the same problem.

Now, I'm generally fine with polling for longer periods of time, but:

I could not find any relevant change in the changelogs or mention of this anywhere


Solution

  • So in the end these were two different issues. The test issue is explained as follows:

    1. My service's container restarted after every test class
    2. This means that a new consumer was connected to Kafka for every test class
    3. The container was shutdown using org.testcontainers.containers.GenericContainer#stop method, which actually just kills the container
    4. This means that the service didn't gracefully shutdown and never send a leave group request to Kafka.
    5. On the next test, Kafka was waiting for the killed container (from previous test) to send a heartbeat for a default of 45 seconds.
    6. After it received none it rebalanced and continued to work

    So this can be fixed in two ways:

    1. Shutdown the service gracefully between tests
    2. Set session.timeout.ms to a lower value

    In the end it has nothing to do with Spring, but rather with the fact that we didn't use integration tests pre 3.0

    The stage startup issue is not solved, but I'm going to brush this off, since it's a poor implementation anyway.