I started working with messaging queues using RabbitMQ, I am fine with using AMQP below is the code, so I discovered that when going with RabbitTemplate and amqp.connectionFactory, we cannot have have Channel Bean from the same org.springframework.amqp.rabbit.connection.ConnectionFactory.
but I want to use the Channel object to set manual ACK using basiAck,
@RabbitListener(queues = "email_queue", concurrency = "4", ackMode = "Manual")
@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(1000))
public void sendEmail(String customerEmail, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
try {
SimpleMailMessage simpleMailMessage = new SimpleMailMessage();
simpleMailMessage.setFrom("kartikpawade25@gmail.com");
simpleMailMessage.setTo(customerEmail);
simpleMailMessage.setText("Hi, your ticket has been booked Successfully.");
simpleMailMessage.setSubject("Movie Tickets Booked");
javaMailSender.send(simpleMailMessage);
channel.basicAck(deliveryTag, false);
}catch (Exception e){
log.error("Email Consumer Service::" + e.getMessage());
throw e;
}
}
The problem is, I have not defined Channel Object/Bean in rabbit-config, How will I be getting it from rabbitTemplate, so I can pass it as parameter in above method. below is my RabbitConfig.
@Configuration
public class RabbitConfig {
@Value("${rabbitmq.email.queue}")
private String emailQueueName;
@Value("${rabbitmq.email.exchange.name}")
private String emailExchangeName;
@Value("${rabbitmq.email.binding.key}")
private String emailBindingKey;
@Bean
public Queue emailQueue() {
return new Queue(emailQueueName);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(emailExchangeName);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(emailQueue()).to(topicExchange()).with(emailBindingKey);
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
should I use the rabbitmq.client library to define connectionFactory -> new connection -> channel -> build queues.... and then use the Channel as @Autowired property? Do I need to switch from amqp library to rabbitMq library fully?
Any complete code/github example is much appritiated,Thanks.
No you do not need to do that.
The @RabbitListener
is not a bean, but rather a handler method which is called by the framework when message has arrived to the listener container.
The arguments for those params in the method signature are set by the MessagingMessageListenerAdapter
:
return this.handlerAdapter.invoke(message, amqpMessage, channel, amqpMessage.getMessageProperties());
That adapter does other smart job trying to resolve some other arguments if you use @Header
as you do in your case.
See more info in docs: https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven.html