
@IntegrationComponentScan annotation cannot scan @MessagingGateway component


I'm using Spring Boot 3.2 and Spring Integration 6 to support MQTT messaging. I have defined a shared Maven module named sz-common-mqtt, and the goal is that any other module depending on it can use the MQTT-related message components directly. The Maven dependencies of this module are as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.sz</groupId>
        <artifactId>sz-common</artifactId>
        <version>${revision}</version>
    </parent>
    <artifactId>sz-common-mqtt</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
        </dependency>
    </dependencies>
</project>

Then I defined two classes in the com.sz.mqtt.config package, MqttGateway and MqttConfig:

package com.sz.mqtt.config;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    void sendToMqtt(String data);
}

package com.sz.mqtt.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.*;
import java.util.concurrent.Executors;


@Configuration
@IntegrationComponentScan("com.sz.mqtt.config")
@Slf4j
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfig {
    private String clientIdInbound;
    private String clientIdOutbound;
    private String url;
    private String password;
    private String username;

    @Bean
    public MqttConnectionOptions mqttConnectOptions(){
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setServerURIs(new String[] { url});
        options.setUserName(username);
        options.setPassword(password.getBytes());
        options.setAutomaticReconnect(true);
        return options;
    }

    @Bean
    public SimpleMessageConverter simpleMessageConverter(){
        return new SimpleMessageConverter();
    }

    @Bean
    public MessageHandler mqttOutboundHandler(MqttConnectionOptions connectionOptions) {
        Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(connectionOptions,clientIdOutbound);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("defaultTopic");
        messageHandler.setDefaultQos(0);
        messageHandler.setConverter(simpleMessageConverter());
        return messageHandler;
    }


    @Bean
    public IntegrationFlow mqttOutboundFlow(MessageHandler mqttOutboundHandler){
        return IntegrationFlow.from("mqttOutboundChannel")
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                .handle(mqttOutboundHandler)
                .get();
    }

}

I depend on the sz-common-mqtt module above from my Spring Boot module (another Maven module):

        <dependency>
            <groupId>com.sz</groupId>
            <artifactId>sz-common-mqtt</artifactId>
            <version>${revision}</version>
        </dependency>

Then I inject the MqttGateway component directly into a Spring @Component, using Lombok's @RequiredArgsConstructor:

@Component
@Slf4j
@RequiredArgsConstructor
public class UnitClientManager {
    private final Map<Long, UnitSession> SESSION_MAP = new ConcurrentHashMap<>();
    private final MqttGateway mqttGateway;
    private final IntegrationFlowContext integrationFlowContext;
    ..........  other info
}

When I start the Spring Boot application, I get the following error:


***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 0 of constructor in sz.device.session.UnitClientManager required a bean of type 'com.sz.mqtt.config.MqttGateway' that could not be found.


Action:

Consider defining a bean of type 'com.sz.mqtt.config.MqttGateway' in your configuration.

How can I solve this so that the MqttGateway component is wired in automatically?


Solution

  • I just ran your application. The @IntegrationComponentScan works as expected; however, it still fails for me with a different error:

    A component required a bean named 'mqttOutboundChannel' that could not be found.
    
    
    Action:
    
    Consider defining a bean named 'mqttOutboundChannel' in your configuration.
    

    That one does come from the @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") declaration.

    It looks like you declare that channel on demand in the flow:

    @Bean
    public IntegrationFlow mqttOutboundFlow(MessageHandler mqttOutboundHandler){
        return IntegrationFlow.from("mqttOutboundChannel")
    

    A channel declared on demand like this is not visible during the scanning and injection phase.

    So, we have to be explicit about the required channel object:

    @Bean 
    DirectChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
    

    That makes it work.

    It feels like your real application uses different packages. In this sample you have:

    package com.sz;
    
    @SpringBootApplication
    public class MainApplication {
    

    That does the trick for the @ComponentScan used by Spring Boot: scanning starts from com.sz and is even able to reach down to com.sz.mqtt.config in the dependency module. This way it triggers @IntegrationComponentScan, which sees the MqttGateway interface and registers it as a proxy bean.

    After adding the channel bean, I moved on to the next error, which still comes from your config:

    Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.mqttOutboundChannel'.
    

    This is because you do:

    @PostConstruct
    public void init(){
        sendToMqtt("test","hello");
    }
    

    That is not correct: we should not interact with external systems during the application initialization phase.

    When I changed it to this:

    @EventListener(ApplicationReadyEvent.class)
    public void readyToSend() {
        sendToMqtt("test","hello");
    }
    

    With that change I was able to get past the initialization phase. Yes, it still fails for me, but that is expected because I don't have an MQTT broker on tcp://127.0.0.1:1883:

    Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused: getsockopt
        at org.eclipse.paho.mqttv5.client.internal.TCPNetworkModule.start(TCPNetworkModule.java:81)
        at org.eclipse.paho.mqttv5.client.internal.ClientComms$ConnectBG.run(ClientComms.java:783)
        ... 1 more
    Caused by: java.net.ConnectException: Connection refused: getsockopt
    

    So far everything works for me. Please update the sample so that we can reproduce the "Consider defining a bean of type 'com.sz.mqtt.config.MqttGateway'" error on our side.
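
To illustrate the package point made in the answer: classpath component scanning covers a base package and its sub-packages only. The sketch below is a toy model of that rule in plain Java, not Spring's actual scanner. The class and method names are hypothetical, and sz.device as the real application's base package is only a guess based on the sz.device.session.UnitClientManager name in the error message.

```java
import java.util.List;

// Toy model (hypothetical, not Spring code): does a scan that starts at
// basePackage reach classes declared in candidatePackage?
final class ScanCoverage {

    /** True when candidatePackage is basePackage itself or one of its sub-packages. */
    public static boolean covers(String basePackage, String candidatePackage) {
        return candidatePackage.equals(basePackage)
                || candidatePackage.startsWith(basePackage + ".");
    }

    public static void main(String[] args) {
        String configPackage = "com.sz.mqtt.config";
        // "com.sz" is the main package of the sample in the answer;
        // "sz.device" is an assumed base package for the real application.
        for (String base : List.of("com.sz", "sz.device")) {
            System.out.println(base + " covers " + configPackage + ": "
                    + covers(base, configPackage));
        }
    }
}
```

If the real application class does sit outside com.sz, two common options would be to widen the scan with @SpringBootApplication(scanBasePackages = ...) or to pull the configuration in explicitly with @Import(MqttConfig.class) on the application class. Whether either fits depends on the actual project layout, which the question does not show.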