I'm trying to test parallel consumption from a 3-partition topic in Kafka. I assumed that all consumers would be able to subscribe and process messages, but I get this exception:
java.lang.IllegalStateException: Failed to be assigned partitions from the embedded topics
and before the exception I see many lines like this:
[ | common-group] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-common-group-1, groupId=common-group] Request joining group due to: group is already rebalancing
[ | common-group] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-common-group-1, groupId=common-group] Request joining group due to: group is already rebalancing
Here is the code:
@SpringBootTest(classes = StreamApp.class)
@EmbeddedKafka(partitions = 3,
        topics = {
                "${kafka-demo.topics.input.name}",
                "${kafka-demo.topics.output.name}"
        },
        brokerProperties = {
                "transaction.state.log.replication.factor=1",
                "offsets.topic.replication.factor=1",
                "transaction.state.log.min.isr=1"
        })
public class ScalingIT {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Autowired
    private JsonSerde<Message> messageSerde;

    private List<Consumer<String, Message>> messageConsumers = new ArrayList<>();

    @Autowired
    private KafkaDemoProps props;

    @BeforeEach
    void setup() {
        this.messageConsumers.add(consumer(props.topics().output().name(), Serdes.String(), messageSerde));
        this.messageConsumers.add(consumer(props.topics().output().name(), Serdes.String(), messageSerde));
        this.messageConsumers.add(consumer(props.topics().output().name(), Serdes.String(), messageSerde));
    }

    @AfterEach
    public void teardown() {
        messageConsumers.forEach(c -> {
            if (c != null) {
                c.close();
            }
        });
    }

    private <K, V> Consumer<K, V> consumer(String topic, Serde<K> keySerde, Serde<V> valueSerde) {
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("common-group", "false", this.broker);
        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

        DefaultKafkaConsumerFactory<K, V> kafkaConsumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProps, keySerde.deserializer(), valueSerde.deserializer());
        Consumer<K, V> consumer = kafkaConsumerFactory.createConsumer();
        this.broker.consumeFromAnEmbeddedTopic(consumer, topic);
        return consumer;
    }

    @Test
    @DirtiesContext
    void allSentMessagesGoVia3Consumers() {
        int numRecords = 100;
        IntStream.range(0, numRecords).forEach(i -> {
            template.send(props.topics().input().name(), String.format("{\"SomeProp\":\"%s\"}", i).getBytes());
        });

        ConsumerRecords<String, Message> records1 = KafkaTestUtils.getRecords(this.messageConsumers.get(0));
        ConsumerRecords<String, Message> records2 = KafkaTestUtils.getRecords(this.messageConsumers.get(1));
        ConsumerRecords<String, Message> records3 = KafkaTestUtils.getRecords(this.messageConsumers.get(2));

        List<ConsumerRecord<String, Message>> allRecords = new ArrayList<>();
        records1.forEach(allRecords::add);
        records2.forEach(allRecords::add);
        records3.forEach(allRecords::add);

        assertThat(allRecords.size()).isEqualTo(numRecords);
    }
}
Does anyone have any idea how to test multiple consumers in the same group?
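consumeFromAnEmbeddedTopic() wasn't designed to be used that way; it was only intended to support a single consumer.
The problem is that when the second consumer joins the group and polls, the first consumer also needs to call poll() for the rebalance to complete. Your setup creates the consumers one after another on the same thread, so nothing is polling the first consumer while the second waits for its assignment.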
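You should be able to use manual assignment instead of group management.
Replace
this.broker.consumeFromAnEmbeddedTopic(consumer, topic);
with
consumer.assign(List.of(new TopicPartition(topic, this.partition++)));
Here partition is assumed to be an int field on the test class, initialised to 0, so that each of the three consumers is assigned its own partition (0, 1 and 2).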
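If the number of consumers ever differs from the number of partitions, a plain counter no longer covers every partition. A minimal sketch of one way to plan the assignment up front is below; the class name PartitionPlan and the method plan are hypothetical helpers for illustration, not part of Spring Kafka or the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Kafka or Spring API): decides which partition
// numbers each manually assigned consumer should own.
class PartitionPlan {

    // Returns one list per consumer index. Partition p goes to consumer
    // (p % consumers), so every partition has exactly one owner.
    static List<List<Integer>> plan(int partitions, int consumers) {
        if (partitions < 0 || consumers <= 0) {
            throw new IllegalArgumentException(
                    "partitions must be >= 0 and consumers must be > 0");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % consumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(plan(3, 3)); // [[0], [1], [2]]
        System.out.println(plan(3, 2)); // [[0, 2], [1]]
    }
}
```

Each inner list would then be mapped to TopicPartition objects for the topic and passed to that consumer's assign() call. With 3 partitions and 3 consumers the result is the same one-partition-per-consumer layout as the counter approach.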
You will also need to set
bootstrapServersProperty = "spring.kafka.bootstrap-servers"
on the @EmbeddedKafka annotation so that the auto-configured template sends the records to the embedded broker.
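Applied to the annotation from the question, that would look roughly like this. Everything except the added bootstrapServersProperty attribute is copied from the question, and the class body is elided:

```java
@SpringBootTest(classes = StreamApp.class)
@EmbeddedKafka(partitions = 3,
        topics = {
                "${kafka-demo.topics.input.name}",
                "${kafka-demo.topics.output.name}"
        },
        // Publishes the embedded broker's address under the property that
        // Spring Boot's Kafka auto-configuration reads.
        bootstrapServersProperty = "spring.kafka.bootstrap-servers",
        brokerProperties = {
                "transaction.state.log.replication.factor=1",
                "offsets.topic.replication.factor=1",
                "transaction.state.log.min.isr=1"
        })
public class ScalingIT {
    // fields, setup and tests as before
}
```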
EDIT
I also changed your test to send a deterministic number of records to each partition and to wait specifically for that number to arrive:
@Test
@DirtiesContext
void allSentMessagesGoVia3Consumers() {
    int numRecords = 100;
    IntStream.range(0, numRecords).forEach(i -> {
        template.send("output", i % 3, null, String.format("{\"SomeProp\":\"%s\"}", i).getBytes());
    });

    ConsumerRecords<String, String> records1 = KafkaTestUtils.getRecords(this.messageConsumers.get(0), 60_000, 34);
    ConsumerRecords<String, String> records2 = KafkaTestUtils.getRecords(this.messageConsumers.get(1), 60_000, 33);
    ConsumerRecords<String, String> records3 = KafkaTestUtils.getRecords(this.messageConsumers.get(2), 60_000, 33);

    List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();
    records1.forEach(allRecords::add);
    records2.forEach(allRecords::add);
    records3.forEach(allRecords::add);

    assertThat(allRecords.size()).isEqualTo(numRecords);
}
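The 34/33/33 minimums follow from sending record i to partition i % 3. A small sketch that derives those counts for any record and partition count is below; ExpectedCounts and perPartition are hypothetical names used only for illustration, and the class relies on nothing beyond the JDK.

```java
import java.util.Arrays;

// Hypothetical helper: works out how many records land on each partition
// when record i is sent to partition (i % partitions).
class ExpectedCounts {

    // counts[p] is the number of i in [0, numRecords) with i % partitions == p.
    static int[] perPartition(int numRecords, int partitions) {
        int[] counts = new int[partitions];
        for (int i = 0; i < numRecords; i++) {
            counts[i % partitions]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(perPartition(100, 3))); // [34, 33, 33]
    }
}
```

Computing the counts this way instead of hard-coding them would keep the expected values in step if numRecords or the partition count changes.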