spring-amqp

Transactional publishing not throwing exception when queue does not exist


I want to use Spring AMQP to send messages synchronously and wait for them to arrive in a RabbitMQ queue. If a message does not arrive in the queue for any reason (e.g. the queue does not exist), an exception should be thrown.

The design behind this is that we have two applications. Application A receives job requests from a UI and sends them to application B. A saves records of those jobs in its DB so the user can review them. Application B is usually turned off and is occasionally turned on by the user to consume jobs from the queue. It's important that the jobs make it to the queue; we would like to fail fast and immediately tell the user when a job was not enqueued.

I thought this was possible using RabbitTemplate.setChannelTransacted(true) and @Transactional on my publishing code. I tested it by publishing to a queue that does not exist on the broker. Unfortunately, the code proceeds without any exception.

Below is the heavily simplified code for Application A. It's set up to create the queue for Application B in case B hasn't been started yet. This queue creation is commented out to test the "required delivery" behavior.

Am I missing some configuration? Am I misunderstanding how Spring AMQP transactions work? Or should this be working? (If the latter, I'll make a small GitHub project to reproduce the issue.)

package app;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class RequestPublisher {

  private final RabbitTemplate rabbitTemplate;

  public RequestPublisher(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  /* Invoked by a @Controller */
  @Transactional
  public void request(byte[] messageBody) {
    rabbitTemplate.send("requests", new Message(messageBody));
    System.out.println("Additional logic that should not be performed unless message reached the queue");
  }

}

package config;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {


  @Bean
  public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
  }

  @Bean
  public RabbitTemplate transactionalRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
  }

  /*
    // Normally commented-in, but commented-out to test transactions
    @Bean
    public Queue requestQueue() {
      return new Queue("requests");
    }
  */

}

package app;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@SpringBootApplication
@EnableTransactionManagement
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

}

application.yml:

spring:
  rabbitmq:
    host: rabbitmq
    username: guest
    password: guest

Solution

  • See more info in docs: https://docs.spring.io/spring-amqp/reference/amqp/template.html#publishing-is-async

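    The missing queue has nothing to do with transactions. By default, RabbitMQ silently drops a message that cannot be routed to any queue, whether or not the channel is transacted.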

    You have to enable publisherReturns on the CachingConnectionFactory and call setReturnsCallback(ReturnsCallback returnCallback) on your RabbitTemplate (or use CorrelationData). In addition, mandatory has to be true on the template, so that the broker returns unroutable messages to the publisher instead of dropping them.
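
As a rough sketch of the settings described above (not code from the original answer), the configuration might look something like this. The class name ReturnsConfig and the bean name returnsAwareRabbitTemplate are hypothetical, and the sketch assumes the connection factory in the context is a CachingConnectionFactory, which is what Spring Boot auto-configures. It only shows the return-related settings; the callback here just prints the returned message, and what to do with it is up to the application.

```java
package config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class illustrating the three settings from the answer.
@Configuration
public class ReturnsConfig {

  @Bean
  public RabbitTemplate returnsAwareRabbitTemplate(CachingConnectionFactory connectionFactory) {
    // 1. Enable publisher returns on the connection factory.
    connectionFactory.setPublisherReturns(true);

    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

    // 2. Publish with the mandatory flag so unroutable messages are returned, not dropped.
    rabbitTemplate.setMandatory(true);

    // 3. Register a callback that receives each returned (unroutable) message.
    //    Assumption: printing is enough for illustration; real code would record the failure.
    rabbitTemplate.setReturnsCallback(returned ->
        System.err.println("Message returned by broker: replyCode=" + returned.getReplyCode()
            + ", replyText=" + returned.getReplyText()
            + ", exchange=" + returned.getExchange()
            + ", routingKey=" + returned.getRoutingKey()));

    return rabbitTemplate;
  }

}
```

With Spring Boot, the first two settings can probably also be expressed in application.yml as spring.rabbitmq.publisher-returns: true and spring.rabbitmq.template.mandatory: true. Note that the callback is invoked separately from the send call, so the publishing method still needs some way to learn about the return (for example via CorrelationData, as mentioned above) if it must fail before running the additional logic.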