I'm using org.eclipse.paho.mqttv5.client in Spring Integration and trying to set the No Local option in MQTT, like this:
@Bean
public MessageProducer inbound(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
    Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(
            clientManager,
            "test"
    );
    adapter.setCompletionTimeout(5000);
    adapter.setQos(2);
    adapter.connectComplete(true);
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
}
but Mqttv5PahoMessageDrivenChannelAdapter has no method to set an MqttSubscription (which holds MQTT's No Local setting).
The Mqttv5PahoMessageDrivenChannelAdapter class has a subscribe method:
private void subscribe() {
    var clientManager = getClientManager();
    if (clientManager != null && this.mqttClient == null) {
        this.mqttClient = clientManager.getClient();
    }
    String[] topics = getTopic();
    ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
    this.topicLock.lock();
    try {
        if (topics.length == 0) {
            return;
        }
        int[] requestedQos = getQos();
        MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
                .mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
                .toArray(MqttSubscription[]::new);
        IMqttMessageListener listener = this::messageArrived;
        IMqttMessageListener[] listeners = IntStream.range(0, topics.length)
                .mapToObj(t -> listener)
                .toArray(IMqttMessageListener[]::new);
        this.mqttClient.subscribe(subscriptions, null, null, listeners, null)
                .waitForCompletion(getCompletionTimeout());
        String message = "Connected and subscribed to " + Arrays.toString(topics);
        logger.debug(message);
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
        }
    }
    catch (MqttException ex) {
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
        }
        logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
    }
    finally {
        this.topicLock.unlock();
    }
}
but it creates each MqttSubscription with only the topic and qos parameters:
MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
        .mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
        .toArray(MqttSubscription[]::new);
This is something we missed when we introduced MQTT v5 support.
It looks like we have to introduce something like MqttSubscription-based constructors as an alternative to the plain topic and its qos. This way you will be able to fine-tune the configuration of each subscription.
Please raise a GH issue and we will address it in the next Spring Integration version.
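To make the constructor idea above a bit more concrete, here is a minimal plain-Java sketch of the shape it could take. It is not Spring Integration or Paho code: SubscriptionSpec and SketchInboundAdapter are hypothetical stand-ins for MqttSubscription and the channel adapter, and the default QoS of 1 is an assumption made only for this sketch. The adapter model just records what it would subscribe to, so the two construction styles can be compared.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a subscription: topic, QoS and the MQTT v5 No Local flag. */
final class SubscriptionSpec {
    private final String topic;
    private final int qos;
    private final boolean noLocal;

    SubscriptionSpec(String topic, int qos, boolean noLocal) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2: " + qos);
        }
        this.topic = topic;
        this.qos = qos;
        this.noLocal = noLocal;
    }

    String topic() { return topic; }
    int qos() { return qos; }
    boolean noLocal() { return noLocal; }
}

/** Hypothetical adapter model; it only stores the subscriptions it was given. */
final class SketchInboundAdapter {
    // Assumed default for this sketch only.
    private static final int DEFAULT_QOS = 1;

    private final List<SubscriptionSpec> subscriptions;

    /** Plain-topic style: every subscription option stays at its default. */
    SketchInboundAdapter(String... topics) {
        this(Arrays.stream(topics)
                .map(topic -> new SubscriptionSpec(topic, DEFAULT_QOS, false))
                .toArray(SubscriptionSpec[]::new));
    }

    /** Subscription-based style: the caller configures each subscription fully. */
    SketchInboundAdapter(SubscriptionSpec... subscriptions) {
        this.subscriptions = List.of(subscriptions);
    }

    List<SubscriptionSpec> subscriptions() { return subscriptions; }
}
```

With this shape, new SketchInboundAdapter("test") keeps the current behaviour, while new SketchInboundAdapter(new SubscriptionSpec("test", 2, true)) lets a single subscription opt in to No Local without adding a setter per option to the adapter.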
As a workaround, I can only suggest using the Paho API directly. A custom MessageProducerSupport implementation can be used to wire it into the rest of the integration flow in your project.