Tags: spring-integration, spring-webflux, reactive, declarative

Spring-Integration: how to inspect an HTTP response within an AbstractRequestHandlerAdvice with WebFlux?


This question pertains directly to another question: Spring-Integration AbstractRequestHandlerAdvice with webflux/reactive: does this introduce a synchronous process?

When the advice handler wraps an http:outbound-gateway, the result returned by callback.execute() is either a Message<?> or a MessageBuilder (or perhaps only the latter).

public class MyLogger extends AbstractRequestHandlerAdvice {

   // ...

   @Override
   protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message)
   {

      // ... logging before the call ...

      Object result = callback.execute();

      // ... logging after the call ...

      return result;
   }

}

After migrating to WebFlux, and hence to webflux:outbound-gateway, the advice handler now receives a MonoMapFuseable as the result. Is it possible to read the information from the MonoMapFuseable in order to log the returned payload without definitively consuming the result? I'm a bit at a loss as to how to do this.

Thanks for any insights.


Solution

  • I am not a Reactor expert, but you should be able to call share() on the returned Mono, then subscribe to that shared Mono to get a copy of the result.

    /**
     * Prepare a {@link Mono} which shares this {@link Mono} result similar to {@link Flux#shareNext()}.
     * This will effectively turn this {@link Mono} into a hot task when the first
     * {@link Subscriber} subscribes using {@link #subscribe()} API. Further {@link Subscriber} will share the same {@link Subscription}
     * and therefore the same result.
     * It's worth noting this is an un-cancellable {@link Subscription}.
     * <p>
     * <img class="marble" src="doc-files/marbles/shareForMono.svg" alt="">
     *
     * @return a new {@link Mono}
     */
    public final Mono<T> share() {
    

    I just tried it with this...

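    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.annotation.Bean;
    import reactor.core.publisher.Mono;
    import reactor.core.publisher.Sinks;

    @Bean
    ApplicationRunner runner() {
        return args -> {
            Sinks.One<Object> one = Sinks.one();
            Mono<Object> mono = one.asMono();
            Mono<Object> mono1 = mono.share();
            Mono<Object> mono2 = mono.share();
            // Subscribe to both shared monos and to the original before emitting.
            subsMono(mono1, 1);
            subsMono(mono2, 2);
            subsMono(mono, 0);
            // Pass an explicit failure handler rather than null.
            one.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST);
        };
    }
    
    // Prints the emitted value followed by the subscriber index.
    private void subsMono(Mono<Object> mono, int i) {
        mono.doOnNext(obj -> System.out.println(obj.toString() + i))
            .subscribe();
    }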
    
    foo1
    foo2
    foo0
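
To relate this back to the advice, here is a minimal sketch of the share-then-subscribe idea applied to whatever callback.execute() returns. It is not Spring Integration code: the class name SharedMonoLoggingSketch, the helper logReply and the List<String> standing in for a logger are all hypothetical, and it assumes reactor-core 3.4 or later on the classpath. The counter is only there to check that the upstream source runs once even though both the logging subscriber and the downstream consumer see the value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

class SharedMonoLoggingSketch {

    // Hypothetical helper: roughly what doInvoke(...) could do with the
    // object returned by callback.execute(). The list stands in for a logger.
    static Object logReply(Object result, List<String> log) {
        if (!(result instanceof Mono)) {
            // Non-reactive gateway: the reply is already available.
            log.add("reply: " + result);
            return result;
        }
        // share() lets several subscribers reuse one upstream subscription.
        Mono<?> shared = ((Mono<?>) result).share();
        // Note: subscribing here starts the upstream work immediately.
        shared.subscribe(
                value -> log.add("reply: " + value),
                error -> log.add("error: " + error));
        // Hand the shared Mono downstream, not the original one.
        return shared;
    }

    public static void main(String[] args) {
        AtomicInteger upstreamCalls = new AtomicInteger();
        // Cold source standing in for the HTTP call (assumption for the demo).
        Mono<String> reply =
                Mono.fromCallable(() -> "foo#" + upstreamCalls.incrementAndGet());

        List<String> log = new ArrayList<>();
        Object returned = logReply(reply, log);

        // The downstream consumer still receives the value.
        Object downstream = ((Mono<?>) returned).block();

        System.out.println("logged:     " + log);
        System.out.println("downstream: " + downstream);
        System.out.println("upstream calls: " + upstreamCalls.get());
    }
}
```

Two caveats with this approach: the advice must return the shared Mono (returning the original would let the downstream subscriber trigger a second request), and the subscribe() call inside the advice starts the request eagerly instead of waiting for the downstream subscriber.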