I have built a Spring Cloud Function application with a Spring Integration flow exposed as a function (actually, an imperative Consumer) and a reactive Function. Both functions are composed and exposed as a Spring Cloud Function standalone web application, which takes POST requests as the function's input.
The application works as expected, except for one thing: the first function is invoked twice by the framework (as I can see in the logs), even though the Consumer is invoked only once (which I have confirmed with a @SpringBootTest).
This is what the app looks like.
FunctionCompositionApplication.java:
@SpringBootApplication
public class FunctionCompositionApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(FunctionCompositionApplication.class);

    public static void main(final String[] args) {
        SpringApplication.run(FunctionCompositionApplication.class, args);
    }

    // just for testing purposes
    public static class MyCustomMessageHandler {

        public void handleMessage(final Message<String> message) {
            LOGGER.info("Will handle a message: {}", message);
        }
    }

    @Bean
    MyCustomMessageHandler myCustomMessageHandler() {
        return new MyCustomMessageHandler();
    }

    @Bean
    Function<Flux<Message<String>>, Flux<Message<String>>> uppercase() {
        return flux -> flux.map(message -> {
            final var payload = message.getPayload();
            LOGGER.info("Will transform this payload to upper case: {}", payload);
            return MessageBuilder
                    .withPayload(payload.toUpperCase())
                    .copyHeaders(message.getHeaders())
                    .build();
        });
    }

    @Bean
    IntegrationFlow loggingFlow(final MyCustomMessageHandler handler) {
        return flow -> flow.handle(handler);
    }

    @Bean
    @SuppressWarnings({"rawtypes", "unchecked"})
    AnnotationGatewayProxyFactoryBean<Consumer<Message<?>>> log() {
        final var gatewayProxyFactoryBean = new AnnotationGatewayProxyFactoryBean<>(Consumer.class);
        gatewayProxyFactoryBean.setDefaultRequestChannelName("loggingFlow.input");
        return (AnnotationGatewayProxyFactoryBean) gatewayProxyFactoryBean;
    }
}
application.properties:
spring.application.name=function-composition-test
spring.cloud.function.definition=uppercase|log
FunctionCompositionApplicationTests.java:
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@ActiveProfiles("test")
class FunctionCompositionApplicationTests {

    @Autowired
    private TestRestTemplate restTemplate;

    @MockitoBean
    private MyCustomMessageHandler handler;

    @Test
    @DisplayName("Should transform the payload to upper case")
    void transformToUpperCase() throws Exception {
        final var payload = "functional composition test";
        this.restTemplate.exchange(
                RequestEntity
                        .post(new URI("/test/uppercase,log"))
                        .contentType(MediaType.TEXT_PLAIN)
                        .body(payload),
                Void.class);
        verify(handler, Mockito.times(1)).handleMessage(Mockito.any());
    }
}
application-test.properties:
spring.cloud.function.web.path=/test
pom.xml: (showing only the dependencies of a regular Spring Boot 3.5.5 application)
<properties>
    <java.version>21</java.version>
    <spring-cloud.version>2025.0.0</spring-cloud.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-function-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-function-context</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
The test passes (so the Consumer is actually invoked only once), but this is what I see in the logs:
2025-08-27T15:05:23.743-03:00 INFO 147213 --- [function-composition-test] [ux-http-epoll-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this payload to upper case: functional composition test
2025-08-27T15:05:23.754-03:00 INFO 147213 --- [function-composition-test] [ux-http-epoll-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this payload to upper case: functional composition test
I'm not sure how to (or whether I should or could) turn that Consumer into a reactive one. Could this mix of reactive and imperative styles be related to the issue?
Anyway, running the first function twice is not a big deal, since the actual Consumer is invoked only once (which prevents problems in my real-world scenarios). Still, I'd like to understand why this happens, so that I can fix my configuration (maybe by turning the Consumer into a reactive one?) and avoid the redundant execution of the first function.
Thank you for the sample project to play with! It was very helpful for diagnosing the problem.
So, after making this change to your code:
LOGGER.info("Will transform this message's payload to upper case: {}", message);
I see this in the logs:
2025-08-28T12:18:26.201-04:00 INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=c2e496dc-90e1-bbca-31ea-bc890f9de5bd, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906201}]
2025-08-28T12:18:26.223-04:00 INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=919af0da-5461-b6ab-ddaf-40cd2b77e80a, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906223}]
That means we have two different messages (note the different id and timestamp headers) with the same payload.
After some debugging, I see that we subscribe to the input Flux twice:
SimpleFunctionRegistry.ConsumerWrapper:
public void accept(Flux messageFlux) {
    messageFlux.doOnNext(this.targetConsumer).subscribe();
}
FunctionWebRequestProcessingHelper:
Object result = function.apply(inputMessage);
if (function.isConsumer()) {
    if (result instanceof Publisher) {
        Mono.from((Publisher) result).subscribe();
    }
    return "DELETE".equals(wrapper.getMethod()) ?
            Mono.empty() : Mono.just(ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers, ignoredHeaders, requestOnlyHeaders)).build());
}
There is some disconnect between what the reactive function call returns and that consumer invocation:
result = fluxInput
        .transform(flux -> {
            flux = Flux.from((Publisher) flux).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
            ((Consumer) this.target).accept(flux);
            return Mono.ignoreElements((Flux) flux);
        }).then();
That is probably why we don't see double processing on the consumer side: the consumer is attached to only one of the two subscriptions, but the flux in the function is still executed twice, once per subscription.
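The effect of two subscriptions on a cold pipeline can be reproduced without Spring at all. The sketch below is a JDK-only model, not Reactor or Spring Cloud Function code: ColdSource, ColdSubscriptionDemo and countInvocations are hypothetical names invented for illustration. It only assumes what the snippets above show, namely that one subscription carries the consumer and a second subscription ignores the elements.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

public class ColdSubscriptionDemo {

    /**
     * Hypothetical stand-in for a cold publisher such as a Flux:
     * nothing runs until subscribe() is called, and every call
     * replays the whole pipeline from the source.
     */
    interface ColdSource<T> {

        void subscribe(Consumer<? super T> downstream);

        // Wraps this source; the mapper runs once per element per subscription.
        default <R> ColdSource<R> map(Function<? super T, ? extends R> mapper) {
            return downstream -> this.subscribe(value -> downstream.accept(mapper.apply(value)));
        }

        static <T> ColdSource<T> just(T value) {
            return downstream -> downstream.accept(value);
        }
    }

    /** Returns {mapper invocations, consumer invocations}. */
    static int[] countInvocations() {
        AtomicInteger mapperCalls = new AtomicInteger();
        AtomicInteger consumerCalls = new AtomicInteger();

        // Plays the role of the "uppercase" function applied to the input.
        ColdSource<String> upper = ColdSource.just("functional composition test")
                .map(payload -> {
                    mapperCalls.incrementAndGet();
                    return payload.toUpperCase();
                });

        // First subscription: models doOnNext(targetConsumer).subscribe().
        upper.subscribe(value -> consumerCalls.incrementAndGet());

        // Second subscription: models subscribing to the returned publisher,
        // which ignores the elements but still pulls them through the mapper.
        upper.subscribe(value -> { });

        return new int[] {mapperCalls.get(), consumerCalls.get()};
    }

    public static void main(String[] args) {
        int[] counts = countInvocations();
        // prints "mapper calls=2, consumer calls=1"
        System.out.println("mapper calls=" + counts[0] + ", consumer calls=" + counts[1]);
    }
}
```

The mapper count matches the two "Will transform..." log lines, while the consumer count matches the single handleMessage() call verified by the test.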
This feels like a bug somewhere in SimpleFunctionRegistry.invokeConsumer().
Please raise a GitHub issue against the Spring Cloud Function project.
Unfortunately, I don't see any workaround from my side.