
Why does Spring Cloud Function invoke the first part of a composed function twice?


I have built a Spring Cloud Function application with a Spring Integration flow exposed as a function (actually, an imperative Consumer) and a reactive Function. Both functions are composed and exposed as a Spring Cloud Function standalone web application, which takes POST requests as the function's input.

The application works as expected, except for one thing: the first function is invoked twice by the framework (as I can see in the logs), even though the Consumer is invoked only once (which I have confirmed with a @SpringBootTest).

This is what the app looks like.

FunctionCompositionApplication.java:

@SpringBootApplication
public class FunctionCompositionApplication {

  private static final Logger LOGGER = LoggerFactory.getLogger(FunctionCompositionApplication.class);

  public static void main(final String[] args) {
    SpringApplication.run(FunctionCompositionApplication.class, args);
  }

  // just for testing purposes
  public static class MyCustomMessageHandler {
    public void handleMessage(final Message<String> message) {
      LOGGER.info("Will handle a message: {}", message);
    }
  }

  @Bean
  MyCustomMessageHandler myCustomMessageHandler() {
    return new MyCustomMessageHandler();
  }

  @Bean
  Function<Flux<Message<String>>, Flux<Message<String>>> uppercase() {
    return flux -> flux.map(message -> {
      final var payload = message.getPayload();
      LOGGER.info("Will transform this payload to upper case: {}", payload);
      return MessageBuilder
          .withPayload(payload.toUpperCase())
          .copyHeaders(message.getHeaders())
          .build();
    });
  }

  @Bean
  IntegrationFlow loggingFlow(final MyCustomMessageHandler handler) {
    return flow -> flow.handle(handler);
  }

  @Bean
  @SuppressWarnings({"rawtypes", "unchecked"})
  AnnotationGatewayProxyFactoryBean<Consumer<Message<?>>> log() {
    final var gatewayProxyFactoryBean = new AnnotationGatewayProxyFactoryBean<>(Consumer.class);
    gatewayProxyFactoryBean.setDefaultRequestChannelName("loggingFlow.input");
    return (AnnotationGatewayProxyFactoryBean) gatewayProxyFactoryBean;
  }

}

application.properties:

spring.application.name=function-composition-test
spring.cloud.function.definition=uppercase|log

FunctionCompositionApplicationTests.java:

@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@ActiveProfiles("test")
class FunctionCompositionApplicationTests {

  @Autowired
  private TestRestTemplate restTemplate;
  @MockitoBean
  private MyCustomMessageHandler handler;

  @Test
  @DisplayName("Should transform the payload to upper case")
  void transformToUpperCase() throws Exception {
    final var payload = "functional composition test";

    this.restTemplate.exchange(
        RequestEntity
            .post(new URI("/test/uppercase,log"))
            .contentType(MediaType.TEXT_PLAIN)
            .body(payload),
        Void.class);

    verify(handler, Mockito.times(1)).handleMessage(Mockito.any());
  }

}

application-test.properties:

spring.cloud.function.web.path=/test

pom.xml: (showing only the dependencies of a regular Spring Boot 3.5.5 application)

<properties>
    <java.version>21</java.version>
    <spring-cloud.version>2025.0.0</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-function-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-function-context</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

The test passes (so the Consumer is actually invoked only once), but this is what I see in the logs:

2025-08-27T15:05:23.743-03:00  INFO 147213 --- [function-composition-test] [ux-http-epoll-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this payload to upper case: functional composition test
2025-08-27T15:05:23.754-03:00  INFO 147213 --- [function-composition-test] [ux-http-epoll-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this payload to upper case: functional composition test

I'm not sure how to (or if I should or could) turn that Consumer into a reactive one... Can this mix of reactive and imperative styles be related to this issue?

Anyway, running the first function twice is not such a big deal, since the actual Consumer is invoked only once (which prevents problems in my real-world scenarios). Still, I'd like to understand why this is happening, in order to fix my configuration (maybe by turning the Consumer into a reactive one?) and avoid the redundant execution of the first function.


Solution

  • Thank you for the sample project to play with! It was very helpful for diagnosing this.

    So, after making this change to your code:

    LOGGER.info("Will transform this message's payload to upper case: {}", message);
    

    I see this in the logs:

    2025-08-28T12:18:26.201-04:00  INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=c2e496dc-90e1-bbca-31ea-bc890f9de5bd, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906201}]
    2025-08-28T12:18:26.223-04:00  INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=919af0da-5461-b6ab-ddaf-40cd2b77e80a, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906223}]
    

    The id and timestamp headers differ between the two entries, which means that we have two different messages with the same payload.

    After some debugging, I see that we subscribe to the input Flux twice:

    1. SimpleFunctionRegistry.ConsumerWrapper:

       public void accept(Flux messageFlux) {
           messageFlux.doOnNext(this.targetConsumer).subscribe();
       }
      
    2. FunctionWebRequestProcessingHelper:

       Object result = function.apply(inputMessage);
       if (function.isConsumer()) {
           if (result instanceof Publisher) {
               Mono.from((Publisher) result).subscribe();
           }
           return "DELETE".equals(wrapper.getMethod()) ?
                   Mono.empty() : Mono.just(ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers, ignoredHeaders, requestOnlyHeaders)).build());
       }
      

    There is a disconnect between what the reactive function call returns and the consumer invocation:

    result = fluxInput
            .transform(flux -> {
                flux = Flux.from((Publisher) flux).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
                ((Consumer) this.target).accept(flux);
                return Mono.ignoreElements((Flux) flux);
            }).then();
    

    That is probably why we don't see double processing on the consumer side, but the flux in the function is still processed twice because of those two subscriptions.
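
    To illustrate the effect of two subscriptions on one cold pipeline, here is a minimal plain-Java sketch. It does not use Reactor or Spring Cloud Function: ColdSource is a hypothetical stand-in for a cold publisher, invented for this example, and the two subscribe calls only mimic the two subscription sites listed above. The assumption is that the real Flux behaves the same way, that is, each subscription re-runs the upstream map.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;
    import java.util.function.Function;

    public class ColdPipelineSketch {

      /** Hypothetical stand-in for a cold publisher: every subscribe() replays the source. */
      interface ColdSource<T> {
        void subscribe(Consumer<T> subscriber);

        default <R> ColdSource<R> map(Function<T, R> mapper) {
          // Nothing runs here; the mapper only runs once someone subscribes.
          return subscriber -> this.subscribe(value -> subscriber.accept(mapper.apply(value)));
        }

        static <T> ColdSource<T> just(T value) {
          return subscriber -> subscriber.accept(value);
        }
      }

      /** Returns {mapper invocations, values delivered to the consumer}. */
      static int[] run() {
        AtomicInteger mapperCalls = new AtomicInteger();
        List<String> consumed = new ArrayList<>();

        // Plays the role of the "uppercase" function in the question.
        ColdSource<String> upper = ColdSource.just("functional composition test")
            .map(payload -> {
              mapperCalls.incrementAndGet();
              return payload.toUpperCase();
            });

        upper.subscribe(consumed::add);   // first subscription: feeds the consumer
        upper.subscribe(ignored -> { });  // second subscription: values are discarded

        return new int[] {mapperCalls.get(), consumed.size()};
      }

      public static void main(String[] args) {
        int[] counts = run();
        System.out.println("mapper calls: " + counts[0] + ", consumed: " + counts[1]);
      }
    }
    ```

    In this model the mapper runs twice while the consumer receives a single value, which matches the two log lines and the single handleMessage call seen in the question.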

    This feels like a bug somewhere in SimpleFunctionRegistry.invokeConsumer().

    Please raise a GitHub issue against the Spring Cloud Function project.

    Unfortunately, I don't see any workaround from my side.