spring-boot spring-integration spring-framework-beans hivemq spring-integration-mqtt

Spring Integration MQTT: DestinationResolutionException: no output-channel or replyChannel header available


Can someone please help me understand where the problem is in this config?

@Configuration
public class MqttConfig {
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
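        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        // Apply the options; without this call the server URI is never used.
        factory.setConnectionOptions(options);
        return factory;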
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory clientFactory) {
        return new MqttPahoMessageDrivenChannelAdapter("MyApp", clientFactory, "ReplyTopic");
    }

    @Bean
    IntegrationFlow inboundFlow(MqttPahoMessageDrivenChannelAdapter inboundAdapter) {
        return IntegrationFlows.from(inboundAdapter)
                               .bridge()
                               .channel("replyChannel")
                               .get();
    }

    @Bean
    public MessageChannel replyChannel() {
        return MessageChannels.publishSubscribe().get();
    }


    @Bean
    public MqttPahoMessageHandler outboundAdapter(MqttPahoClientFactory clientFactory) {
        return new MqttPahoMessageHandler("MyApp", clientFactory);
    }

    @Bean
    public IntegrationFlow outboundFlow(MqttPahoMessageHandler outboundAdapter) {
        return IntegrationFlows.from("requestChannel")
                               .handle(outboundAdapter)
                               .get();
    }


    @MessagingGateway
    public interface MyGateway {
        @Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel")
        String send(String request, @Header(MqttHeaders.TOPIC) String requestTopic);
    }
}

Client code

@RestController
public class MyController {
    @Autowired
    private MyGateway myGateway;

    @GetMapping("/sendRequest")
    public String sendRequest() {
        var response = myGateway.send("Hello", "MyTopic");
        return response;
    }
}

Usage:

curl http://localhost:8080/sendRequest

Manual response published through the MQTT broker (HiveMQ):

docker exec -it hivemq mqtt pub -t ReplyTopic -m "World" --debug
CLIENT mqttClient-MQTT_5_0-9ecded84-8416-4baa-a8f3-d593c692bc65: acknowledged PUBLISH: 'World' for PUBLISH to Topic:  ReplyTopic

But I don't know why I get this message in the Spring application output:

2022-10-25 18:04:33.171 ERROR 17069 --- [T Call: MyApp] .m.i.MqttPahoMessageDrivenChannelAdapter : Unhandled exception for GenericMessage [payload=World, headers={mqtt_receivedRetained=false, mqtt_id=0, mqtt_duplicate=false, id=9dbd5e14-66ed-5dc8-6cea-6d04ef19c6cc, mqtt_receivedTopic=ReplyTopic, mqtt_receivedQos=0, timestamp=1666713873170}]

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@6f63903c]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

Can someone please explain why I get this?

no output-channel or replyChannel header available

Solution

  • I think the problem you are facing is not related to your bridge() configuration.

    The error comes from MessagingGatewaySupport and its replyMessageCorrelator feature, which is activated by your replyChannel = "replyChannel" setting.

    The real problem is that you are trying to do something that is not possible with MQTT v3. No headers are transferred through the MQTT broker, so nothing can carry the correlation key that the gateway initiator requires: the TemporaryReplyChannel. See more in the docs about the gateway: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway.

    In other words: regardless of the replyChannel configuration on the gateway, the replyChannel header must be present in the reply message. This is how the gateway correlates requests with replies.
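
To make the header-based routing more concrete, here is a small plain-Java model of it. It is only an illustration: ReplyHeaderModel, Msg, and the method names are made up for this sketch and are not Spring Integration classes. The reply queue stands in for the TemporaryReplyChannel, and routeReply stands in for the gateway's reply correlator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Plain-Java model of header-based reply routing. All names are illustrative;
// none of these are Spring Integration classes.
final class ReplyHeaderModel {

    static final String REPLY_CHANNEL = "replyChannel";

    // A message is a payload plus a map of headers.
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // The gateway attaches a one-off reply queue to every request it sends.
    static Msg newRequest(String payload, BlockingQueue<String> replyQueue) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(REPLY_CHANNEL, replyQueue);
        return new Msg(payload, headers);
    }

    // In-process hop: the reply inherits the request headers.
    static Msg replyInProcess(Msg request, String replyPayload) {
        return new Msg(replyPayload, new HashMap<>(request.headers));
    }

    // MQTT v3 round trip: only the payload travels, so the reply has no request headers.
    static Msg replyOverMqttV3(String replyPayload) {
        return new Msg(replyPayload, new HashMap<>());
    }

    // Route a reply to the queue found in its header, or fail when the header is missing.
    @SuppressWarnings("unchecked")
    static void routeReply(Msg reply) {
        Object target = reply.headers.get(REPLY_CHANNEL);
        if (target == null) {
            throw new IllegalStateException("no output-channel or replyChannel header available");
        }
        ((BlockingQueue<String>) target).add(reply.payload);
    }

    public static void main(String[] args) {
        BlockingQueue<String> replyQueue = new ArrayBlockingQueue<>(1);
        Msg request = newRequest("Hello", replyQueue);

        // Headers preserved: the reply reaches the waiting caller.
        routeReply(replyInProcess(request, "World"));
        System.out.println("in-process reply: " + replyQueue.poll());

        // Headers lost at the broker: routing fails.
        try {
            routeReply(replyOverMqttV3("World"));
        } catch (IllegalStateException e) {
            System.out.println("MQTT v3 reply: " + e.getMessage());
        }
    }
}
```

The second call fails for the same reason as the message published with `mqtt pub` in the question: the payload arrives, but nothing in it says which waiting caller it belongs to.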

    You have to look into an aggregator: send the request message to it in parallel with the MQTT publish, so that the mentioned TemporaryReplyChannel header is preserved. Then, when you receive a reply (inboundAdapter), send it to the same aggregator. You need to derive a correlation key from both the request and the reply payloads, so that they match and complete the group, and the reply can be sent back to the gateway.

    See more info in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator
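
The aggregator approach might look roughly like the sketch below. It is untested and rests on several assumptions: it uses the same Spring Integration 5.x-style DSL as the question, reuses the inboundAdapter and outboundAdapter beans defined there, and replaces the question's flows and gateway. The aggregatorChannel name, the correlationKey helper, and the "<id>:<body>" payload convention are invented for illustration, and the replying side would have to echo the id in its payload. The sketch also relies on the aggregator's default header merging to keep the request's replyChannel header, and on the request reaching the aggregator before the reply.

```java
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@Configuration
public class MqttRequestReplyConfig {

    // No replyChannel attribute here: the gateway waits on the TemporaryReplyChannel
    // that it places in the replyChannel header of each request.
    @MessagingGateway
    public interface MyGateway {
        @Gateway(requestChannel = "requestChannel")
        String send(String request, @Header(MqttHeaders.TOPIC) String requestTopic);
    }

    // Assumed payload convention "<id>:<body>"; the id part is the correlation key.
    private static String correlationKey(Object payload) {
        return String.valueOf(payload).split(":", 2)[0];
    }

    @Bean
    public IntegrationFlow outboundFlow(MqttPahoMessageHandler outboundAdapter) {
        return IntegrationFlows.from("requestChannel")
                .publishSubscribeChannel(c -> c
                        // First keep the request, with its headers, in the aggregator...
                        .subscribe(f -> f.channel("aggregatorChannel"))
                        // ...then publish it to the broker.
                        .subscribe(f -> f.handle(outboundAdapter)))
                .get();
    }

    @Bean
    public IntegrationFlow inboundFlow(MqttPahoMessageDrivenChannelAdapter inboundAdapter) {
        // Replies from the broker join the group opened by the matching request.
        return IntegrationFlows.from(inboundAdapter)
                .channel("aggregatorChannel")
                .get();
    }

    @Bean
    public IntegrationFlow aggregatorFlow() {
        return IntegrationFlows.from("aggregatorChannel")
                .aggregate(a -> a
                        .correlationStrategy(m -> correlationKey(m.getPayload()))
                        // One request plus one reply completes the group.
                        .releaseStrategy(g -> g.size() == 2)
                        .expireGroupsUponCompletion(true))
                // The default aggregate payload is a list of payloads; the last one
                // is taken as the reply (assumes the request was stored first).
                .<List<String>, String>transform(payloads -> payloads.get(payloads.size() - 1))
                // No output channel: the message goes to the replyChannel header.
                .get();
    }
}
```

Under these assumptions, myGateway.send("42:Hello", "MyTopic") would return once a payload such as 42:World is published to ReplyTopic. Setting replyTimeout on @Gateway is worth considering so that a missing reply does not block the caller indefinitely.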