springspring-bootrabbitmqspring-rabbit

RabbitMQ queue is empty even if message was published


I am trying to create a app using RabbitMQ and i am stuck at a point and I do not know what to do. I tried to follow a tutorial and everything should have worked, but it doesn't. Even if I am sending a postman request, even if I am doing the publishing from the RabbitMQ portal at localhost:15672, the pop up with message published appears, I get this text in the console, but when I want to check the queue, it is saying that is empty.

enter image description here

It is written very clear that the message has arrived to the consumer, but still, it the queue when I am checking from the portal, it doesn't

I will show the entire code now.

CONFIG

package ro.tuc.ds2020.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

CONSUMER

package ro.tuc.ds2020.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Received JSON message here -> %s", measurementDTO.toString()));
    }

}

CONTROLLER for when I am using postman

package ro.tuc.ds2020.controllers;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

@RequestMapping("/api/v1")
@RestController
@CrossOrigin(origins = "http://localhost:4200", allowCredentials = "true")
public class MessageJsonController {

    private RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);
        return  ResponseEntity.ok("Json message sent to RabbitMQ ...");
    }
}

PUBLISHER

package ro.tuc.ds2020.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }

}

And here is the application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json

Solution

  • Java often presents more challenges in setting up a proper working environment compared to other languages like Node.js or Python, which are generally easier to configure.

    Requirement

    Maven 3.9.9 and JDK 17

    > mvn --version
    Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937)
    Maven home: C:\Users\benchvue\maven\apache-maven-3.9.9
    Java version: 17.0.12, vendor: Amazon.com Inc., runtime: C:\Program Files\Amazon Corretto\jdk17.0.12_7
    Default locale: en_US, platform encoding: Cp1252
    OS name: "windows 11", version: "10.0", arch: "amd64", family: "windows"
    

    enter image description here

    File Tree

    C:.
    │   docker-compose.yml
    │   pom.xml
    │
    ├───.idea
    │       .gitignore
    │       compiler.xml
    │       encodings.xml
    │       jarRepositories.xml
    │       misc.xml
    │
    └───src
        └───main
            ├───java
            │   └───ro
            │       └───tuc
            │           └───ds2020
            │               │   Ds2020Application.java
            │               │
            │               ├───config
            │               │       RabbitMQConfig.java
            │               │
            │               ├───consumer
            │               │       RabbitMQJsonConsumer.java
            │               │
            │               ├───controllers
            │               │       MessageJsonController.java
            │               │
            │               ├───dtos
            │               │       MeasurementDTO.java
            │               │
            │               └───publisher
            │                       RabbitMQJsonProducer.java
            │
            └───resources
                │   application.properties
                │
                └───static
    

    enter image description here

    RabbitMQConfig.java

    package ro.tuc.ds2020.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
    
        @Value("${rabbitmq.queue.name}")
        private String queue;
    
        @Value("${rabbitmq.queue.json.name}")
        private String jsonQueue;
    
        @Value("${rabbitmq.queue.exchange}")
        private String exchange;
    
        @Value("${rabbitmq.queue.routing_key_one}")
        private String routingKeyOne;
    
        @Value("${rabbitmq.queue.routing_key_json}")
        private String routingKeyJson;
    
        @Bean
        public Queue queue() {
            return new Queue(queue);
        }
    
        @Bean
        public Queue jsonQueue() {
            return new Queue(jsonQueue, true);
        }
    
        @Bean
        public TopicExchange exchange() {
            return new TopicExchange(exchange, true, false);
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
        }
    
        @Bean
        public Binding jsonBinding() {
            return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
        }
    
        @Bean
        public MessageConverter converter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(converter());
            return rabbitTemplate;
        }
    }
    

    RabbitMQJsonConsumer.java

    package ro.tuc.ds2020.consumer;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    import ro.tuc.ds2020.dtos.MeasurementDTO;
    
    @Service
    public class RabbitMQJsonConsumer {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);
        private final ObjectMapper objectMapper = new ObjectMapper();
    
        @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
        public void consumeJsonMessage(MeasurementDTO measurementDTO) {
            try {
                String jsonMessage = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(measurementDTO);
                LOGGER.info("Received JSON message here -> \n{}", jsonMessage);
            } catch (JsonProcessingException e) {
                LOGGER.error("Failed to convert message to JSON", e);
            }
        }
    }
    

    MessageJsonController/java

    package ro.tuc.ds2020.controllers;
    
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.*;
    import ro.tuc.ds2020.dtos.MeasurementDTO;
    import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @RequestMapping("/api/v1")
    @RestController
    public class MessageJsonController {
    
        private final RabbitMQJsonProducer jsonProducer;
    
        public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
            this.jsonProducer = rabbitMQJsonProducer;
        }
    
        @PostMapping("/publish")
        public ResponseEntity<Map<String, String>> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
            jsonProducer.sendJsonMessage(measurementDTO);
    
            // Create a JSON response body
            Map<String, String> response = new HashMap<>();
            response.put("message", "Json message sent to RabbitMQ");
            response.put("status", "success");
    
            return ResponseEntity.ok(response);
        }
    }
    

    MeasurementDTO.java

    package ro.tuc.ds2020.dtos;
    
    import com.fasterxml.jackson.annotation.JsonProperty;
    
    public class MeasurementDTO {
    
        @JsonProperty("sensorId")
        private String sensorId;
    
        @JsonProperty("value")
        private double value;
    
        @JsonProperty("unit")
        private String unit;
    
        @JsonProperty("timestamp")
        private String timestamp;
    
        // Getters and Setters
        public String getSensorId() {
            return sensorId;
        }
    
        public void setSensorId(String sensorId) {
            this.sensorId = sensorId;
        }
    
        public double getValue() {
            return value;
        }
    
        public void setValue(double value) {
            this.value = value;
        }
    
        public String getUnit() {
            return unit;
        }
    
        public void setUnit(String unit) {
            this.unit = unit;
        }
    
        public String getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
    
        @Override
        public String toString() {
            return "MeasurementDTO{" +
                    "sensorId='" + sensorId + '\'' +
                    ", value=" + value +
                    ", unit='" + unit + '\'' +
                    ", timestamp='" + timestamp + '\'' +
                    '}';
        }
    }
    

    RabbitMQJsonProducer.java

    package ro.tuc.ds2020.publisher;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import ro.tuc.ds2020.dtos.MeasurementDTO;
    
    @Service
    public class RabbitMQJsonProducer {
    
        @Value("${rabbitmq.queue.exchange}")
        private String exchange;
    
        @Value("${rabbitmq.queue.routing_key_json}")
        private String routingKeyJson;
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);
    
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    
        public void sendJsonMessage(MeasurementDTO measurementDTO) {
            LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
            rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
        }
    }
    

    Ds2020Application.java

    package ro.tuc.ds2020;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class Ds2020Application {
        public static void main(String[] args) {
            SpringApplication.run(Ds2020Application.class, args);
        }
    }
    

    application.properties

    spring.rabbitmq.host = localhost
    spring.rabbitmq.port = 5672
    spring.rabbitmq.username = guest
    spring.rabbitmq.password = guest
    
    rabbitmq.queue.name = queue_1
    rabbitmq.queue.json.name = queue_json
    rabbitmq.queue.exchange = exchange
    rabbitmq.queue.routing_key_one = routing_key_1
    rabbitmq.queue.routing_key_json = routing_key_json
    

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>ro.tuc</groupId>
        <artifactId>ds2020</artifactId>
        <version>1.0.0</version>
        <packaging>jar</packaging>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.7.5</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <dependencies>
            <!-- Spring Boot Starter Web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- Spring Boot Starter AMQP -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
            <!-- Jackson for JSON serialization -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>
    
            <!-- Spring Boot Starter Test -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <!-- Spring Boot Maven Plugin -->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <includes>
                        <include>**/*</include>
                    </includes>
                </resource>
            </resources>        
        </build>
    </project>
    

    docker-compose.yml

    version: '3.8'
    
    services:
      rabbitmq:
        image: rabbitmq:3-management
        container_name: rabbitmq
        ports:
          - "5672:5672" # RabbitMQ messaging port
          - "15672:15672" # RabbitMQ management UI
        environment:
          RABBITMQ_DEFAULT_USER: guest
          RABBITMQ_DEFAULT_PASS: guest
    

    Launching RabbitMQ

    docker compose up
    

    enter image description here

    Access RabbitMQ UI

    username: guest
    password: guest
    
    http://localhost:15672/#/
    

    enter image description here

    Compile jar

    mvn clean install
    

    enter image description here

    dir target
    

    enter image description here

    launching Java project

    java -jar target/ds2020-1.0.0.jar
    

    enter image description here

    Call REST API by Postman

    POST http://localhost:8080/api/v1/publish
    

    Input Body

    {
        "sensorId": "12345",
        "value": 67.5,
        "unit": "Celsius",
        "timestamp": "2024-11-16T18:30:00Z"
    }
    

    enter image description here

    Java Side enter image description here

    Consumer will display in Spring Log

    2024-11-16 19:11:43.964  INFO 22464 --- [ntContainer#0-1] r.t.d.consumer.RabbitMQJsonConsumer      : Received JSON message here ->
    {
      "sensorId" : "12345",
      "value" : 67.5,
      "unit" : "Celsius",
      "timestamp" : "2024-11-16T18:30:00Z"
    }
    

    You can see the Spike in Rabbit UI enter image description here

    If you want to see the queue message by RabbitMQ UI

    you need to comment out RabbitMQJsonConsumer.java

    From

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    

    To

    //@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    

    Then build jar and run it again

    enter image description here

    Good Luck!