I have a Spring Boot (v3.0.5) project that uses spring-integration-mqtt (v6.0.4) together with the Paho MQTT v5 client. I want to set up a shared subscription via the ClientManager and the Integration DSL, but I cannot get it to work.
@Bean
fun clientManager(): ClientManager<IMqttAsyncClient, MqttConnectionOptions> {
    val connectionOptions = MqttConnectionOptions()
    connectionOptions.serverURIs = arrayOf("tcp://example.org:1883")
    val clientManager = Mqttv5ClientManager(connectionOptions, "testClient")
    clientManager.setPersistence(MqttDefaultFilePersistence())
    return clientManager
}

@Bean
fun mqttTestInFlow(clientManager: ClientManager<IMqttAsyncClient, MqttConnectionOptions>): IntegrationFlow {
    val messageProducer = Mqttv5PahoMessageDrivenChannelAdapter(
        clientManager,
        "\$share/testGroup/foo/test",
    )
    return IntegrationFlow.from(messageProducer)
        .channel("mqttInputChannel")
        .get()
}

@ServiceActivator(inputChannel = "mqttInputChannel")
fun handler(message: Message<String>) {
    println("Received message: ${message.payload}")
}
I can see in the logs of my Mosquitto broker that the subscription is created, and that messages published to foo/test are delivered to testClient (the Spring service). But my handler never receives these messages.
When I remove the $share/testGroup prefix from the topic string, everything works just fine.
This has been fixed in recent Spring Integration versions: https://github.com/spring-projects/spring-integration/issues/8879.
Here is a demo of the shared subscription feature: https://github.com/artembilan/sandbox/tree/master/spring-integration-mqtt-share-demo
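
For background on why the symptom looks the way it does: in MQTT v5 a shared subscription filter has the form $share/<ShareName>/<TopicFilter>, but the broker delivers messages under the real topic name (foo/test here), not under the $share/... string. Any client-side code that routes an incoming message by comparing its topic with the literal subscription string will therefore find no match. Whether that is exactly what went wrong in your version is my assumption, not something stated in the linked issue. Below is a minimal, self-contained sketch of the idea; stripSharePrefix and topicMatches are hypothetical helpers written for illustration and are not part of Spring Integration or Paho.

```kotlin
// Hypothetical helpers for illustration only; NOT Spring Integration or Paho API.

/** Returns the plain topic filter behind "$share/<ShareName>/<TopicFilter>", or the input unchanged. */
fun stripSharePrefix(filter: String): String {
    if (!filter.startsWith("\$share/")) return filter
    val afterShare = filter.removePrefix("\$share/")
    val slash = afterShare.indexOf('/')
    // Without a second '/', the string is not a well-formed shared filter: leave it alone.
    return if (slash < 0) filter else afterShare.substring(slash + 1)
}

/** Minimal MQTT topic filter matching with support for the '+' and '#' wildcards. */
fun topicMatches(filter: String, topic: String): Boolean {
    val f = filter.split('/')
    val t = topic.split('/')
    for (i in f.indices) {
        if (f[i] == "#") return true          // multi-level wildcard matches the rest
        if (i >= t.size) return false         // filter has more levels than the topic
        if (f[i] != "+" && f[i] != t[i]) return false
    }
    return f.size == t.size
}

fun main() {
    val subscribed = "\$share/testGroup/foo/test"
    val delivered = "foo/test" // the topic the broker actually puts on the message

    // Comparing against the raw subscription string never matches.
    println(topicMatches(subscribed, delivered))
    // Comparing against the filter with the share prefix removed does.
    println(topicMatches(stripSharePrefix(subscribed), delivered))
}
```

This would also be consistent with the plain foo/test subscription working for you: there the subscription string and the delivered topic are the same, so nothing needs to be stripped.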