spring-integrationmqttspring-integration-mqtt

SI subscription to multiple mqtt topics


I'm trying to learn how to handle MQTT Messages in Spring-Integration. Have created a converter, that subscribes with a single MqttPahoMessageDrivenChannelAdapter per MQTT Topic for consuming and converting the messages.

The problem is our data provider is planning to "speed-up" publishing messages on his side. So instead of having a few(<=10) topics each of which has messages with about 150 fields it is planned to publish each of those fields to the separate MQTT topic.

This means my converter would have to consume ca. 1000 mqtt topics, but I do not know whether:

  1. Is spring-integration still a good choice for it. Cause afaik. the mentioned adapter uses the PAHO MqttClient that will consume the messages from all of the topics it is subscribed to in one single thread and creating 1000 instances of those adapters is an overkill.
  2. If we stick further to spring-integration and use the provided components, would it be a good idea to create a single inbound adapter for all of the fields, that previously were in messages of one topic but moving the conversion away from the adapter bean to a separate bean ( that does the conversion) connected with an executer-channel to the adapter and thus executing the conversion of those fields on some threadpool in parallel.

Thanks in advance for your answers!


Solution

  • I think your idea makes sense.

    For that purpose you need to implement a passthrough MqttMessageConverter and provide an MqttMessage as a payload and topic as a header:

    public class PassThroughMqttMessageConverter implements MqttMessageConverter {
    
        @Override
        public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
            return MessageBuilder.withPayload(mqttMessage)
                    .setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
                    .build();
        }
    
        @Override
        public Object fromMessage(Message<?> message, Class<?> targetClass) {
            return null;
        }
    
        @Override
        public Message<?> toMessage(Object payload, MessageHeaders headers) {
            return null;
        }
    
    }
    

    So, you really will be able to perform a target conversion downstream, after a mentioned ExecutorChannel in the custom transformer.

    You also may consider to implement a custom MqttPahoClientFactory (an extension of the DefaultMqttPahoClientFactory may work as well) and provide a custom ScheduledExecutorService to be injected into the MqttClient you are going create in the getClientInstance().