I want to use Spring AMQP to send messages synchronously and wait until they have arrived in a RabbitMQ queue. If a message does not arrive in the queue for any reason (e.g. the queue does not exist), an exception should be thrown.
The design behind this is that we have 2 applications, application A that receives job requests from a UI and sends them to application B. A saves records of those jobs in its DB so the user can review them. Application B is usually turned off and occasionally turned on by the user to consume jobs from the queue. It's important that the jobs make it to the queue; we would like to fail-fast and immediately tell the user the job was not enqueued.
I thought this was possible using RabbitTemplate.setChannelTransacted(true) and @Transactional on my publishing code. I tested it by publishing to a queue that does not exist on the broker. Unfortunately, the code proceeds without any exception.
Below is the heavily simplified code for Application A. It's set up to create the queue for Application B in case B hasn't been started yet. This queue creation is commented out to test the "required delivery" behavior.
Am I missing some configuration? Am I misunderstanding how Spring AMQP transactions work? Or should this be working? (If the latter, I'll make a small GitHub project to reproduce the issue.)
package app;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class RequestPublisher {

    private final RabbitTemplate rabbitTemplate;

    public RequestPublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /* Invoked by a @Controller */
    @Transactional
    public void request(byte[] messageBody) {
        rabbitTemplate.send("requests", new Message(messageBody));
        System.out.println("Additional logic that should not be performed unless message reached the queue");
    }
}
package config;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate transactionalRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    /*
    // Normally commented-in, but commented-out to test transactions
    @Bean
    public Queue requestQueue() {
        return new Queue("requests");
    }
    */
}
package app;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@SpringBootApplication
@EnableTransactionManagement
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
application.yml:
spring:
  rabbitmq:
    host: rabbitmq
    username: guest
    password: guest
See more info in the docs: https://docs.spring.io/spring-amqp/reference/amqp/template.html#publishing-is-async
The missing queue has nothing to do with transactions. You have to enable publisherReturns on the CachingConnectionFactory and call setReturnsCallback(ReturnsCallback returnCallback) (or use CorrelationData) on your RabbitTemplate. In addition, mandatory has to be true on the template; otherwise the broker silently drops a message it cannot route instead of returning it.
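As a rough illustration of the CorrelationData route, here is a minimal sketch, not a definitive implementation. It assumes Spring AMQP 2.3 or later (for CorrelationData.getReturned()), and that the Spring Boot properties spring.rabbitmq.publisher-returns=true and spring.rabbitmq.publisher-confirm-type=correlated are set so that the auto-configured CachingConnectionFactory has returns and correlated confirms enabled. The class names ReturnsConfig and ConfirmingRequestPublisher and the 10-second timeout are made up for this example. The template is deliberately not transacted, since a channel cannot be in transaction mode and confirm mode at the same time.

```java
package app;

import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

// Hypothetical configuration; assumes returns and correlated confirms are
// already enabled on the connection factory via the properties named above.
@Configuration
class ReturnsConfig {

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Without mandatory=true the broker drops unroutable messages silently.
        template.setMandatory(true);
        return template;
    }
}

// Hypothetical replacement for RequestPublisher that blocks until the broker
// has confirmed the publish, then checks whether the message was returned.
@Service
public class ConfirmingRequestPublisher {

    private final RabbitTemplate rabbitTemplate;

    public ConfirmingRequestPublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void request(byte[] messageBody) throws Exception {
        CorrelationData correlation = new CorrelationData();

        // Default exchange ("") routes by queue name, as in the question.
        rabbitTemplate.send("", "requests", new Message(messageBody), correlation);

        // Wait for the broker's ack/nack; the timeout value is an arbitrary choice.
        CorrelationData.Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
        if (!confirm.isAck()) {
            throw new IllegalStateException("Broker nacked the message: " + confirm.getReason());
        }

        // A returned message means the broker could not route it to any queue.
        if (correlation.getReturned() != null) {
            throw new IllegalStateException(
                    "Message was returned: " + correlation.getReturned().getReplyText());
        }

        System.out.println("Additional logic that should not be performed unless message reached the queue");
    }
}
```

With the "requests" queue missing, the broker should return the message before it sends the confirm, so request(...) would throw before reaching the additional logic. A ReturnsCallback registered with setReturnsCallback is the asynchronous alternative when blocking the caller is not wanted.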