javaspring-bootapache-kafkaspring-kafkaembedded-kafka

NPE Testing Kafka Producer Using Embedded Kafka


I've written a basic spring boot service that consumes some data via rest API and publishes it to rabbitmq and kafka.

To test the service class handling kafka producing, I followed this guide: https://www.baeldung.com/spring-boot-kafka-testing

In isolation, the test (KafkaMessagingServiceImplTest) works perfectly both in intellij idea and via mvn on the command line. Running all project tests in idea works fine. However, when I run all project tests via maven on the command line, this test fails with an NPE when trying to make the assertion on the payload String.

I've narrowed down the location of the root problem to another test class (AppPropertiesTest) which is solely testing my AppProperties component (which is a component I use to pull config from application.properties in a tidy way). When, and only when, the tests within that test class are run alongside the failing test using 'mvn clean install' in project root, does the NPE show up. Commenting out the tests in this class or annotating it with @DirtiesContext fixes the problem. Apparently something loaded into the spring context by this test class causes an issue with the timing/order of events/countdownlatch in the other test. Of course, I don't want to use @DirtiesContext as it can lead to a much slower build as the project increases in complexity. It also does not explain the problem.. and I can't handle that :)

AppPropertiesTest uses constructor injection to inject the AppProperties component. It also extends a abstract class 'GenericServiceTest' which is annotated by:

@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL) 

and contains nothing else. As you probably know, the SpringBootTest annotation builds a test spring context and wires in boilerplate to allow effective testing of a spring app's dependency injection etc. and the TestConstructor annotation allows constructor injection in some of my tests. FWIW, I have tried removing the TestConstructor annotation and using plain old Autowiring in the AppProperties class to see if it makes a difference but it does not.

The failing test class also extends GenericServiceTest, as it requires the spring context to inject some of the dependencies such as the consumer and the messaging service being tested and AppProperties instance within etc.

So I know where the problem lies but I don't know what the problem is. Even when the test fails with the NPE, I can see in the logs that the consumer has successfully consumed the message before the failure, as per the Baeldung guide :

TestKafkaConsumer  : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1618997289238, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'

However, the payLoad is null when we get back to the assertion. I've tried all kinds of things like Thread.sleep() in the failing test to give it more time and I've increased the await() timeout but no joy.

I find it bizarre that the tests are fine in IDEA and in isolation. Now it's starting to drive me a little crazy and I can't debug it because the problem doesn't occur in my IDE.

If anyone has any ideas, it would be greatly appreciated!

Thanks.

EDIT: Someone very reasonably suggested that I add some code so here goes :)

The Failing Test (fails at assertTrue(payload.contains(testMessage)) because payLoad is null). The autowired kafkaMessagingService simply has the dependencies of AppProperties and KakfaTemplate injected and calls kafkaTemplate.send():


@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class KafkaMessagingServiceImplTest extends GenericServiceTest {

    @Autowired
    @Qualifier("kafkaMessagingServiceImpl")
    private IMessagingService messagingService;
    @Autowired
    private TestKafkaConsumer kafkaConsumer;
    @Value("${app.topicName}")
    private String testTopic;

    @Test
    public void testSendAndConsumeKafkaMessage() throws InterruptedException {
        String testMessage = "This is a test message to be sent to Kafka.";
        messagingService.sendMessage(testMessage);
        kafkaConsumer.getLatch().await(2000, TimeUnit.MILLISECONDS);
        String payload = kafkaConsumer.getPayload();
        assertTrue(payload.contains(testMessage));
    }

The TestConsumer (used to consume in the test above)

@Component
public class TestKafkaConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${app.topicName}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());

        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }

Project dependencies:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.mockito/mockito-all -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.10.19</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.5.6.RELEASE</version>
            <scope>test</scope>
        </dependency>

    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

AppPropertiesTest class (the context of which seems to cause the problem)

class AppPropertiesTest extends GenericServiceTest {

    private final AppProperties appProperties;

    public AppPropertiesTest(AppProperties appProperties) {
        this.appProperties = appProperties;
    }

    @Test
    public void testAppPropertiesGetQueueName() {
        String expected = "test-queue";
        String result = appProperties.getRabbitMQQueueName();
        assertEquals(expected, result);
    }

    @Test
    public void testAppPropertiesGetDurableQueue() {
        boolean isDurableQueue = appProperties.isDurableQueue();
        assertTrue(isDurableQueue);
    }
}

The AppProperties class that the AppPropertiesTest class is testing:

@Component
@ConfigurationProperties("app")
public class AppProperties {

    // a whole bunch of properties by name that are prefixed by app. in the application.properties file. Nothing else
}

The Generic service test class which both tests extend.

@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
public abstract class GenericServiceTest {

}

The failure (you can see on the line above the payload has been received and printed out).

2021-04-21 14:15:07.113  INFO 493384 --- [ntainer#0-0-C-1] service.TestKafkaConsumer  : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1619010907076, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.791 s <<< FAILURE! - in 
service.KafkaMessagingServiceImplTest
[ERROR] testSendAndConsumeKafkaMessage  Time elapsed: 2.044 s  <<< ERROR!
java.lang.NullPointerException
    at service.KafkaMessagingServiceImplTest.testSendAndConsumeKafkaMessage(KafkaMessagingServiceImplTest.java:42)


Solution

  • The problem is that TestListener is a @Component so it is being added twice - the record is going to the other instance.

    I added more debugging to verify the getter is called on a different instance.

    @Component
    public class TestKafkaConsumer {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);
    
        private final CountDownLatch latch = new CountDownLatch(1);
        private String payload = null;
    
    
        @KafkaListener(id = "myListener", topics = "${app.kafkaTopicName}")
        public void receive(ConsumerRecord<?, ?> consumerRecord) {
            LOGGER.info("received payload='{}'", consumerRecord.toString());
            setPayload(consumerRecord.toString());
    
            if (payload != null) {
                LOGGER.info(this + ": payload is not null still");
            }
    
            latch.countDown();
    
            if (payload != null) {
                LOGGER.info(this + ": payload is not null after latch countdown");
            }
        }
    
        public CountDownLatch getLatch() {
            return latch;
        }
    
        public String getPayload() {
            LOGGER.info(this + ": getting Payload");
            return payload;
        }
    
        public void setPayload(String payload) {
            this.payload = payload;
        }
    }
    

    If you don't want to use @DirtiesContext, you can at least stop the listener containers after the tests complete:

    @SpringBootTest
    @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
    public abstract class GenericDataServiceTest {
    
        @AfterAll
        static void stopContainers(@Autowired KafkaListenerEndpointRegistry registry) {
            registry.stop();
        }
    
    }
    
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------