spring-integrationspring-integration-dslspring-integration-amqp

spring integration amqp listener transacted


What is difference between acknowledgeMode.AUTO and acknowledgeMode.MANUAL when is channelTransacted = true on message listener container?

I understand that when I want to use acknowledgeMode.MANUAL with channelTransacted=false, the listener must acknowledge message by calling Channel.basicAck(). Is it unnecessary use acknowledgeMode.MANUAL when listener container has channelTransacted = true? Because when I combine acknowledgeMode.MANUAL and channelTransacted=true with calling Channel.basicAck() I get exception.

@Bean(name = "amqpInboundEsignRequest")
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) {
    return IntegrationFlows.from(
            Amqp.inboundAdapter(connectionFactory, EsignContstants.QUEUE)
                    .acknowledgeMode(AcknowledgeMode.MANUAL)
                    .messageConverter(new Jackson2JsonMessageConverter())
                    .autoStartup(false)
                    .channelTransacted(true)
                    .transactionManager(transactionManager)
                    .txSize(1)
    )
            .enrichHeaders(s -> s.header(MessageHeaders.REPLY_CHANNEL, "basicAck.flow", true))
            .log("amqpInbound.start-process")
            .channel("insertAndStartProcess.input")
            .get();
}

exception

2018-02-19 09:57:27.478 ERROR 4664 --- [rContainer#0-45] o.s.t.s.TransactionSynchronizationUtils  : TransactionSynchronization.afterCompletion threw exception

org.springframework.amqp.AmqpException: failed to commit RabbitMQ transaction
    at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:168) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$RabbitResourceSynchronization.afterCompletion(ConnectionFactoryUtils.java:264) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:168) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:1002) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:977) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:806) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1198) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1318) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:50) ~[amqp-client-4.0.3.jar:4.0.3]
    at sun.reflect.GeneratedMethodAccessor151.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:980) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at com.sun.proxy.$Proxy191.txCommit(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:164) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    ... 11 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.3.jar:4.0.3]
    ... 19 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572) ~[amqp-client-4.0.3.jar:4.0.3]
    ... 1 common frames omitted

Solution

  • Using transactions shouldn't make any difference; that error implies the channel was closed for some reason between when you received the message and sent the basicAck, with transactions we won't detect it until the transaction commits.

    We recently added a fix to detect the channel was closed and reject the basicAck in that case.

    The fix is in 2.0.2.RELEASE.