Using Spring-Integration version 5.5.18, I have a flow which works well for small sets of inputs but which encounters java.lang.StackOverflowError for larger sets due to recursive calls through a channel and ending the recursion using a filter. Are there any recommendations for ways of modifying the flow to be less apt to generate java.lang.StackOverflowError? (I do know about requesting a larger thread stack for the JVM.) Is there perhaps a way of producing this iteratively with a DSL declaration, rather than recursively? Or perhaps this requires a MessageHandler which performs the iteration in Java outside of the Spring-Integration Java-DSL?
An example flow that exhibits the problem could be:
package org.example.filter;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.*;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

@Configuration
public class OverflowFlows {

    private static final String CURRENT_ITERATION = "currentIteration";
    private static final String ACCUMULATOR = "accumulator";
    private static final String RECURSION_CHANNEL = "recurseToBodyValue";

    @Bean
    public IntegrationFlow callRecursiveFlow() {
        return IntegrationFlows
                .from("createsOverflow")
                .enrichHeaders(h -> h.headerFunction(CURRENT_ITERATION, m -> 0))
                .enrichHeaders(h -> h.headerFunction(ACCUMULATOR, m -> 0))
                .gateway(callRecurseToBodyValueFlow)
                .transform("headers.accumulator")
                .get();
    }

    private final IntegrationFlow callRecurseToBodyValueFlow =
            flow -> flow.channel(RECURSION_CHANNEL);

    @Bean IntegrationFlow recurseToBodyValueFlow() {
        return IntegrationFlows
                .from(RECURSION_CHANNEL)
                .enrichHeaders(h -> h.headerExpression(CURRENT_ITERATION,
                        "headers.currentIteration + 1",
                        true))
                .filter("headers.currentIteration < payload", DISCARD_RETURNS_CURRENT_MESSAGE)
                .enrichHeaders(h -> h.headerExpression(ACCUMULATOR,
                        "headers.accumulator + headers.currentIteration",
                        true))
                .channel(RECURSION_CHANNEL)
                .get();
    }

    public static final Consumer<FilterEndpointSpec> DISCARD_RETURNS_CURRENT_MESSAGE = discardReturnsCurrentMessage();

    public static Consumer<FilterEndpointSpec> discardReturnsCurrentMessage() {
        return filterSpec -> filterSpec.discardFlow(IntegrationFlowDefinition::bridge);
    }
}
Used in conjunction with the declarations for the MessageChannel and the MessagingGateway:
package org.example.filter;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
public class OverflowMessageChannels {

    @Bean(name = "createsOverflow")
    public MessageChannel createsOverflow() {
        return MessageChannels.direct().get();
    }
}

package org.example.filter;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway(name = "overflowGateway", defaultReplyTimeout = "20")
public interface OverflowGateway {

    @Gateway(requestChannel = "createsOverflow")
    Integer createsOverflow(Integer value);
}
The 1000-iteration unit test will cause a stack overflow.
Any pointers on strategies will be appreciated.
import org.example.filter.OverflowGateway;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = org.example.filter.ApiConfiguration.class)
public class OverflowTest {

    @Autowired
    OverflowGateway overflowGateway;

    @Test
    public void call_with_one_returns_one() {
        assertThat(overflowGateway.createsOverflow(1), equalTo(sumOfRange(1)));
    }

    @Test
    public void call_with_20_returns_20() {
        assertThat(overflowGateway.createsOverflow(20), equalTo(sumOfRange(20)));
    }

    @Test
    public void call_with_1000_returns_1000() {
        // causes java.lang.StackOverflowError
        assertThat(overflowGateway.createsOverflow(1000), equalTo(sumOfRange(1000)));
    }

    private static int sumOfRange(final int topOfRange) {
        return IntStream.range(1, topOfRange).sum();
    }
}
I think this has nothing to do with Spring Integration. You can get a StackOverflowError just as easily with plain Java: see "What is the maximum depth of the java call stack?". Yes, Spring Integration adds some overhead with its messaging nature, via channels and the endpoints between them, but logically it doesn't matter: if your logic is recursive within the same thread, you eventually end up with a StackOverflowError as you add more steps to that loop.
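To illustrate the plain-Java point, here is a minimal sketch (the class and method names are made up for this example and are not part of the question's code). It computes the same kind of running sum once recursively and once in a loop. The recursive version needs one stack frame per step, so it only works while the depth fits in the thread's stack; the exact limit depends on the JVM and its stack size setting.

```java
public class RecursionDepthDemo {

    // Recursive sum of 1..n: one stack frame per step,
    // comparable to one nested pass through the flow per iteration.
    static long sumRecursive(long n) {
        return n == 0 ? 0 : n + sumRecursive(n - 1);
    }

    // Same result computed in a loop: the stack depth stays constant.
    static long sumIterative(long n) {
        long total = 0;
        for (long i = 1; i <= n; i++) {
            total += i;
        }
        return total;
    }

    // Returns true if the recursive version exhausts the stack for this n.
    static boolean overflows(long n) {
        try {
            sumRecursive(n);
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("recursive, n=1000: " + sumRecursive(1_000));
        System.out.println("iterative, n=10000000: " + sumIterative(10_000_000));
        // Whether this reports true depends on the configured thread stack size.
        System.out.println("recursive overflows at n=10000000: " + overflows(10_000_000));
    }
}
```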
You can break the recursion, though, by making RECURSION_CHANNEL an ExecutorChannel instance. This way each iteration of the loop is handed off to an executor thread instead of being called from the previous iteration, so the call stack no longer grows recursively.
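The hand-off idea can be sketched in plain Java without Spring. The code below is only a hypothetical stand-in for the flow: the `send` method plays the role of sending a message to the looping channel, and the `Executor` plays the role of the channel's executor. It mirrors the question's logic (increment the iteration, stop when it is no longer less than the limit, otherwise accumulate and send again). In the actual flow, the corresponding change would presumably be to declare the `recurseToBodyValue` channel as a bean built with `MessageChannels.executor(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExecutorHandOffDemo {

    // Hypothetical stand-in for one pass through the looping flow.
    // The work is submitted to the executor, so this method returns
    // immediately and the caller's stack frame is released.
    static void send(Executor channel, int iteration, int limit, long accumulator,
                     CompletableFuture<Long> reply) {
        channel.execute(() -> {
            int next = iteration + 1;
            if (next < limit) {
                // "Send to the same channel again": queues a new task.
                send(channel, next, limit, accumulator + next, reply);
            } else {
                // Plays the role of the filter's discard flow: return the result.
                reply.complete(accumulator);
            }
        });
    }

    // Starts the loop and waits for the reply, like the gateway does.
    static long run(Executor channel, int limit) {
        CompletableFuture<Long> reply = new CompletableFuture<>();
        send(channel, 0, limit, 0L, reply);
        try {
            return reply.get(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(run(pool, 1_000_000));
        } finally {
            pool.shutdown();
        }
    }
}
```

Passing `Runnable::run` as the executor instead would run every task on the caller's thread and bring back the nested calls. Note also that once the loop runs on another thread, the calling gateway really does wait for the reply, so a short reply timeout such as the 20 ms in the question may need to be raised.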