javarabbitmqcloudamqp

Java RabbitMQ connection is already closed


I need to push messages to external rabbitmq. My java configuration successfully declares queue to push, but every time I try to push, I have next exception:

web_1       | com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
web_1       |   at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
web_1       |   at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
web_1       |   at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710)
web_1       |   at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685)
web_1       |   at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675)
web_1       |   at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207)
web_1       |   at com.ruddi.logiweb.service.impl.MQServiceImpl.send(MQServiceImpl.java:33)
web_1       |   at com.ruddi.logiweb.controller.DriverController.addDriver(DriverController.java:105)
web_1       |   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
web_1       |   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
web_1       |   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
web_1       |   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
web_1       |   at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
web_1       |   at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)
web_1       |   at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)
web_1       |   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)
web_1       |   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
web_1       |   at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
web_1       |   at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)
web_1       |   at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)
web_1       |   at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
web_1       |   at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
web_1       |   at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
web_1       |   at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
web_1       |   at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1       |   at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
web_1       |   at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
web_1       |   at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:118)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:158)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.header.HeaderWriterFilter.doHeadersAfter(HeaderWriterFilter.java:92)
web_1       |   at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:77)
web_1       |   at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
web_1       |   at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
web_1       |   at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
web_1       |   at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358)
web_1       |   at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1       |   at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
web_1       |   at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
web_1       |   at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
web_1       |   at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
web_1       |   at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
web_1       |   at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:687)
web_1       |   at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
web_1       |   at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
web_1       |   at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
web_1       |   at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
web_1       |   at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
web_1       |   at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)
web_1       |   at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
web_1       |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
web_1       |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
web_1       |   at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
web_1       |   at java.base/java.lang.Thread.run(Thread.java:829)

Bean:

@Bean("mqChannel")
    public Channel channel() {
        ConnectionFactory factory = new ConnectionFactory();

        String uri = "amqp://sxtswmgm:dh1N5aBEUnam53urt2VMrF9HSi7IDWAf@stingray.rmq.cloudamqp.com/sxtswmgm";

        try {
            factory.setUri(uri);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
//        factory.setHost("rabbitmq");
//        factory.setPort(5672);
//        factory.setUsername("guest");
//        factory.setPassword("guest");

        Channel channel = null;

        try (Connection connection = factory.newConnection()){
            channel = connection.createChannel();
            channel.queueDeclare("logiweb", false, false, false, null);
        }
        catch(Exception e) {
            log.info("EXCEPTION IN BEAN");
            e.printStackTrace();
        }

        return channel;
    }

Sending message service class:

@Service
@Slf4j
public class MQServiceImpl implements MQService {
    Channel channel;

    @Autowired
    public MQServiceImpl(
            @Qualifier("mqChannel") Channel channel) {
        this.channel = channel;
    }

    /**
     * sending message to mq
     * @param message message to send
     */
    @Override
    public void send(String message) {
        log.info("send(message) method was called");
        try {
            channel.basicPublish("", "logiweb", null, message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

By the way, when I start rabbitmq locally it works perfectly. This problem appears when I use containerized rabbitmq or external cloudamqp

So, one more time, when I run my app I can see new queue in RabbitMQ dashboard (it was declared), but every try to push the message ends with exception.


Solution

  • I'm struggling to understand how that code fits together, but this part strikes me as definitely wrong:

        try (Connection connection = factory.newConnection()){
            channel = connection.createChannel();
            channel.queueDeclare("logiweb", false, false, false, null);
        }
    

    When the try block completes, the connection resource is going to be auto-closed. However the javadoc for Connection.close() states:

    "Close this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'. Waits for all the close operations to complete."

    So, the Channel that you have created will be closed before the channel() method returns it.