spring-integration spring-kafka spring-cloud-stream spring-cloud-function spring-cloud-stream-binder

How to use Spring Stream Application supplier as a proxy?


I am trying to use the tcpSupplier provided as part of the Spring Cloud Stream Applications. I have a simple consumer registered alongside the tcpSupplier, which listens on the default port 1234.

@Bean
public Consumer<Flux<Message<Object>>> testFunction() {
    // TcpSupplierConfiguration
    return msgFlux -> {
        List<Message<Object>> list = msgFlux.collectList().block();
        System.out.println(list);
    };
}

The app starts fine and it is listening on port 1234, but I get the errors below when I send a Kafka message over TCP:

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'tcpSupplierFlow.channel#1' at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166)

............................

Caused by: java.lang.IllegalStateException: The [bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [org/springframework/cloud/fn/supplier/tcp/TcpSupplierConfiguration.class]'; from source: 'bean method tcpSupplierFlow'] doesn't have subscribers to accept messages

What is missing in the above supplier-consumer model? My main requirement is to listen for Kafka messages on that port, inspect them and, if everything looks OK, send them to the Kafka broker (via StreamBridge or the default binding).

Application properties:

spring.application.name=my-application
spring.cloud.function.definition=consumer;producer
spring.cloud.stream.kafka.binder.brokers=<<kafka-host>>:9092
spring.cloud.stream.bindings.producer-out-0.destination=first_topic
spring.consumer-in-0.destination=first_topic
spring.producer-out-0.destination=first_topic
tcp.server.port=10001
tcp.port=1234
tcp.supplier.decoder=raw
tcp.supplier.buffer-size=2048
spring.docker.compose.enabled=false

Solution

  • You also have to add tcpSupplier to your spring.cloud.function.definition. Otherwise it is not bound, so the channel named in the exception has no subscriber. You may also want to set a specific destination for that binding; otherwise the default one, tcpSupplier-out-0, is used.
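
As a minimal sketch of the change described above, the two relevant properties might look like the following. It assumes the existing definition from the question (consumer;producer) is kept as-is and that first_topic is the intended destination for the TCP data; both are taken from the question's configuration, not confirmed as the right choice for your setup.

```properties
# Add tcpSupplier to the function definition so that it gets bound
# (the other entries are copied unchanged from the question).
spring.cloud.function.definition=consumer;producer;tcpSupplier

# Optional: point the supplier's output binding at an explicit destination.
# first_topic is an assumption here; without this property the destination
# defaults to the binding name, tcpSupplier-out-0.
spring.cloud.stream.bindings.tcpSupplier-out-0.destination=first_topic
```

Once tcpSupplier is bound, the output binding subscribes to the supplier's flow, which should be what the "doesn't have subscribers" exception is complaining about.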