I'm trying to learn how to handle MQTT Messages in Spring-Integration. Have created a converter, that subscribes with a single MqttPahoMessageDrivenChannelAdapter per MQTT Topic for consuming and converting the messages.
The problem is our data provider is planning to "speed-up" publishing messages on his side. So instead of having a few(<=10) topics each of which has messages with about 150 fields it is planned to publish each of those fields to the separate MQTT topic.
This means my converter would have to consume ca. 1000 mqtt topics, but I do not know whether:
Thanks in advance for your answers!
I think your idea makes sense.
For that purpose you need to implement a passthrough MqttMessageConverter
and provide an MqttMessage
as a payload
and topic
as a header:
public class PassThroughMqttMessageConverter implements MqttMessageConverter {
@Override
public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
return MessageBuilder.withPayload(mqttMessage)
.setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
.build();
}
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return null;
}
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
return null;
}
}
So, you really will be able to perform a target conversion downstream, after a mentioned ExecutorChannel
in the custom transformer
.
You also may consider to implement a custom MqttPahoClientFactory
(an extension of the DefaultMqttPahoClientFactory
may work as well) and provide a custom ScheduledExecutorService
to be injected into the MqttClient
you are going create in the getClientInstance()
.