This post is related to this one
Spring integration deadlock using Aggregator + MessageStoreReaper + Redis?
but this message is too long to post. I continue with the original post
I upgraded to to the latest Java 7 build 1.7.0_60-b19 but the problem is still there.
I made another thread dump and found the same issue: all DefaultMessageListenerContainers (count 20) are locked by the taskScheduler (entityScheduler-3) in the AbstractCorrelatingMessageHandler lock invocation.
Here's the scheduler and aggregator config:
<task:scheduled-tasks scheduler="entityScheduler">
<task:scheduled ref="entityReaperBean" method="run" fixed-rate="${entity.aggregator.timeout}" />
</task:scheduled-tasks>
<bean id="entityReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="entityAggregatorRedisMessageStore" />
<property name="timeout" value="${entity.aggregator.reaper.timeout}" />
</bean>
<bean id="entityAggregatorRedisMessageStore"
class="org.springframework.integration.redis.store.RedisMessageStore">
<constructor-arg ref="entityRedisConnectionFactory" />
</bean>
<int:aggregator id="entityAggregator" send-partial-result-on-expiry="true"
release-strategy-expression="size() >= ${transactions-receiver.batch_size}" correlation-strategy-expression="payload.entityCode"
expire-groups-upon-completion="true" message-store="entityAggregatorRedisMessageStore" />
Here's the thread dump section:
"entityScheduler-3" - Thread t@188
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <545079d6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- locked <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
"DefaultMessageListenerContainer-5" - Thread t@48
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "entityScheduler-3" t@188
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy16.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy17.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:131)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy18.handleMessage(Unknown Source)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:148)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
Is it possible that for some reason the handleMessageInternal method of the AbstractCorrelatingMessageHandler doesn't unlock? I use a Redis message store and see that the MsgStoreReaper also uses a lock. Could that be an issue?
thank you very much for your help here! Regards Guzman
UPDATE:
I found this messages in the log:
2014-07-23 22:47:26,967 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.
2014-07-23 22:47:26,968 ERROR RedisMessageStore:122 - Exception in expiry callback
2014-07-23 22:47:26,971 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.
but no exception nor stacktrace. Does the TaskUtils error says something to you?
Actually, I do see one place...
finally {
if (removeGroup) {
this.remove(group);
}
lock.unlock();
}
...if the message store throws an exception during the remove, we would skip the unlock - do you see anything in the log?