spring-bootapache-kafkaspring-kafkaspring-kafka-testembedded-kafka

Test Kafka consumer using EmbddedKafka fail when launching a batch of tests


I'm testing my Kafka Consumer in Spring Boot. My consumer are similar to the following

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaPaymentConsumer {

    private final PaymentInterface paymentInterface;

    @KafkaListener(topics = "#{'${kafka.topic.payment}'}",
                   groupId = "#{'${kafka.group-id}'}")
    public void consumePaymentEvents(PaymentEvent paymentEvent) {
            paymentInterface.handleReceiptPaymentReceivedEvent(paymentEvent);        
    }
}

And My test cases are similar to the following

@SpringBootTest
@EmbeddedKafka(brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
               partitions = 1,
               controlledShutdown = true)
class KafkaPaymentConsumerTest {

    @Autowired
    KafkaTemplate<String, PaymentEvent> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @Value("${kafka.topic.payment}")
    private String paymentTopic;


    @SpyBean
    private KafkaPaymentConsumer kafkaPaymentConsumer;

    @SpyBean
    private PaymentInterface paymentInterface;

    @Captor
    ArgumentCaptor<PaymentEvent> paymentEventCaptor;

    private static File PAYMENT_EVENT_JSON = Paths.get("src", "test", "resources", "files",
                                                       "Payment.json").toFile();
    @Test
    @SneakyThrows
    @DirtiesContext
    void consumePaymentEvents() {
        PaymentEvent event = objectMapper.readValue(PAYMENT_EVENT_JSON,
                                                    PaymentEvent.class);
        kafkaTemplate.send(paymentTopic, "1", event);

        verify(kafkaPaymentConsumer, timeout(10000).times(1)).consumePaymentEvents(
                paymentEventCaptor.capture());
        PaymentEvent argument = paymentEventCaptor.getValue();

        verify(paymentInterface, timeout(10000).times(1)).handleReceiptPaymentReceivedEvent(any());
    }
}

the test works well, BUT when running a batch of tests at once, some tests fail ! ( only when I run many tests at the same time !! ) it seems that there is an issue in the context with @EmbeddedKafka

I got like theses log errors

Actually, there were zero interactions with this mock.

or a Timeout when trying to poll records from the broker

Any explanation or suggestion please


Solution

  • Since you don’t use a @DirtiesContext on your test class to close an application context in the end, it is not a surprise that other tests for the same topic can steal data from you. See if you can clean up contexts as I explained, or consider to use different topics in different tests. I’d prefer the dirties context since it guarantees that no any extra resources in the memory to cause race conditions and surprises .