javaspring-integrationapi-gatewayspring-messaging

How to implement thread efficient retries on async JmsOutboundGateway


In my case i use JmsOutboundGateway for mq connection - inputChannel is ExecutorChannel.

Currently ServiceActivator don't have any output channel. Gateway interface method returns CompletableFuture object.

@MessagingGateway
public interface SimpleGateway {

@gateway(requestChannel = "mqRequestChannel")
CompletableFuture sendAndReceiveMqMessage(String message);
}

configuration class

@bean
public MessageChannel mqRequestChannel(AsyncTaskExecutor taskExecutor) {
return new ExecutorChannel(taskExecutor);
}

@ServiceActivator(inputChannel = "mqRequestChannel")
@Bean
public JmsOutboundGateway jmsOutboundGateway(... ) {
    JmsOutboundGateway gateway = new JmsOutboundGateway();
    gateway.setConnectionFactory(mqConnectionFactory);
    gateway.setRequestDestinationName(requestDestination);
    gateway.setReplyDestinationName(responseDestination);
    gateway.setAsync(true);
     ....
     ....
    return gateway;
}
@Autowired
private SimpleGateway simpleGateway;

 CompletableFuture<String> mqResponse = simpleGateway.sendAndReceiveMqMessage("sdsadas");

I tried to use recursion but it won't be best solution.

    private void callGateway(int retry, int requestNum){

        String dummyOfs = "dsfsdf";

            CompletableFuture<String> response = simpleGateway.sendAndReceiveMqMessage(dummyOfs);

        nCoreResponse.exceptionally(throwable -> { LOG.info("error occured");
        LOG.info("[Error section CURRENT THREAD in gateway class call ]: {} , requestNum = {}, retry = {}", Thread.currentThread(), requestNum, retry );
        if(MAX_RETRY > retry){
            Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
            callGateway(retry + 1, requestNum);
        }

        return null;});

        response.thenAccept((String someString) ->
    {
        LOG.info("[success ]: {} , returned value: {}", Thread.currentThread(), someString);
    });
    };
}

I was also thinking of pushing retries to a different thread pool.

But for me is important to set up numer of retries and back off time and also if timeout for request is reached then retries won't be triggered or stopped.


Solution

  • There is no retry API around CompletableFuture. At least as far as I'm aware. According to your gateway.setAsync(true) and ExecutorChannel upfront there is no connection between your CompletableFuture return from a messaging gateway and that reply from a JMS. The JmsOutboundGateway will still produce a real result from its call to the replyChannel header after completion of the Future, and only after that it is wrapped to another CompletableFuture for your messaging gateway expectations. An ExecutorChannel is also a bit of an overhead since a CompletableFuture return type from a messaging gateway method uses a TaskExecutor for async request reply. Therefore you do have a lot of thread switching in between request and reply.

    It might not be what you would expect, but I can suggest to change your sendAndReceiveMqMessage() to return a Mono. And already there you can use a retry() API. For downstream flow it doesn't matter and it still going to be an async. In the end, the successful result of this Mono can be converted to the CompletableFuture (see Mono.toFuture()) if that is what required in the consumer of your API.