spring-integrationspring-integration-dslspring-integration-mqtt

Shared Subscription with Spring Integration Mqttv5 not Receiving Messages


I have a spring boot (v. 3.0.5) project using spring-integration-mqtt (v. 6.0.4) and also use paho mqttv5 client. I want to setup a shared subscription via the ClientManager and the Integration DSL. But I cannot get it to work.

@Bean
fun clientManager(): ClientManager<IMqttAsyncClient, MqttConnectionOptions> {
    val connectionOptions = MqttConnectionOptions()
    connectionOptions.serverURIs = arrayOf("tcp://example.org:1883")
    
    val clientManager = Mqttv5ClientManager(connectionOptions, "testClient")
    clientManager.setPersistence(MqttDefaultFilePersistence())
    return clientManager
}

@Bean
fun mqttTestInFlow(clientManager: ClientManager<IMqttAsyncClient, MqttConnectionOptions>): IntegrationFlow {
    val messageProducer = Mqttv5PahoMessageDrivenChannelAdapter(
        clientManager,
        "\$share/testGroup/foo/test",
    )

    return IntegrationFlow.from(messageProducer)
        .channel("mqttInputChannel")
        .get()
}

@ServiceActivator(inputChannel = "mqttInputChannel")
fun handler(message: Message<String>) {
    println("Received message: ${message.payload}")
}

I can see in the logs from my mosquitto broker that the subscription is created and also that messages published to foo/test are published to the testClient (the spring service). But my handler never receives these messages. When I remove the $share/testGroup from the topic string then everything works just fine.


Solution

  • This has been fixed in recent Spring Integration versions: https://github.com/spring-projects/spring-integration/issues/8879.

    Here is some demo to demonstrate shared subscription feature: https://github.com/artembilan/sandbox/tree/master/spring-integration-mqtt-share-demo