I'm testing the spring cloud stream with the Kafka binder to create a consumer of events published in the Kafka topic.
When the app is running this works fine, but for my test class, this doesn't work.
I have the following configurations in my app:
The pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.1</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>myApp</artifactId>
<version>0.0.1</version>
<name>myApp</name>
<description>Spring cloud stream app example</description>
<properties>
<!-- Project -->
<java.version>17</java.version>
<!-- Dependencies -->
<springdoc-openapi-starter-webmvc-ui.version>2.1.0</springdoc-openapi-starter-webmvc-ui.version>
<snakeyaml.version>2.0</snakeyaml.version>
<mapstruct.version>1.5.5.Final</mapstruct.version>
<logstash-logback-encoder.version>7.3</logstash-logback-encoder.version>
<opentelemetry.version>1.28.0</opentelemetry.version>
<spring-cloud.version>2022.0.3</spring-cloud.version>
<okhttp.version>4.11.0</okhttp.version>
<mockwebserver.version>4.11.0</mockwebserver.version>
<h2.version>2.2.220</h2.version>
<!-- Plugins -->
<jacoco-maven-plugin.version>0.8.10</jacoco-maven-plugin.version>
<properties-maven-plugin.version>1.1.0</properties-maven-plugin.version>
<sonar-maven-plugin.version>3.9.1.2184</sonar-maven-plugin.version>
<maven-resources-plugin.version>3.3.1</maven-resources-plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>${opentelemetry.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
<exclusions>
<exclusion>
<artifactId>undertow-websockets-jsr</artifactId>
<groupId>io.undertow</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc-openapi-starter-webmvc-ui.version}</version>
</dependency>
<!-- Solves the https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-1471 in spring boot dependencies -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>${snakeyaml.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>${hibernate-validator.version}</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>${mapstruct.version}</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>${logstash-logback-encoder.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>${mockwebserver.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>${maven-resources-plugin.version}</version>
<configuration>
<delimiters>
<delimiter>@</delimiter>
</delimiters>
<useDefaultDelimiters>false</useDefaultDelimiters>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
</execution>
</executions>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<type>jar</type>
<overWrite>false</overWrite>
</artifactItem>
</artifactItems>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
<path>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.sonarsource.scanner.maven</groupId>
<artifactId>sonar-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco-maven-plugin.version}</version>
<executions>
<execution>
<id>prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>report</id>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
The application-integration-test.yaml:
spring:
cloud:
function:
definition: myConsumer
stream:
bindings:
myConsumer-in-0:
destination: a-kafka-topic
group: my-app-consumer-group
kafka:
binder:
brokers: localhost:9092
auto-create-topics: true
bindings:
myConsumer-in-0:
consumer:
enableDlq: true
dlqName: a-kafka-topic-dlt
autoCommitOnError: true
autoCommitOffset: true
The event class:
public record Event(
String message
) { }
The consumer class:
@Slf4j
@Component
public class MyConsumer implements Consumer<Message<Event>> {
@Override
public void accept(final Message<Event> message) {
log.info(message);
}
}
The custom annotation for config test class:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@EnableConfigurationProperties
@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
@ActiveProfiles("integration-test")
@EnableKafka
@SpringBootTest
@EmbeddedKafka(
partitions = 1,
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"port=9092"
}
)
public @interface KafkaTest {
@AliasFor(annotation = SpringBootTest.class, attribute = "classes")
Class<?>[] classes() default {};
}
The utility class:
public class KafkaTestSupport {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;
@BeforeEach
void setup() {
this.endpointRegistry.getListenerContainers().forEach(
messageListenerContainer -> ContainerTestUtils.waitForAssignment(
messageListenerContainer,
this.embeddedKafkaBroker.getPartitionsPerTopic()
)
);
}
protected Producer<String, String> createProducer() {
final var producerProperties = KafkaTestUtils.producerProps(embeddedKafkaBroker);
return new DefaultKafkaProducerFactory<>(
producerProperties,
new StringSerializer(),
new StringSerializer()
).createProducer();
}
}
The test class:
@KafkaTest
public class MyConsumerTest extends KafkaTestSupport {
@Test
public void shouldBeConsumeAnEvent() {
final var event = new Event("Hello kafka");
try (final var producer = createProducer()) {
final var producerRecord = new ProducerRecord<>(
"a-kafka-topic",
"",
JsonUtils.writeValueAsString(event) // ObjecMapper write JSON string
);
producer.send(producerRecord);
Thread.sleep(1000 * 30); // Forcing await for see the the log info configured on consumer class but the consumer class doesn't consume the published event
}
}
}
Why this doesn't work?
How to adjust my test for the consumer class consumes the event?
I solved this issue using the StreamBidge
.
@KafkaTest
public class MyConsumerTest {
@Autowired
private StreamBridge streamBridge;
@Test
public void shouldBeConsumeAnEvent() {
final var event = new Event("Hello kafka");
this.streamBridge.send("myConsumer-in-0", event);
Thread.sleep(1000 * 30); // Force await time for see the log event
}
}