spring-cloud-stream spring-cloud-stream-binder-kafka spring-cloud-stream-binder

spring-cloud-stream test-binder not working for a KStream based processor


I am unable to get the test binder working for a KStream-based function (doNothing), shown below.

The same approach works for a non-KStream function (uppercase). This uses the Kafka binder.

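import java.util.Collections;
import java.util.function.Function;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CPPNotificationConfiguration {

    // Kafka Streams processor: re-emits every record unchanged.
    @Bean
    public Function<KStream<byte[], byte[]>, KStream<byte[], byte[]>> doNothing() {
        return a -> a.flatMap((key, value) -> Collections.singleton(KeyValue.pair(key,value)));
    }

    // Plain message-channel function: upper-cases the payload.
    @Bean
    public Function<String, String> uppercase() {
        return String::toUpperCase;
    }
}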

The bindings for the above functions are configured as follows:

spring:
  cloud.stream:
    function:
      definition: doNothing;uppercase
    bindings:
      doNothing-in-0:
        destination: doNothing-in-topic
      doNothing-out-0:
        destination: doNothing-out-topic
      uppercase-in-0:
        destination: uppercase-in-topic
      uppercase-out-0:
        destination: uppercase-out-topic

The test cases are as follows:

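import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class CPPNotificationtest {
    @Autowired private InputDestination input;
    @Autowired private OutputDestination output;

    @Test
    void upperCaseTest() {
        input.send(new GenericMessage<>("asdfa"), "uppercase-in-topic");
        Message<byte[]> message = output.receive(100, "uppercase-out-topic");
        Assertions.assertArrayEquals("asdfa".toUpperCase().getBytes(), message.getPayload());
    }

    @Test
    void doNothingTest() {
        input.send(new GenericMessage<>("asdfa".getBytes()), "doNothing-in-topic");
        Message<byte[]> message = output.receive(100, "doNothing-out-topic");
        Assertions.assertArrayEquals("asdfa".getBytes(), message.getPayload());
    }
}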

upperCaseTest runs fine. doNothingTest fails with a NullPointerException:

java.lang.NullPointerException
    at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:89)
    at com.mokapos.cpp.notification.integration.UpperCaseTest.itemNotificationTest(UpperCaseTest.java:31)

Using a debugger, I can see that InputDestination and OutputDestination hold channels only for the uppercase function and none for the doNothing function. What am I missing here?


Relevant portion of my build.gradle:

implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-logging"
implementation 'org.springframework.boot:spring-boot-starter-test'
implementation "org.springframework.boot:spring-boot-starter-actuator"

implementation "org.springframework.cloud:spring-cloud-stream"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka-streams"
testImplementation("org.springframework.cloud:spring-cloud-stream") {
    artifact {
        name = "spring-cloud-stream"
        extension = "jar"
        type = "test-jar"
        classifier = "test-binder"
    }
}

Solution

  • The test binder in Spring Cloud Stream is not intended for testing Kafka Streams based applications. It is built on Spring Integration and works only with message-channel-based binders, which is why InputDestination and OutputDestination have no channels for a KStream function. Your best option is to use EmbeddedKafka and write an integration test for these types of scenarios. We have several examples here.
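
As a rough sketch of what such an integration test might look like for the doNothing function (this is not taken from the examples mentioned above), the test below starts an embedded broker via spring-kafka-test's @EmbeddedKafka, publishes one record to doNothing-in-topic, and reads it back from doNothing-out-topic. Assumptions: spring-kafka-test is on the test classpath, TestChannelBinderConfiguration is not imported, and the class name DoNothingEmbeddedKafkaTest and the consumer group name are made up for illustration. Depending on the binder version, a Kafka Streams application ID may also need to be configured.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// Point both binders at the embedded broker; @EmbeddedKafka publishes its
// address under the spring.embedded.kafka.brokers property.
@SpringBootTest(properties = {
        "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}",
        "spring.cloud.stream.kafka.streams.binder.brokers=${spring.embedded.kafka.brokers}"
})
@EmbeddedKafka(partitions = 1, topics = {"doNothing-in-topic", "doNothing-out-topic"})
class DoNothingEmbeddedKafkaTest { // hypothetical test class name

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    void doNothingPassesRecordThrough() {
        byte[] payload = "asdfa".getBytes(StandardCharsets.UTF_8);

        // Producer with byte[] serializers, matching KStream<byte[], byte[]>.
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(
                new DefaultKafkaProducerFactory<>(
                        producerProps, new ByteArraySerializer(), new ByteArraySerializer()));
        template.send("doNothing-in-topic", payload);
        template.flush();

        // Consumer that reads the output topic from the beginning.
        // "do-nothing-test" is an arbitrary group name chosen for this sketch.
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("do-nothing-test", "false", broker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (Consumer<byte[], byte[]> consumer = new DefaultKafkaConsumerFactory<>(
                consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())
                .createConsumer()) {
            broker.consumeFromAnEmbeddedTopic(consumer, "doNothing-out-topic");

            // Waits for exactly one record and fails if none arrives in time.
            ConsumerRecord<byte[], byte[]> record =
                    KafkaTestUtils.getSingleRecord(consumer, "doNothing-out-topic");
            Assertions.assertArrayEquals(payload, record.value());
        }
    }
}
```

Unlike the test binder, this exercises the real Kafka Streams topology against a broker, so it is slower to start but covers serialization and binding configuration as well.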