java spring-boot testing spring-kafka spring-cloud-stream-binder-kafka

Spring cloud stream integration test with EmbeddedKafka doesn't work


I'm testing Spring Cloud Stream with the Kafka binder to create a consumer of events published to a Kafka topic.

When the app is running this works fine, but for my test class, this doesn't work.

I have the following configurations in my app:

The pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the example Spring Cloud Stream application. -->
<project
        xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Dependency and plugin versions that are not declared below are expected to come from this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.1</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>myApp</artifactId>
    <version>0.0.1</version>
    <name>myApp</name>
    <description>Spring cloud stream app example</description>

    <properties>
        <!-- Project -->
        <java.version>17</java.version>

        <!-- Dependencies -->
        <springdoc-openapi-starter-webmvc-ui.version>2.1.0</springdoc-openapi-starter-webmvc-ui.version>
        <snakeyaml.version>2.0</snakeyaml.version>
        <mapstruct.version>1.5.5.Final</mapstruct.version>
        <logstash-logback-encoder.version>7.3</logstash-logback-encoder.version>
        <opentelemetry.version>1.28.0</opentelemetry.version>
        <spring-cloud.version>2022.0.3</spring-cloud.version>
        <okhttp.version>4.11.0</okhttp.version>
        <mockwebserver.version>4.11.0</mockwebserver.version>
        <h2.version>2.2.220</h2.version>

        <!-- Plugins -->
        <jacoco-maven-plugin.version>0.8.10</jacoco-maven-plugin.version>
        <!-- NOTE(review): no plugin in this POM references properties-maven-plugin.version; confirm it is still needed. -->
        <properties-maven-plugin.version>1.1.0</properties-maven-plugin.version>
        <sonar-maven-plugin.version>3.9.1.2184</sonar-maven-plugin.version>
        <maven-resources-plugin.version>3.3.1</maven-resources-plugin.version>
    </properties>

    <!-- Imported BOMs: they supply versions for the OpenTelemetry and Spring Cloud artifacts declared without one. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.opentelemetry</groupId>
                <artifactId>opentelemetry-bom</artifactId>
                <version>${opentelemetry.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Web stack: the Tomcat starter is excluded and the Undertow starter is added in its place. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>undertow-websockets-jsr</artifactId>
                    <groupId>io.undertow</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
            <version>${springdoc-openapi-starter-webmvc-ui.version}</version>
        </dependency>
        <!-- Solves the https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-1471 in spring boot dependencies -->
        <dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
            <version>${snakeyaml.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>

        <!-- Messaging: Spring Cloud Stream with the Kafka binder. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Persistence. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- NOTE(review): hibernate-validator.version is not declared in this POM; presumably inherited from the parent, confirm. -->
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>${hibernate-validator.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mapstruct</groupId>
            <artifactId>mapstruct</artifactId>
            <version>${mapstruct.version}</version>
        </dependency>

        <!-- Logging, metrics and tracing. -->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>${logstash-logback-encoder.version}</version>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-api</artifactId>
        </dependency>

        <!-- Test-scoped dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- NOTE(review): the test binder shares the test classpath with the Kafka binder; confirm which binder the tests actually use. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-binder</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>${okhttp.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>mockwebserver</artifactId>
            <version>${mockwebserver.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <version>${h2.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <!-- Resource filtering only substitutes '@'-delimited placeholders; the default delimiters are disabled. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>${maven-resources-plugin.version}</version>
                <configuration>
                    <delimiters>
                        <delimiter>@</delimiter>
                    </delimiters>
                    <useDefaultDelimiters>false</useDefaultDelimiters>
                </configuration>
            </plugin>
            <!-- Copies the Lombok jar during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>copy</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <artifactItems>
                        <artifactItem>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                            <type>jar</type>
                            <overWrite>false</overWrite>
                        </artifactItem>
                    </artifactItems>
                </configuration>
            </plugin>
            <!-- Lombok is excluded from the repackaged application archive. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <!-- Annotation processors: Lombok is listed before the MapStruct processor. -->
            <!-- NOTE(review): lombok.version is not declared in this POM; presumably inherited from the parent, confirm. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                            <version>${lombok.version}</version>
                        </path>
                        <path>
                            <groupId>org.mapstruct</groupId>
                            <artifactId>mapstruct-processor</artifactId>
                            <version>${mapstruct.version}</version>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <!-- Version pinned through the sonar-maven-plugin.version property so the build is reproducible. -->
            <plugin>
                <groupId>org.sonarsource.scanner.maven</groupId>
                <artifactId>sonar-maven-plugin</artifactId>
                <version>${sonar-maven-plugin.version}</version>
            </plugin>
            <!-- Code coverage: attaches the JaCoCo agent and generates the report. -->
            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>${jacoco-maven-plugin.version}</version>
                <executions>
                    <execution>
                        <id>prepare-agent</id>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>report</id>
                        <goals>
                            <goal>report</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

The application-integration-test.yaml:

# Configuration loaded for the "integration-test" profile.
spring:
  cloud:
    function:
      # Function to bind; its name is the prefix of the binding name used below (myConsumer-in-0).
      definition: myConsumer

    stream:
      bindings:
        # Input binding of the consumer: topic to read from and consumer group to join.
        myConsumer-in-0:
          destination: a-kafka-topic
          group: my-app-consumer-group

      kafka:
        binder:
          # Same host and port as the listener configured for the embedded broker in the test annotation.
          brokers: localhost:9092
          auto-create-topics: true
        bindings:
          myConsumer-in-0:
            consumer:
              # Dead-letter queue enabled, with an explicit dead-letter topic name.
              enableDlq: true
              dlqName: a-kafka-topic-dlt
              # NOTE(review): these two offset-commit options may be deprecated in recent binder versions in favor of ackMode; confirm.
              autoCommitOnError: true
              autoCommitOffset: true

The event class:

/**
 * Immutable event payload carrying a single text message.
 *
 * @param message the text content of the event
 */
public record Event(String message) {
}

The consumer class:

/**
 * Consumer of {@link Event} messages that logs every message it receives.
 */
@Slf4j
@Component
public class MyConsumer implements Consumer<Message<Event>> {

    /**
     * Logs the received message at INFO level.
     *
     * @param message the incoming message wrapping the {@link Event} payload
     */
    @Override
    public void accept(final Message<Event> message) {
        // SLF4J has no info(Object) overload, so use a format string and pass the message as its argument.
        log.info("Received message: {}", message);
    }

}

The custom annotation for configuring the test class:

/**
 * Composed annotation for Kafka integration tests.
 *
 * <p>It bundles {@code @SpringBootTest}, the {@code integration-test} profile and an
 * {@code @EmbeddedKafka} broker with one partition per topic, listening on
 * {@code localhost:9092}.
 */
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@SpringBootTest
@ActiveProfiles("integration-test")
@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
@EnableConfigurationProperties
@EnableKafka
@EmbeddedKafka(
    brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"},
    partitions = 1
)
public @interface KafkaTest {

    /**
     * Alias for the {@code classes} attribute of {@link SpringBootTest}.
     *
     * @return the classes forwarded to {@code @SpringBootTest}; empty by default
     */
    @AliasFor(attribute = "classes", annotation = SpringBootTest.class)
    Class<?>[] classes() default {};

}

The utility class:

/**
 * Base class for Kafka tests: waits for listener containers to get their partitions
 * before each test and offers a String/String producer bound to the embedded broker.
 */
public class KafkaTestSupport {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private KafkaListenerEndpointRegistry endpointRegistry;

    /**
     * Blocks until every container known to the endpoint registry has been assigned
     * the number of partitions per topic reported by the embedded broker.
     */
    @BeforeEach
    void setup() {
        for (final var container : this.endpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(
                container,
                this.embeddedKafkaBroker.getPartitionsPerTopic()
            );
        }
    }

    /**
     * Creates a producer with String keys and String values configured from the embedded broker.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    protected Producer<String, String> createProducer() {
        final var brokerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        final DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(
            brokerProps,
            new StringSerializer(),
            new StringSerializer()
        );
        return factory.createProducer();
    }

}

The test class:

/**
 * Integration test that publishes an {@link Event} as JSON to the topic read by the consumer.
 */
@KafkaTest
public class MyConsumerTest extends KafkaTestSupport {

    /**
     * Publishes one event to {@code a-kafka-topic} and waits so the consumer log output can be observed.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void shouldBeConsumeAnEvent() throws InterruptedException {
        final var event = new Event("Hello kafka");

        try (final var producer = createProducer()) {
            final var producerRecord = new ProducerRecord<>(
                "a-kafka-topic",
                "",
                JsonUtils.writeValueAsString(event) // ObjectMapper writes the event as a JSON string
            );
            producer.send(producerRecord);
            // send() is asynchronous; flush so the record has actually been sent before waiting.
            producer.flush();

            // Forced wait to see the log output configured in the consumer class;
            // the consumer class does not consume the published event.
            Thread.sleep(1000 * 30);
        }
    }

}

Why doesn't this work?

How can I adjust my test so that the consumer class consumes the event?


Solution

  • I solved this issue using the StreamBridge.

    /**
     * Integration test that sends an {@link Event} to the consumer binding through the StreamBridge.
     */
    @KafkaTest
    public class MyConsumerTest {

        @Autowired
        private StreamBridge streamBridge;

        /**
         * Sends one event to the {@code myConsumer-in-0} binding and waits so the consumer log can be observed.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @Test
        public void shouldBeConsumeAnEvent() throws InterruptedException {
            final var event = new Event("Hello kafka");

            this.streamBridge.send("myConsumer-in-0", event);

            Thread.sleep(1000 * 30); // Forced wait to see the logged event
        }

    }