I want the producer in my test to wait until the consumer in the class under test has acknowledged the record by calling Acknowledgment.acknowledge(). My consumer is initialized with the following properties:
spring:
  kafka:
    consumer:
      group-id: test-group
      bootstrap-servers: ${spring.embedded.kafka.brokers}
      enable-auto-commit: false
      auto-offset-reset: earliest
      max-poll-records: 1
My kafkaListenerContainerFactory is initialized as follows:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageType> kafkaListenerContainerFactory(
        ConsumerFactory<String, MessageType> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, MessageType> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new LoggingErrorHandler());
    factory.setConcurrency(numListeners);
    return factory;
}
I have also set acks to all in my producer config. However, the producer does not seem to block until the consumer acknowledges, even though I am waiting on the returned future:
producer.send(new ProducerRecord<Integer, String>(TOPIC, key, value)).get();
How can I get the producer in my test to block until the consumer acknowledges, without adding some sort of sleep?
You cannot; producers and consumers are independent. The acks setting only controls when the broker acknowledges that it has received and secured the record; it has nothing to do with the consumer side.
You need your own logic for the consumer to tell the producer it has received the record.
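One way to build that logic in a test is a shared latch that the listener counts down after it acknowledges. The following is only a minimal sketch using plain java.util.concurrent; the class AckLatch and its methods recordAcknowledged and awaitAcks are hypothetical names, not part of Spring Kafka, and how the instance is shared between the test and the listener (for example, as a bean injected into both) is an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical test helper (not a Spring Kafka class): lets a test block
 * until the listener reports that it has acknowledged the expected records.
 */
class AckLatch {
    private final CountDownLatch latch;

    AckLatch(int expectedAcks) {
        this.latch = new CountDownLatch(expectedAcks);
    }

    /** Assumed to be called by the listener right after it calls acknowledge(). */
    void recordAcknowledged() {
        latch.countDown();
    }

    /**
     * Assumed to be called by the test after producer.send(...).get().
     * Returns true once all expected acks were recorded, false on timeout
     * or if the waiting thread is interrupted.
     */
    boolean awaitAcks(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

In the test, you would call something like awaitAcks(10, TimeUnit.SECONDS) after producer.send(...).get() and fail the test if it returns false. Unlike a sleep, this returns as soon as the listener signals and only waits for the full timeout in the failure case.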