spring spring-integration mqtt

Spring Integration: sending messages to MQTT


As part of the learning curve, I am trying to publish a message to a Mosquitto MQTT broker using Spring Integration. I've set up a config (mostly assembled from pieces I found by googling; the listener part works well):

@Configuration
@RequiredArgsConstructor
@IntegrationComponentScan
public class MqttConfig {
    private final MqttProperties properties;
    private final ObjectMapper objectMapper;

    private List<String> sensorTopics;

    @PostConstruct
    void init() {
        sensorTopics = properties.getSensors().stream()
                .map(MqttProperties.Sensor::getTopic)
                .toList();
    }

    @Bean
    public IntegrationFlow sensorDataFlow(HumilityAndTempSensorMessageHandler humilityAndTempSensorMessageHandler) {
        return IntegrationFlow.from(
                        new MqttPahoMessageDrivenChannelAdapter(
                                properties.getUrl(),
                                "iotBridge",
                                "#")
                )
                .filter(getSensorTopicFilter())
                .transform(getTempSensorTransformer())
                .handle(humilityAndTempSensorMessageHandler::handleMessage)
                .get();
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }


    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{properties.getUrl()});
        options.setUserName(properties.getUsername());
        options.setPassword(properties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("exoluse", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("topic");
        return messageHandler;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface PublishGateway {
        void send(String message, @Header(MqttHeaders.TOPIC) String topic);
    }

    private GenericTransformer<String, TempSensorReading> getTempSensorTransformer() {
        return source -> {
            try {
                return objectMapper.readValue(source, TempSensorReading.class);
            } catch (Exception e) {
                System.out.println("failed to resolve input");
            }
            return null;
        };
    }

    private GenericHandler<String> getSensorTopicFilter() {
        return (payload, headers) -> {
            String topic = Optional.ofNullable(headers)
                    .map(item -> item.get("mqtt_receivedTopic"))
                    .map(String::valueOf)
                    .orElse(null);
            return sensorTopics.contains(topic);
        };
    }
}

And I'm trying to send a message:

@Service
//@RequiredArgsConstructor
@Slf4j
public class SocketGateway {

    @Autowired
    private MqttConfig.PublishGateway publishGateway;

    @PostConstruct
    void t() {
        // ${SHELLY_ID}/command/switch:0 -m toggle
        String topic = "xxx/command/switch:0";
        System.out.println(">>>>>> "+publishGateway);
        publishGateway.send("toggle", topic);
    }
}

And I'm getting a "Dispatcher has no subscribers" exception.

Dependency version:

    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
      <version>6.4.1</version>
    </dependency>

What am I doing wrong?

Update: full pom.xml

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.2</version>
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>15</maven.compiler.source>
    <maven.compiler.target>15</maven.compiler.target>
    <javafx.version>17.0.1</javafx.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.28</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
      <version>6.4.1</version>
    </dependency>

  </dependencies>
</project>

Full stacktrace of the error:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'socketGateway': Invocation of init method failed
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:222) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1762) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:942) ~[spring-context-6.0.11.jar:6.0.11]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608) ~[spring-context-6.0.11.jar:6.0.11]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:436) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.draakz.iotbridge.Application.main(Application.java:9) ~[classes/:na]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.mqttOutboundChannel'.
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:327) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:466) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:669) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:584) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:550) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:540) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.11.jar:6.0.11]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:244) ~[spring-aop-6.0.11.jar:6.0.11]
    at jdk.proxy2/jdk.proxy2.$Proxy83.send(Unknown Source) ~[na:na]
    at org.draakz.iotbridge.mqtt.publisher.SocketGateway.t(SocketGateway.java:26) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMethod.invoke(InitDestroyAnnotationBeanPostProcessor.java:457) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:401) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:219) ~[spring-beans-6.0.11.jar:6.0.11]
    ... 18 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.1.2.jar:6.1.2]
    ... 42 common frames omitted

Solution

  • You do this:

    @PostConstruct
    void t() {
        // ${SHELLY_ID}/command/switch:0 -m toggle
        String topic = "xxx/command/switch:0";
        System.out.println(">>>>>> "+publishGateway);
        publishGateway.send("toggle", topic);
    }
    

    A @PostConstruct method runs during the application context initialization phase. That is too early to interact with an external system, which is what sending this message does.

    I see that you use Spring Boot, so @EnableIntegration is indeed applied and that @ServiceActivator is visible.

    The only problem is that you send a message into a channel that does not have subscribers yet, because endpoints subscribe to their channels only in the application start phase, which comes later.

    Consider using an @EventListener(ApplicationReadyEvent.class) method to send the message instead of doing that from @PostConstruct.

    I think we can try to improve the error reporting. Instead of "Dispatcher has no subscribers" we could emit something like "No message producing is allowed during the initialization phase". I am not sure we can detect such a state, but that is an implementation detail.

    Please raise a GitHub issue for this and we will consider it for the next version, 6.5.
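
To make the ordering argument concrete, here is a minimal plain-Java sketch of the three phases involved. It is only a toy model under stated assumptions: ToyChannel and ToyEndpoint are hypothetical stand-ins invented for illustration, not Spring Integration classes, and the real framework does considerably more. The point is simply that a send attempted before the endpoint has started finds no subscriber, while the same send after start is delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the lifecycle ordering; these are NOT Spring Integration's real classes. */
class LifecycleOrderingSketch {

    /** Hypothetical stand-in for a direct channel: sending fails when nobody has subscribed. */
    static class ToyChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String message) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("Dispatcher has no subscribers");
            }
            subscribers.forEach(s -> s.accept(message));
        }
    }

    /** Hypothetical stand-in for a consumer endpoint: it subscribes only when started. */
    static class ToyEndpoint {
        final List<String> delivered = new ArrayList<>();
        private final ToyChannel channel;

        ToyEndpoint(ToyChannel channel) {
            this.channel = channel;
        }

        void start() {
            channel.subscribe(delivered::add);
        }
    }

    /** Runs init, start, ready in order and returns a log of what happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyChannel channel = new ToyChannel();
        ToyEndpoint endpoint = new ToyEndpoint(channel);

        // Phase 1: initialization (this is where @PostConstruct callbacks run).
        try {
            channel.send("toggle");
            log.add("init: sent");
        } catch (IllegalStateException e) {
            log.add("init: " + e.getMessage());
        }

        // Phase 2: start (the endpoint subscribes to its channel).
        endpoint.start();

        // Phase 3: ready (this is where an ApplicationReadyEvent listener would run).
        channel.send("toggle");
        log.add("ready: delivered " + endpoint.delivered);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the actual application the equivalent change is the one suggested above: move the publishGateway.send(...) call out of the @PostConstruct method and into a method annotated with @EventListener(ApplicationReadyEvent.class), so that it runs only after the endpoints have started.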