
How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels?

I have developed asynchronous Spring Cloud Stream services, and I am trying to develop an edge service that uses @MessagingGateway to provide synchronous access to services that are async by nature.

I am currently getting the following stack trace:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted

My @MessagingGateway:


public interface AccountService {
  @Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);

If I consume the message on the reply channel via a @StreamListener, it works just fine:

  @HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
  public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
    try {
      if (log.isInfoEnabled()) {
        log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
    } catch (JsonProcessingException e) {
      log.error(e.getMessage(), e);

On the producer side, I am configuring requiredGroups to ensure that multiple consumers can process the message, and correspondingly, the consumers have matching group configurations.


          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          requiredGroups: accounts-service-create-account-request
          binder: rabbit1
          contentType: application/json
          destination: account-created
          group: accounts-edge-account-created


          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          group: accounts-service-create-account-request
          binder: rabbit1
          contentType: application/json
          destination: account-created
          requiredGroups: accounts-edge-account-created

The bit of code on the producer side that processes the request and sends the response:


I can debug and see that the request is received and processed, but when the response is sent to the reply channel, that's when the error occurs.

To get the @MessagingGateway working, what configurations and/or code am I missing? I know I'm combining Spring Integration and Spring Cloud Gateway, so I'm not sure if using them together is causing the issues.


  • It's good question and really good idea. But it isn't going to work so easy.

    First of all we have to determine for ourselves that gateway means request/reply, therefore correlation. And this available in @MessagingGateway via replyChannel header in face of TemporaryReplyChannel instance. Even if you have an explicit replyChannel = AccountChannels.ACCOUNT_CREATED, the correlation is done only via the mentioned header and its value. The fact that this TemporaryReplyChannel is not serializable and can't be transferred over the network to the consumer on another side.

    Luckily Spring Integration provide some solution for us. It is a part of the HeaderEnricher and its headerChannelsToString option behind HeaderChannelRegistry:

    Starting with Spring Integration 3.0, a new sub-element <int:header-channels-to-string/> is available; it has no attributes. This converts existing replyChannel and errorChannel headers (when they are a MessageChannel) to a String and stores the channel(s) in a registry for later resolution when it is time to send a reply, or handle an error. This is useful for cases where the headers might be lost; for example when serializing a message into a message store or when transporting the message over JMS. If the header does not already exist, or it is not a MessageChannel, no changes are made.


    But in this case you have to introduce an internal channel from the gateway to the HeaderEnricher and only the last one will send the message to the AccountChannels.CREATE_ACCOUNT_REQUEST. So, the replyChannel header will be converted to a string representation and be able to travel over the network. On the consumer side when you send a reply you should ensure that you transfer that replyChannel header as well, as it is. So, when the message will arrive to the AccountChannels.ACCOUNT_CREATED on the producer side, where we have that @MessagingGateway, the correlation mechanism is able to convert a channel identificator to the proper TemporaryReplyChannel and correlate the reply to the waiting gateway call.

    Only the problem here that your producer application must be as single consumer in the group for the AccountChannels.ACCOUNT_CREATED - we have to ensure that only one instance in the cloud is operating at a time. Just because only one instance has that TemporaryReplyChannel in its memory.

    More info about gateway: https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway


    Some code for help:

    public interface AccountService {
      @Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
      Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
    public IntegrationFlow headerEnricherFlow() {
       return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
                .enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())


    Some simple application to demonstrate the PoC:

    @EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
    public class CloudStreamGatewayApplication {
        interface GatewayChannels {
            String REQUEST = "request";
            MessageChannel request();
            String REPLY = "reply";
            SubscribableChannel reply();
        private static final String ENRICH = "enrich";
        public interface StreamGateway {
            @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
            String process(String payload);
        public IntegrationFlow headerEnricherFlow() {
            return IntegrationFlows.from(ENRICH)
        public Message<?> process(Message<String> request) {
            return MessageBuilder.withPayload(request.getPayload().toUpperCase())
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext =
                    SpringApplication.run(CloudStreamGatewayApplication.class, args);
            StreamGateway gateway = applicationContext.getBean(StreamGateway.class);
            String result = gateway.process("foo");

    The application.yml:

              destination: requests
              destination: replies
              destination: requests
              destination: replies

    I use spring-cloud-starter-stream-rabbit.



    Does the trick copying request headers to the reply message. So, the gateway is able on the reply side to convert channel identifier in the headers to the appropriate TemporaryReplyChannel to convey the reply properly to the caller of gateway.

    The SCSt issue on the matter: https://github.com/spring-cloud/spring-cloud-stream/issues/815