javamultithreadingredisspring-integrationaggregator

Deadlock using Aggregator + Redis


This post is related to this one

Spring integration deadlock using Aggregator + MessageStoreReaper + Redis?

but this message is too long to post. I continue with the original post

I upgraded to to the latest Java 7 build 1.7.0_60-b19 but the problem is still there.

I made another thread dump and found the same issue: all DefaultMessageListenerContainers (count 20) are locked by the taskScheduler (entityScheduler-3) in the AbstractCorrelatingMessageHandler lock invocation.

Here's the scheduler and aggregator config:

<task:scheduled-tasks scheduler="entityScheduler">
    <task:scheduled ref="entityReaperBean" method="run" fixed-rate="${entity.aggregator.timeout}" />
</task:scheduled-tasks>

<bean id="entityReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="entityAggregatorRedisMessageStore" />
    <property name="timeout" value="${entity.aggregator.reaper.timeout}" />
</bean>

<bean id="entityAggregatorRedisMessageStore"
    class="org.springframework.integration.redis.store.RedisMessageStore">
    <constructor-arg ref="entityRedisConnectionFactory" />
</bean>

<int:aggregator id="entityAggregator" send-partial-result-on-expiry="true"
        release-strategy-expression="size() >= ${transactions-receiver.batch_size}" correlation-strategy-expression="payload.entityCode"
        expire-groups-upon-completion="true" message-store="entityAggregatorRedisMessageStore" />

Here's the thread dump section:

"entityScheduler-3" - Thread t@188
 java.lang.Thread.State: WAITING
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <545079d6> (a    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
 - locked <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

"DefaultMessageListenerContainer-5" - Thread t@48
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "entityScheduler-3" t@188
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy16.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy17.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:131)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy18.handleMessage(Unknown Source)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:148)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992)
at java.lang.Thread.run(Thread.java:745)

 Locked ownable synchronizers:
  - None

Is it possible that for some reason the handleMessageInternal method of the AbstractCorrelatingMessageHandler doesn't unlock? I use a Redis message store and see that the MsgStoreReaper also uses a lock. Could that be an issue?

thank you very much for your help here! Regards Guzman

UPDATE:

I found this messages in the log:

 2014-07-23 22:47:26,967 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.
 2014-07-23 22:47:26,968 ERROR RedisMessageStore:122 - Exception in expiry callback
 2014-07-23 22:47:26,971 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.

but no exception nor stacktrace. Does the TaskUtils error says something to you?


Solution

  • Actually, I do see one place...

            finally {
                if (removeGroup) {
                    this.remove(group);
                }
                lock.unlock();
            }
    

    ...if the message store throws an exception during the remove, we would skip the unlock - do you see anything in the log?