I need to push messages to an external RabbitMQ instance. My Java configuration successfully declares the queue to publish to, but every time I try to publish a message, I get the following exception:
web_1 | com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
web_1 | at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
web_1 | at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
web_1 | at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710)
web_1 | at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685)
web_1 | at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675)
web_1 | at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207)
web_1 | at com.ruddi.logiweb.service.impl.MQServiceImpl.send(MQServiceImpl.java:33)
web_1 | at com.ruddi.logiweb.controller.DriverController.addDriver(DriverController.java:105)
web_1 | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
web_1 | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
web_1 | at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
web_1 | at java.base/java.lang.reflect.Method.invoke(Method.java:566)
web_1 | at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
web_1 | at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)
web_1 | at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)
web_1 | at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)
web_1 | at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
web_1 | at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
web_1 | at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)
web_1 | at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)
web_1 | at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
web_1 | at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
web_1 | at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
web_1 | at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
web_1 | at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1 | at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
web_1 | at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
web_1 | at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:118)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:158)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.header.HeaderWriterFilter.doHeadersAfter(HeaderWriterFilter.java:92)
web_1 | at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:77)
web_1 | at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
web_1 | at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
web_1 | at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
web_1 | at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358)
web_1 | at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1 | at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
web_1 | at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
web_1 | at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
web_1 | at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
web_1 | at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
web_1 | at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:687)
web_1 | at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
web_1 | at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
web_1 | at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
web_1 | at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
web_1 | at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
web_1 | at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)
web_1 | at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
web_1 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
web_1 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
web_1 | at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
web_1 | at java.base/java.lang.Thread.run(Thread.java:829)
Bean:
    @Bean("mqChannel")
    public Channel channel() {
        ConnectionFactory factory = new ConnectionFactory();
        String uri = "amqp://sxtswmgm:dh1N5aBEUnam53urt2VMrF9HSi7IDWAf@stingray.rmq.cloudamqp.com/sxtswmgm";
        try {
            factory.setUri(uri);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        // factory.setHost("rabbitmq");
        // factory.setPort(5672);
        // factory.setUsername("guest");
        // factory.setPassword("guest");
        Channel channel = null;
        try (Connection connection = factory.newConnection()){
            channel = connection.createChannel();
            channel.queueDeclare("logiweb", false, false, false, null);
        }
        catch(Exception e) {
            log.info("EXCEPTION IN BEAN");
            e.printStackTrace();
        }
        return channel;
    }
Sending message service class:
    @Service
    @Slf4j
    public class MQServiceImpl implements MQService {
        Channel channel;

        @Autowired
        public MQServiceImpl(
                @Qualifier("mqChannel") Channel channel) {
            this.channel = channel;
        }

        /**
         * sending message to mq
         * @param message message to send
         */
        @Override
        public void send(String message) {
            log.info("send(message) method was called");
            try {
                channel.basicPublish("", "logiweb", null, message.getBytes());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
By the way, when I start RabbitMQ locally it works perfectly. The problem appears when I use a containerized RabbitMQ or the external CloudAMQP service.
So, once more: when I run my app I can see the new queue in the RabbitMQ dashboard (so it was declared), but every attempt to publish a message ends with this exception.
I'm struggling to understand how that code fits together, but this part strikes me as definitely wrong:
    try (Connection connection = factory.newConnection()){
        channel = connection.createChannel();
        channel.queueDeclare("logiweb", false, false, false, null);
    }
When the try block completes, the connection resource is going to be auto-closed. However, the javadoc for Connection.close() states:
"Close this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'. Waits for all the close operations to complete."
So, the Channel that you have created will be closed before the channel() method returns it. That also explains why the queue shows up in the dashboard: queueDeclare runs inside the try block, while the connection is still open.
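To make the lifecycle issue concrete, here is a minimal sketch that uses only the JDK. DemoConnection and DemoChannel are hypothetical stand-ins I made up for illustration; they are not the RabbitMQ client classes and only model the one behaviour quoted from the javadoc (closing a connection closes its channels). brokenChannel() mirrors the shape of the bean in the question, and workingChannel() shows one possible way around it: let something longer-lived own the connection and close it only on shutdown.

```java
import java.util.ArrayList;
import java.util.List;

public class ChannelLifecycleDemo {

    /** Hypothetical stand-in for a channel: publishing fails once it is closed. */
    static class DemoChannel {
        private boolean open = true;
        private final List<String> published = new ArrayList<>();

        void publish(String message) {
            if (!open) {
                throw new IllegalStateException("channel is already closed");
            }
            published.add(message);
        }

        boolean isOpen() { return open; }

        List<String> published() { return published; }

        void close() { open = false; }
    }

    /** Hypothetical stand-in for a connection: closing it closes all of its channels. */
    static class DemoConnection implements AutoCloseable {
        private final List<DemoChannel> channels = new ArrayList<>();

        DemoChannel createChannel() {
            DemoChannel channel = new DemoChannel();
            channels.add(channel);
            return channel;
        }

        @Override
        public void close() {
            channels.forEach(DemoChannel::close);
        }
    }

    /** Same shape as the bean in the question: the channel escapes a closed connection. */
    static DemoChannel brokenChannel() {
        DemoChannel channel = null;
        try (DemoConnection connection = new DemoConnection()) {
            channel = connection.createChannel();
        } // connection.close() runs here and closes the channel as well
        return channel;
    }

    /** One possible fix: the caller owns the connection and keeps it open while publishing. */
    static DemoChannel workingChannel(DemoConnection connection) {
        return connection.createChannel();
    }

    public static void main(String[] args) {
        DemoChannel broken = brokenChannel();
        System.out.println("broken channel open: " + broken.isOpen());

        DemoConnection connection = new DemoConnection();
        DemoChannel working = workingChannel(connection);
        working.publish("hello");
        System.out.println("working channel open: " + working.isOpen());
        connection.close(); // close only when the application shuts down
    }
}
```

In the real application the equivalent would probably be to create the Connection outside of a try-with-resources block (for example as its own Spring bean), build the Channel from it, and close the connection when the application context shuts down. I have not tested that against your setup, so treat it as a direction rather than a drop-in fix.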