spring-integration mqtt paho

Spring MQTT Integration shared subscription is not working


@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{mqttProperties.getUrl()});
    connectionOptions.setConnectionTimeout(30);
    connectionOptions.setMaxReconnectDelay(30);
    connectionOptions.setUserName(mqttProperties.getUsername());
    connectionOptions.setPassword(mqttProperties
            .getPassword()
            .getBytes(StandardCharsets.UTF_8)
    );
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager =
            new Mqttv5ClientManager(
                    connectionOptions,
                    Objects.requireNonNull(MyUtils.getClientId()));
    clientManager.setPersistence(new MemoryPersistence());
    return clientManager;
}


@Bean
public IntegrationFlow mqttInFlow(
        final ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
            new Mqttv5PahoMessageDrivenChannelAdapter(
                    clientManager, "$share/group1/test");

    messageProducer.setPayloadType(byte[].class);
    messageProducer.setManualAcks(true);
    messageProducer.setMessageConverter(new ByteArrayMessageConverter());

    return IntegrationFlow.from(messageProducer)
            .handle(mqttMessageHandler)
            .get();
}


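public class MqttMessageHandler implements GenericHandler<byte[]> {

    @Override
    public Object handle(final byte[] payload, final MessageHeaders headers) {
        // Decode before logging (assuming a UTF-8 text payload); logging the
        // array itself would print only its identity hash.
        log.info("Received message: {}", new String(payload, StandardCharsets.UTF_8));
        // Returning null ends the flow here: no reply message is produced.
        return null;
    }
}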

Given: The code above subscribes to the shared topic "$share/group1/test".

Expectation: Messages published to "test" should arrive in the MqttMessageHandler handle method.

Note: It works for a non-shared topic subscription.

Similar Question: Shared Subscription with Spring Integration Mqttv5 not Receiving Messages


Solution

  • Apparently it does not work with a shared client. Either the Paho library has some restriction when the same clientId is used by both the publisher and the subscriber of the shared subscription, or something else is going on.

    When I use different clientId for publisher and subscriber, it works as expected: https://github.com/artembilan/sandbox/tree/master/spring-integration-mqtt-share-demo

    UPDATE

    OK. I see what is going on. We use this API:

    this.mqttClient.subscribe(new MqttSubscription[] {subscription},
                                null, null, this::messageArrived, subscriptionProperties)
    

    And that one is used like:

    this.comms.setMessageListener(subId, subscription.getTopic(), messageListener);
    

    In the shared case, subscription.getTopic() is the full $share/group1/test.

    In the logs I see this:

    subscriber-for-shared-subscription: received key=0 message=MqttPublish [, qos=0, retained=false, duplicate=false, topic=test, payload=[hex=746573742064617461, utf8=test data, length=9], properties=MqttProperties [validProperties=[1, 2, 35, 8, 9, 38, 3, 126, 11]]
    

    Pay attention to that topic=test. Apparently our listener registered for $share/group1/test is not selected, because the received packet carries only the plain topic.

    Without a client manager, the messageArrived() callback is invoked through the regular this.mqttClient.setCallback(this);.

    Sounds like a bug in the Eclipse Paho client library: https://github.com/eclipse/paho.mqtt.java/issues/965
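
The mismatch described above can be illustrated in plain Java, without any MQTT dependency. This is only a sketch: the class `SharedTopicFilter` and its method `stripSharePrefix` are hypothetical names, and the exact string comparison is a simplification, not Paho's actual listener lookup code. It shows that a listener keyed by the shared filter never equals the topic of the arriving message, while the filter with its `$share/<group>/` prefix removed does.

```java
public class SharedTopicFilter {

    private static final String SHARE_PREFIX = "$share/";

    /**
     * Returns the plain topic filter of a shared subscription
     * ("$share/<group>/<filter>" becomes "<filter>"), or the input
     * unchanged when it is not a shared subscription.
     */
    public static String stripSharePrefix(String topicFilter) {
        if (!topicFilter.startsWith(SHARE_PREFIX)) {
            return topicFilter;
        }
        int groupEnd = topicFilter.indexOf('/', SHARE_PREFIX.length());
        // "$share/group" with no filter after the group is malformed; leave it as-is.
        return groupEnd < 0 ? topicFilter : topicFilter.substring(groupEnd + 1);
    }

    public static void main(String[] args) {
        String subscribed = "$share/group1/test"; // key the listener is registered under
        String arrived = "test";                  // topic carried by the incoming publish

        // Comparing against the registered key fails for shared subscriptions.
        System.out.println(subscribed.equals(arrived));                   // false
        // Comparing against the stripped filter succeeds.
        System.out.println(stripSharePrefix(subscribed).equals(arrived)); // true
    }
}
```

Until the library handles this, the approaches already mentioned above (a separate clientId for the subscriber, or an adapter without the client manager so that the regular callback is used) seem to be the practical options.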