spring-kafka spring-kafka-test

Testing Spring Kafka Listener in Spring Boot Test with EmbeddedKafka


I'm trying to test a Spring Kafka listener in a Spring Boot test using @EmbeddedKafka. However, I keep encountering the following exception:

java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

My Setup:

Listener:

@Component
@Slf4j
public class CancelAuthorizationLinkageListener {
    private final CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
    private final CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService;
    private final KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate;
    private final String retryTopic;

    public CancelAuthorizationLinkageListener(CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor,
                                              CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService,
                                              KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate,
                                              @Value("${spring.kafka.producer.retry-topic}") String retryTopic) {
        this.cancelAuthorizationLinkageProcessor = cancelAuthorizationLinkageProcessor;
        this.cancelAuthorizationLinkageService = cancelAuthorizationLinkageService;
        this.kafkaTemplate = kafkaTemplate;
        this.retryTopic = retryTopic;
    }

    @Bean
    public RecordMessageConverter converter() {
        return new JsonMessageConverter();
    }

    @Bean
    public BatchMessagingMessageConverter batchConverter() {
        return new BatchMessagingMessageConverter(converter());
    }

    @KafkaListener(id = "${spring.kafka.consumer.properties.cancel-authorization-linkage-listener-id}",
            topics = "${spring.kafka.consumer.linkage-topic}", autoStartup = "false",
            batch = "true",
            groupId = "group1", concurrency = "2")
    public void listen(List<CancelAuthorizationLinkageResource> cancelAuthorizationLinkageResources) {
        for (CancelAuthorizationLinkageResource cancelAuthorizationLinkageResource : cancelAuthorizationLinkageResources) {
            try {
                CancelAuthorizationLinkageWriterResource cancelAuthorizationLinkageWriterResource =
                        cancelAuthorizationLinkageProcessor.process(cancelAuthorizationLinkageResource);
                if (cancelAuthorizationLinkageWriterResource != null) {
                    cancelAuthorizationLinkageService.linkageAuthorization(
                            cancelAuthorizationLinkageWriterResource.getApiResource());
                }
            } catch (Exception e) {
                log.error("listener error: {}", e.getMessage());
                kafkaTemplate.send(retryTopic, cancelAuthorizationLinkageResource.getAuthorizationId(),
                        cancelAuthorizationLinkageResource);
            }
        }
    }
}

My listener consumes messages, and if one of them fails to process, I send it to a retry topic. The listener therefore both consumes and produces messages. The process has to be transactional so that the offset of a consumed message is never committed unless the failed message has actually been written to the retry topic; otherwise the failed message could never be retried.

Test Code:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
        properties = {"spring.batch.job.name=cancelAuthorizationLinkageJob",
                // Point Spring Boot's Kafka clients at the embedded brokers
                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
@DirtiesContext
@EmbeddedKafka(
        partitions = 5, topics = {"${spring.kafka.consumer.linkage-topic}", "ppcd.cushion.cancel.auth.retry"},
        count = 3)
class CancelAuthorizationLinkageListenerTest {

    @Autowired
    private CancelAuthorizationLinkageListener cancelAuthorizationLinkageListener;

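    // @MockBean replaces the processor bean in the Spring context, so the autowired
    // listener uses this mock; a plain @Mock would not be injected into the listener.
    @MockBean
    private CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;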

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private ConsumerFactory<String, CancelAuthorizationLinkageResource> consumerFactory;

    @Value("${spring.kafka.consumer.linkage-topic}")
    private String linkageTopic;

    @Value("${spring.kafka.producer.retry-topic}")
    private String retryTopic;

    private Consumer<String, CancelAuthorizationLinkageResource> consumer;

    @BeforeEach
    public void setUp() {
        consumer = consumerFactory.createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, retryTopic);
    }

    @Test
    @DisplayName("OK - sends to the retry topic when an error occurs during cancel-authorization processing")
    void of_ok_1() throws Exception {
        // init
        int ngNumber = 1;
        AtomicInteger atomicInteger = new AtomicInteger(0);

        // mock
        doThrow(new InvalidValueException("test")).when(cancelAuthorizationLinkageProcessor).process(any());

        // when: call the listener method directly (this bypasses the listener container)
        cancelAuthorizationLinkageListener.listen(List.of(createCancelAuthorizationLinkageResource(true)));

        await()
                .atMost(2, SECONDS)
                .pollInterval(1, SECONDS)
                .untilAsserted(() -> {
                    KafkaTestUtils.getRecords(consumer).records(retryTopic)
                            .forEach(x -> atomicInteger.incrementAndGet());
                    assertEquals(ngNumber, atomicInteger.get());
                });
    }
}

The listener appears to be correctly configured for transactions in production code (though I'm not entirely sure), but in the test context, even though the listener is injected with @Autowired, it does not run within a transaction. Can someone explain why this happens and how to get transactional behavior in the test context as well?

application.yml:

spring:
  profiles:
    active: "local"
  application:
    name:
  batch:
    initialize-schema: ALWAYS
    job:
      names:
      #enable: false
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: -1
      transaction-id-prefix: cushion-kafka-tx-${random.uuid}
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retry-topic: ppcd.cushion.cancel.auth.retry

    #      retries: 5
    consumer:
      group-id: groupid-Dev
      auto-offset-reset: earliest
      max-poll-records: 20
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      properties:
        cancel-authorization-linkage-listener-id: cancel-authorization-linkage-listener
        test-cancel-authorization-linkage-listener-id: test-cancel-authorization-linkage-listener
        spring.json.trusted.packages: '*'
        isolation.level: read_committed
      linkage-topic: ppcd.matching.credit.auth.cancel.auto.matched.result.cushion

The Spring documentation says: "With Spring Boot, it is only necessary to set the spring.kafka.producer.transaction-id-prefix property - Spring Boot will automatically configure a KafkaTransactionManager bean and wire it into the listener container." Based on that, I believe I have configured the transactions correctly.


Solution

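  • With transaction-id-prefix set, the KafkaTemplate is transactional and every send must run inside a transaction. The test calls listen() directly instead of going through the listener container, so no transaction has been started when kafkaTemplate.send() runs.
  • Either follow one of the recommendations in the error message, or don't set the transaction-id-prefix property in your producer config.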
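
The first recommendation in the error message (running the send inside executeInTransaction) can be illustrated with a small framework-free model. This is only a sketch: TransactionModel, Operations, OperationsCallback and InMemoryTemplate are hypothetical stand-ins written for this example, not Spring classes, and the model only mimics the rule that a transactional template rejects a send when no transaction is open on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Framework-free model; every type here is a hypothetical stand-in, not a Spring class. */
final class TransactionModel {

    /** Stand-in for the send operation a Kafka template exposes. */
    interface Operations<K, V> {
        void send(String topic, K key, V value);
    }

    /** Stand-in for the callback passed to executeInTransaction. */
    interface OperationsCallback<K, V, T> {
        T doInOperations(Operations<K, V> operations);
    }

    /** Transactional template model: send() is only legal while a transaction is open on the calling thread. */
    static final class InMemoryTemplate<K, V> implements Operations<K, V> {
        // null means "no transaction on this thread"
        private final ThreadLocal<List<String>> pending = new ThreadLocal<>();
        private final List<String> committed = new CopyOnWriteArrayList<>();

        @Override
        public void send(String topic, K key, V value) {
            List<String> buffer = pending.get();
            if (buffer == null) {
                // Same situation as calling listen() directly from the test
                throw new IllegalStateException("No transaction is in process");
            }
            buffer.add(topic + ":" + key + "=" + value);
        }

        /** Opens a transaction, runs the callback, commits on success, discards on exception. Nesting is not modeled. */
        <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
            List<String> buffer = new ArrayList<>();
            pending.set(buffer);
            try {
                T result = callback.doInOperations(this);
                committed.addAll(buffer); // only reached if the callback did not throw
                return result;
            } finally {
                pending.remove(); // close the transaction in every case
            }
        }

        List<String> committedRecords() {
            return new ArrayList<>(committed);
        }
    }

    public static void main(String[] args) {
        InMemoryTemplate<String, String> template = new InMemoryTemplate<>();
        try {
            template.send("retry", "auth-1", "payload"); // no transaction open: rejected
        } catch (IllegalStateException e) {
            System.out.println("direct send failed: " + e.getMessage());
        }
        template.executeInTransaction(ops -> {
            ops.send("retry", "auth-1", "payload"); // accepted inside the transaction
            return true;
        });
        System.out.println("committed: " + template.committedRecords());
    }
}
```

In Spring Kafka itself the corresponding call is kafkaTemplate.executeInTransaction(ops -> ops.send(retryTopic, key, value)). As far as I understand it, that starts a local producer transaction and does not include the consumer offsets, so it makes the exception go away but does not test the consume-and-produce atomicity described in the question. To exercise the container-managed transaction, the test would instead have to publish a record to the linkage topic on the embedded broker and let the listener container invoke listen(); since the listener is declared with autoStartup = "false", the container has to be started first.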