spring-integration spring-integration-dsl

Why does Spring not clean up IntegrationFlows for a dynamic TCP client?


Problem: Spring app is running out of heap memory because Integration Flow objects are filling up space without being garbage collected.

I am using Spring Integration to dynamically create TCP clients with inbound and outbound adapters. These clients connect to a third-party application and may only send, only receive, or send and receive, and then disconnect. After the application has been running for a while, we start seeing out-of-heap-memory errors. Looking at the memory debugger, I can see that the IntegrationFlow-related objects are not cleaned up after calling flowContext.remove(flowId).

@Service
public class TcpConnectionManager {
    @Autowired
    IntegrationFlowContext context;

    // IntegrationTcpConnection is a custom class for storing connection data
    public IntegrationTcpConnection createConnection(String host, int port) {
        String baseId = host + port + UUID.randomUUID();
        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
        
        MessageChannel send = new DirectChannel();
        IntegrationFlow out = IntegrationFlow.from(send).handle(Tcp.outboundAdapter(cf)).get();
        
        //I've also tried this as an alternative to the above ^
        //IntegrationFlow out = f -> f.handle(Tcp.outboundAdapter(cf));
        // Then when registering, return the messaging template to send data

        context.registration(out).addBean(cf).id(baseId + ".out").register();
        
        PollableChannel receive = new QueueChannel();
        IntegrationFlow in = IntegrationFlow.from(Tcp.inboundAdapter(cf)).channel(receive).get();
        context.registration(in).id(baseId + ".in").register();

        // Store references to the input and output channels in this object to be used elsewhere
        return new IntegrationTcpConnection(baseId, send, receive, cf);
    }

    // When business logic is done with the connection, it calls this method and the connection ends
    public void removeFlow(IntegrationTcpConnection connection)
    {
        context.remove(connection.getBaseId() + ".in");
        context.remove(connection.getBaseId() + ".out");
    }

}

// Class for storing connection channels for sending and receiving
@AllArgsConstructor
public class IntegrationTcpConnection {

    String baseId;
    MessageChannel send;
    PollableChannel receive;
    TcpNetClientConnectionFactory cf;

    public void send(String msg)
    {
        send.send(MessageBuilder.withPayload(msg).build());
    }

    public String receive(long timeout)
    {
        return (String) Objects.requireNonNull(receive.receive(timeout)).getPayload();
    }
}

// A test endpoint I am hitting to test sending a message and then removing the flow
@RestController
public class TestController {
    @Autowired
    TcpConnectionManager manager;

    @GetMapping("/test")
    public String test()
    {
        for (int i = 0; i < 100; i++)
        {
            IntegrationTcpConnection con = manager.createConnection("localhost", 30000);
            con.send("test message");
            // removeFlow takes the connection whose flows should be removed
            manager.removeFlow(con);
        }
        
        return "Success";
    }

}

To test this, I have a small app running a Spring Integration TCP server that logs the received messages. The client successfully connects, sends a message, and disconnects 100 times, as driven by the for loop. However, using the memory debugger, I can still see 100 instances of each IntegrationFlow-related object (MessagingTemplate, EventDrivenConsumer, DirectChannel, etc.) even after removeFlow() has been called.

Hitting this endpoint enough times results in an out-of-heap-memory error because the IntegrationFlows are not being removed.

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space

I based my flow registration on an example I found online (the question "Spring Integration Java DSL TCP outbound and Inbound Connection Handler"), so I didn't think there was anything wrong with it, but I could be mistaken.

EDIT

I have done some testing with the dynamic TCP sample (https://github.com/spring-projects/spring-integration-samples/tree/main/advanced/dynamic-tcp-client) and was able to reproduce the out of memory error.

I had to make some modifications to the sample so that a new flow is created every time and every flow is registered with a unique ID. I still see the debug log saying that the TcpNetClientConnectionFactory bean was stopped, but when viewing the heap memory in IntelliJ I can see hundreds of TcpNetClientConnectionFactory instances still in memory.

Below are the edits I made to the dynamic TCP sample. To test this I used a for loop to send 100 messages and remove the flows.

        // Test method for removing a single flow manually
        public void removeFlow() {
            String key = subFlows.entrySet().iterator().next().getKey();
            subFlows.remove(key);
            this.flowContext.remove(key + ".flow");
        }


        @Override
        protected synchronized Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            //MessageChannel channel = this.subFlows
            //      .get(message.getHeaders().get("host", String.class) + message.getHeaders().get("port"));
            //if (channel == null) {
            //  channel = createNewSubflow(message);
            //}
            MessageChannel channel = createNewSubflow(message);
            return Collections.singletonList(channel);
        }

        private MessageChannel createNewSubflow(Message<?> message) {
            String host = (String) message.getHeaders().get("host");
            Integer port = (Integer) message.getHeaders().get("port");
            Assert.state(host != null && port != null, "host and/or port header missing");

            //String hostPort = host + port;
            // Add a UUID to the hostPort so a new flow is always created
            String hostPort = host + port + UUID.randomUUID();

            TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
            TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
            handler.setConnectionFactory(cf);
            IntegrationFlow flow = f -> f.handle(handler);
            IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
                    this.flowContext.registration(flow)
                            .addBean(cf)
                            .id(hostPort + ".flow")
                            .register();
            MessageChannel inputChannel = flowRegistration.getInputChannel();
            this.subFlows.put(hostPort, inputChannel);
            return inputChannel;
        }

@RestController
public class TestController {
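    @Autowired
    TcpRouter router;

    // ToTCP is the messaging gateway from the sample; injected so the call below compiles
    @Autowired
    ToTCP toTCP;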

    @GetMapping("/test")
    public String test()
    {
        for (int i = 0; i < 100; i++)
        {
            toTCP.send("Test Message", "localhost", 40000);
            router.removeFlow();
        }

        // Set a breakpoint here and view the heap memory contents to see IntegrationFlows   
        return "Success";
    }

}

EDIT 2

I believe I have identified the source of the problem as using UUIDs in the flow id when registering. Without the UUID added to the flow id, my original code works without filling up the heap.

I'm assuming Spring is reusing the IntegrationFlow components in the background since they have the same ID when just using host + port.

My original purpose for adding the UUID was to ensure a new client connection IntegrationFlow would be created even if two requests to the same server arrived at the same time since the third-party software requires a completely new connection per request-response.
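
One way the two requirements might be reconciled (a distinct flow per concurrent request, but no ever-growing set of flow ids) is to hand out ids from a small reusable pool instead of generating a UUID each time. The sketch below is plain Java and only illustrates the id bookkeeping. FlowIdPool and its methods are hypothetical names, not part of Spring Integration, and I have not verified this against the framework.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: hands out flow ids from a bounded, reusable set.
// Ids currently in use are always distinct, so two concurrent requests to
// the same host and port still get separate flows.
class FlowIdPool {
    private final String prefix;
    private final Deque<String> free = new ArrayDeque<>();
    private int created = 0;

    FlowIdPool(String host, int port) {
        this.prefix = host + port + "#";
    }

    // Reuse a released id if one is available, otherwise mint a new one.
    synchronized String acquire() {
        String id = free.poll();
        return id != null ? id : prefix + (created++);
    }

    // Call only after the flows registered under this id have been removed.
    synchronized void release(String id) {
        free.push(id);
    }

    // Number of distinct ids ever created; bounded by peak concurrency.
    synchronized int distinctIds() {
        return created;
    }
}

class FlowIdPoolDemo {
    public static void main(String[] args) {
        FlowIdPool pool = new FlowIdPool("localhost", 30000);
        for (int i = 0; i < 100; i++) {
            String baseId = pool.acquire(); // would replace host + port + UUID
            // ... register flows as baseId + ".in" / ".out", use them, remove them ...
            pool.release(baseId);
        }
        System.out.println("distinct ids used: " + pool.distinctIds());
    }
}
```

In createConnection the acquired id would take the place of the UUID-based baseId, and removeFlow would release it after both context.remove calls. The number of distinct bean names then tracks the peak number of simultaneous connections rather than the total number of requests.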


Solution

  • I think I see where the problem is: AbstractBeanFactory.mergedBeanDefinitions. It does not look like this cache is cleared at runtime when we remove a bean. I'll look into that later. For now, a workaround is indeed to reuse bean names as much as possible. That way the entry is just renewed in the mergedBeanDefinitions cache, and the cache stays bounded until application shutdown.

    Another workaround is to rely on the auto-generated bean name, which is what is used by default when no id is provided:

            if (flowId == null) {
                flowId = generateBeanName(integrationFlow, null);
                builder.id(flowId);
            }
    

    The fix in Spring Framework which prevents us from using random bean names: https://github.com/spring-projects/spring-framework/issues/23336
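
To make the effect of the name-keyed cache concrete, here is a toy model in plain Java. It is not Spring's code: ToyBeanRegistry and LeakDemo are made-up names, and the second map merely stands in for the mergedBeanDefinitions cache under the assumption that removal leaves the cache entry behind while re-registering the same name overwrites it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model only, NOT Spring's real implementation.
class ToyBeanRegistry {
    private final Map<String, Object> beans = new HashMap<>();
    // Stands in for the name-keyed cache that is not cleared on removal.
    private final Map<String, Object> mergedCache = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
        mergedCache.put(name, bean); // same name overwrites the old entry
    }

    void remove(String name) {
        beans.remove(name); // the cache entry is deliberately left behind
    }

    int liveBeans() {
        return beans.size();
    }

    int cachedEntries() {
        return mergedCache.size();
    }
}

class LeakDemo {
    // Registers and removes one bean per iteration, like the /test endpoint.
    static ToyBeanRegistry run(int iterations, boolean randomNames) {
        ToyBeanRegistry registry = new ToyBeanRegistry();
        for (int i = 0; i < iterations; i++) {
            String suffix = randomNames ? UUID.randomUUID().toString() : "";
            String name = "localhost30000" + suffix + ".out";
            registry.register(name, new byte[1024]);
            registry.remove(name);
        }
        return registry;
    }

    public static void main(String[] args) {
        System.out.println("random names, cached entries: " + run(100, true).cachedEntries());
        System.out.println("fixed name, cached entries:   " + run(100, false).cachedEntries());
    }
}
```

With random names the stand-in cache keeps one entry per registration even though no bean is live any more, while a reused name leaves a single entry that is overwritten each time. That matches what the heap dump in the question shows.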