I am unable to get the test binder working for a KStream-based function (doNothing), shown below.
The same setup works for a non-KStream function (uppercase). This uses the Kafka binder.
@Configuration
public class CPPNotificationConfiguration {

    @Bean
    public Function<KStream<byte[], byte[]>, KStream<byte[], byte[]>> doNothing() {
        return a -> a.flatMap((key, value) -> Collections.singleton(KeyValue.pair(key, value)));
    }

    @Bean
    public Function<String, String> uppercase() {
        return String::toUpperCase;
    }
}
The bindings for the above lambdas are configured as follows:
spring:
  cloud.stream:
    function:
      definition: doNothing;uppercase
    bindings:
      doNothing-in-0:
        destination: doNothing-in-topic
      doNothing-out-0:
        destination: doNothing-out-topic
      uppercase-in-0:
        destination: uppercase-in-topic
      uppercase-out-0:
        destination: uppercase-out-topic
The test cases are as follows:
@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class CPPNotificationtest {

    @Autowired private InputDestination input;
    @Autowired private OutputDestination output;

    @Test
    void upperCaseTest() {
        input.send(new GenericMessage<>("asdfa"), "uppercase-in-topic");
        Message<byte[]> message = output.receive(100, "uppercase-out-topic");
        Assertions.assertArrayEquals("asdfa".toUpperCase().getBytes(), message.getPayload());
    }

    @Test
    void doNothingTest() {
        input.send(new GenericMessage<>("asdfa".getBytes()), "doNothing-in-topic");
        Message<byte[]> message = output.receive(100, "doNothing-out-topic");
        Assertions.assertArrayEquals("asdfa".getBytes(), message.getPayload());
    }
}
upperCaseTest runs fine. doNothingTest fails with a NullPointerException:
java.lang.NullPointerException
    at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:89)
    at com.mokapos.cpp.notification.integration.UpperCaseTest.itemNotificationTest(UpperCaseTest.java:31)
Using a debugger, I see that InputDestination and OutputDestination hold channels only for the uppercase function and none for the doNothing function. What am I missing here?
Relevant portion of my build.gradle:
implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-logging"
implementation 'org.springframework.boot:spring-boot-starter-test'
implementation "org.springframework.boot:spring-boot-starter-actuator"
implementation "org.springframework.cloud:spring-cloud-stream"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka-streams"
testImplementation("org.springframework.cloud:spring-cloud-stream") {
    artifact {
        name = "spring-cloud-stream"
        extension = "jar"
        type = "test-jar"
        classifier = "test-binder"
    }
}
The test binder in Spring Cloud Stream is not intended for testing Kafka Streams based applications. It is Spring Integration based and only works with message-channel-based binders, which is why InputDestination and OutputDestination only contain the channels for uppercase. Your best option is to use EmbeddedKafka and write an integration test for these types of scenarios. We have several examples here.
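For illustration, an EmbeddedKafka-based version of doNothingTest might look roughly like the sketch below. It is not taken from the linked examples. It assumes spring-kafka-test is added as a testImplementation dependency, that the test binder is not imported into this test, and that both binders can be pointed at the embedded broker through their brokers properties. The class name DoNothingEmbeddedKafkaTest, the consumer group doNothing-test, and the record key are made up for the example, and the KafkaTestUtils signatures vary slightly between spring-kafka versions.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

@SpringBootTest(properties = {
        // Point both binders at the broker started by @EmbeddedKafka.
        "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}",
        "spring.cloud.stream.kafka.streams.binder.brokers=${spring.embedded.kafka.brokers}"
})
@EmbeddedKafka(partitions = 1, topics = {"doNothing-in-topic", "doNothing-out-topic"})
class DoNothingEmbeddedKafkaTest {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    void doNothingTest() {
        // Producer with byte[] key/value, matching the KStream<byte[], byte[]> signature.
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        KafkaTemplate<byte[], byte[]> template =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<byte[], byte[]>(producerProps));
        template.send("doNothing-in-topic", "k".getBytes(), "asdfa".getBytes());
        template.flush();

        // Consumer on the output topic; the group name is arbitrary.
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("doNothing-test", "false", broker);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (Consumer<byte[], byte[]> consumer =
                new DefaultKafkaConsumerFactory<byte[], byte[]>(consumerProps).createConsumer()) {
            broker.consumeFromAnEmbeddedTopic(consumer, "doNothing-out-topic");
            // Waits for exactly one record on the topic and fails if none arrives in time.
            ConsumerRecord<byte[], byte[]> record =
                    KafkaTestUtils.getSingleRecord(consumer, "doNothing-out-topic");
            Assertions.assertArrayEquals("asdfa".getBytes(), record.value());
        }
    }
}
```

Unlike the test binder, this goes through a real (in-process) broker, so the Kafka Streams topology is actually started and the record passes through the real doNothing function. The trade-off is a slower test that needs time for the streams application to reach a running state.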