I am trying to use the tcpSupplier provided as part of the Spring Cloud Stream Applications. I have a pretty simple consumer registered against the tcpSupplier, which listens on the default port 1234.
@Bean
public Consumer<Flux<Message<Object>>> testFunction() {
    // TcpSupplierConfiguration
    return msgFlux -> {
        List<Message<Object>> list = msgFlux.collectList().block();
        System.out.println(list);
    };
}
The app starts fine and it is listening on port 1234. But I am getting the errors below when I send a Kafka message over TCP:
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'tcpSupplierFlow.channel#1'
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166)
............................
Caused by: java.lang.IllegalStateException: The [bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [org/springframework/cloud/fn/supplier/tcp/TcpSupplierConfiguration.class]'; from source: 'bean method tcpSupplierFlow'] doesn't have subscribers to accept messages
What is missing in the above supplier-consumer model? My main requirement is to listen for Kafka messages on that port, inspect them and, if everything looks OK, send them to the Kafka broker (via StreamBridge or the default binding).
Application properties:
spring.application.name=my-application
spring.cloud.function.definition=consumer;producer
spring.cloud.stream.kafka.binder.brokers=<<kafka-host>>:9092
spring.cloud.stream.bindings.producer-out-0.destination=first_topic
spring.consumer-in-0.destination=first_topic
spring.producer-out-0.destination=first_topic
tcp.server.port=10001
tcp.port=1234
tcp.supplier.decoder=raw
tcp.supplier.buffer-size=2048
spring.docker.compose.enabled=false
You also have to add tcpSupplier to your spring.cloud.function.definition. Otherwise it is not going to be bound, and therefore the channel that is failing now has no subscriber. You might also need to set a specific destination for that binding; otherwise the default one, tcpSupplier-out-0, is used.
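As a rough sketch of that change, assuming the rest of the configuration from the question stays as it is, the relevant properties might look like the following. The first_topic destination for tcpSupplier-out-0 is only an illustrative choice borrowed from the question, not something the supplier requires.

```properties
# Add tcpSupplier next to the functions already listed in the question
spring.cloud.function.definition=tcpSupplier;consumer;producer
# Assumption: route the supplier output to first_topic; without this line
# the destination falls back to the default name, tcpSupplier-out-0
spring.cloud.stream.bindings.tcpSupplier-out-0.destination=first_topic
```

Once tcpSupplier is bound, the binding itself subscribes to the supplier's output channel, which is the subscriber the exception reports as missing.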