I am trying to create an app using RabbitMQ and I am stuck. I followed a tutorial and everything should have worked, but it doesn't. Whether I send a Postman request or publish from the RabbitMQ management UI at localhost:15672, the "message published" pop-up appears and I get this text in the console, but when I check the queue, it says it is empty.
The log clearly shows that the message has arrived at the consumer, but when I check the queue in the management UI, the message is not there.
Here is the entire code.
CONFIG
package ro.tuc.ds2020.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Value("${rabbitmq.queue.name}")
private String queue;
@Value("${rabbitmq.queue.json.name}")
private String jsonQueue;
@Value("${rabbitmq.queue.exchange}")
private String exchange;
@Value("${rabbitmq.queue.routing_key_one}")
private String routingKeyOne;
@Value("${rabbitmq.queue.routing_key_json}")
private String routingKeyJson;
@Bean
public Queue queue() {
return new Queue(queue);
}
@Bean
public Queue jsonQueue() {
return new Queue(jsonQueue, true);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(exchange, true, false);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
}
@Bean
public Binding jsonBinding() {
return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
}
@Bean
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
return rabbitTemplate;
}
}
CONSUMER
package ro.tuc.ds2020.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;
@Service
public class RabbitMQJsonConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);
@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
public void consumeJsonMessage(MeasurementDTO measurementDTO) {
LOGGER.info(String.format("Received JSON message here -> %s", measurementDTO.toString()));
}
}
CONTROLLER (used when I send requests from Postman)
package ro.tuc.ds2020.controllers;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;
@RequestMapping("/api/v1")
@RestController
@CrossOrigin(origins = "http://localhost:4200", allowCredentials = "true")
public class MessageJsonController {
private RabbitMQJsonProducer jsonProducer;
public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
this.jsonProducer = rabbitMQJsonProducer;
}
@PostMapping("/publish")
public ResponseEntity<String> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
jsonProducer.sendJsonMessage(measurementDTO);
return ResponseEntity.ok("Json message sent to RabbitMQ ...");
}
}
PUBLISHER
package ro.tuc.ds2020.publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;
@Service
public class RabbitMQJsonProducer {
@Value("${rabbitmq.queue.exchange}")
private String exchange;
@Value("${rabbitmq.queue.routing_key_json}")
private String routingKeyJson;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);
private RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendJsonMessage(MeasurementDTO measurementDTO) {
LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
}
}
And here is the application.properties
spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest
rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json
Setting up a working Java environment often takes more effort than Node.js or Python, so here is the complete setup.
I used Maven 3.9.9 and JDK 17:
> mvn --version
Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937)
Maven home: C:\Users\benchvue\maven\apache-maven-3.9.9
Java version: 17.0.12, vendor: Amazon.com Inc., runtime: C:\Program Files\Amazon Corretto\jdk17.0.12_7
Default locale: en_US, platform encoding: Cp1252
OS name: "windows 11", version: "10.0", arch: "amd64", family: "windows"
C:.
│   docker-compose.yml
│   pom.xml
│
├───.idea
│       .gitignore
│       compiler.xml
│       encodings.xml
│       jarRepositories.xml
│       misc.xml
│
└───src
    └───main
        ├───java
        │   └───ro
        │       └───tuc
        │           └───ds2020
        │               │   Ds2020Application.java
        │               │
        │               ├───config
        │               │       RabbitMQConfig.java
        │               │
        │               ├───consumer
        │               │       RabbitMQJsonConsumer.java
        │               │
        │               ├───controllers
        │               │       MessageJsonController.java
        │               │
        │               ├───dtos
        │               │       MeasurementDTO.java
        │               │
        │               └───publisher
        │                       RabbitMQJsonProducer.java
        │
        └───resources
            │   application.properties
            │
            └───static
RabbitMQConfig.java
package ro.tuc.ds2020.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Value("${rabbitmq.queue.name}")
private String queue;
@Value("${rabbitmq.queue.json.name}")
private String jsonQueue;
@Value("${rabbitmq.queue.exchange}")
private String exchange;
@Value("${rabbitmq.queue.routing_key_one}")
private String routingKeyOne;
@Value("${rabbitmq.queue.routing_key_json}")
private String routingKeyJson;
@Bean
public Queue queue() {
return new Queue(queue);
}
@Bean
public Queue jsonQueue() {
return new Queue(jsonQueue, true);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(exchange, true, false);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
}
@Bean
public Binding jsonBinding() {
return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
}
@Bean
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
return rabbitTemplate;
}
}
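The jsonBinding() bean above binds queue_json to a topic exchange with the binding key routing_key_json. As a rough illustration of how a topic exchange decides whether a routing key matches a binding key, here is a minimal sketch in plain Java. It is a simplified model, not RabbitMQ's actual implementation, and the class name TopicKeyMatcherSketch and the sensor.* keys are made up for this example. On a topic exchange, words are separated by dots, * stands for exactly one word and # for zero or more words.

```java
// Simplified model of topic-exchange key matching (not RabbitMQ's real code).
final class TopicKeyMatcherSketch {

    private TopicKeyMatcherSketch() {
    }

    /** Returns true if routingKey would match bindingKey under topic rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    // Split a key into its dot-separated words; an empty key has no words.
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int k = j; k <= key.length; k++) {
                if (match(pattern, i + 1, key, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches any single word; otherwise the words must be equal.
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(key[j]);
        return wordMatches && match(pattern, i + 1, key, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("routing_key_json", "routing_key_json"));   // true
        System.out.println(matches("routing_key_json", "routing_key_1"));      // false
        System.out.println(matches("sensor.*", "sensor.temperature"));         // true
        System.out.println(matches("sensor.#", "sensor.temperature.kitchen")); // true
    }
}
```

Because the keys in application.properties contain no dots or wildcards, the binding here behaves like an exact match: only messages published with routing_key_json reach queue_json.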
RabbitMQJsonConsumer.java
package ro.tuc.ds2020.consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;
@Service
public class RabbitMQJsonConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);
private final ObjectMapper objectMapper = new ObjectMapper();
@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
public void consumeJsonMessage(MeasurementDTO measurementDTO) {
try {
String jsonMessage = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(measurementDTO);
LOGGER.info("Received JSON message here -> \n{}", jsonMessage);
} catch (JsonProcessingException e) {
LOGGER.error("Failed to convert message to JSON", e);
}
}
}
MessageJsonController.java
package ro.tuc.ds2020.controllers;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;
import java.util.HashMap;
import java.util.Map;
@RequestMapping("/api/v1")
@RestController
public class MessageJsonController {
private final RabbitMQJsonProducer jsonProducer;
public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
this.jsonProducer = rabbitMQJsonProducer;
}
@PostMapping("/publish")
public ResponseEntity<Map<String, String>> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
jsonProducer.sendJsonMessage(measurementDTO);
// Create a JSON response body
Map<String, String> response = new HashMap<>();
response.put("message", "Json message sent to RabbitMQ");
response.put("status", "success");
return ResponseEntity.ok(response);
}
}
MeasurementDTO.java
package ro.tuc.ds2020.dtos;
import com.fasterxml.jackson.annotation.JsonProperty;
public class MeasurementDTO {
@JsonProperty("sensorId")
private String sensorId;
@JsonProperty("value")
private double value;
@JsonProperty("unit")
private String unit;
@JsonProperty("timestamp")
private String timestamp;
// Getters and Setters
public String getSensorId() {
return sensorId;
}
public void setSensorId(String sensorId) {
this.sensorId = sensorId;
}
public double getValue() {
return value;
}
public void setValue(double value) {
this.value = value;
}
public String getUnit() {
return unit;
}
public void setUnit(String unit) {
this.unit = unit;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "MeasurementDTO{" +
"sensorId='" + sensorId + '\'' +
", value=" + value +
", unit='" + unit + '\'' +
", timestamp='" + timestamp + '\'' +
'}';
}
}
RabbitMQJsonProducer.java
package ro.tuc.ds2020.publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;
@Service
public class RabbitMQJsonProducer {
@Value("${rabbitmq.queue.exchange}")
private String exchange;
@Value("${rabbitmq.queue.routing_key_json}")
private String routingKeyJson;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);
private final RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendJsonMessage(MeasurementDTO measurementDTO) {
// Parameterized logging avoids building the string when INFO is disabled
LOGGER.info("Json message sent -> {}", measurementDTO);
rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
}
}
Ds2020Application.java
package ro.tuc.ds2020;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Ds2020Application {
public static void main(String[] args) {
SpringApplication.run(Ds2020Application.class, args);
}
}
application.properties
spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest
rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ro.tuc</groupId>
<artifactId>ds2020</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Starter AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Jackson for JSON serialization -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Spring Boot Starter Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven Plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*</include>
</includes>
</resource>
</resources>
</build>
</project>
docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672" # RabbitMQ messaging port
      - "15672:15672" # RabbitMQ management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
Start RabbitMQ
docker compose up
Log in to the management UI at http://localhost:15672/#/
username: guest
password: guest
Build and run the application
mvn clean install
dir target
java -jar target/ds2020-1.0.0.jar
Send a test request
POST http://localhost:8080/api/v1/publish
Request body
{
"sensorId": "12345",
"value": 67.5,
"unit": "Celsius",
"timestamp": "2024-11-16T18:30:00Z"
}
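If you prefer to send the same request from code instead of Postman, a minimal sketch with the JDK's built-in java.net.http.HttpClient (available since Java 11) could look like this. The class name PublishRequestSketch is made up for this example, and the URL assumes the application is running locally on the default port 8080.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for posting a measurement to the /publish endpoint.
final class PublishRequestSketch {

    // Endpoint exposed by MessageJsonController; host and port are assumptions.
    static final String PUBLISH_URL = "http://localhost:8080/api/v1/publish";

    private PublishRequestSketch() {
    }

    /** Builds a POST request carrying the given JSON body. */
    static HttpRequest buildRequest(String jsonBody) {
        return HttpRequest.newBuilder(URI.create(PUBLISH_URL))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /** Sends the request and returns the status code followed by the response body. */
    static String send(String jsonBody) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(buildRequest(jsonBody), HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }
}
```

Calling PublishRequestSketch.send(...) with the JSON body shown above only works while the Spring Boot application and RabbitMQ are both running.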
The consumer logs the received message in the Spring Boot console:
2024-11-16 19:11:43.964 INFO 22464 --- [ntContainer#0-1] r.t.d.consumer.RabbitMQJsonConsumer : Received JSON message here ->
{
"sensorId" : "12345",
"value" : 67.5,
"unit" : "Celsius",
"timestamp" : "2024-11-16T18:30:00Z"
}
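You can see the spike in the RabbitMQ UI.
The queue looks empty because the listener consumes each message as soon as it arrives, so nothing stays in the queue.
If you want to see the message sitting in the queue in the RabbitMQ UI, you need to disable the consumer by commenting out the @RabbitListener annotation in RabbitMQJsonConsumer.java.
From
@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
To
//@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
Then rebuild the jar and run it again.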
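To make the reason for the empty queue concrete, here is a toy in-memory model in plain Java. It does not use the RabbitMQ client at all, and the class name QueueDepthSketch is made up for this example. It only illustrates the idea that a message handed straight to an attached consumer never shows up as a waiting ("Ready") message, while a message published with no consumer attached stays in the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Toy in-memory model of a queue; it does not talk to RabbitMQ.
final class QueueDepthSketch {

    private final Deque<String> ready = new ArrayDeque<>();
    private Consumer<String> listener; // null means no consumer is attached

    void attachListener(Consumer<String> listener) {
        this.listener = listener;
    }

    void publish(String message) {
        if (listener != null) {
            // A consumer is attached: the message is delivered right away
            // and is never stored as a waiting message.
            listener.accept(message);
        } else {
            // No consumer: the message waits in the queue until someone reads it.
            ready.addLast(message);
        }
    }

    /** Number of messages waiting in the queue. */
    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();

        QueueDepthSketch withListener = new QueueDepthSketch();
        withListener.attachListener(received::add);
        withListener.publish("{\"sensorId\":\"12345\"}");
        System.out.println(withListener.readyCount());    // 0
        System.out.println(received.size());              // 1

        QueueDepthSketch withoutListener = new QueueDepthSketch();
        withoutListener.publish("{\"sensorId\":\"12345\"}");
        System.out.println(withoutListener.readyCount()); // 1
    }
}
```

The first case corresponds to running the application with @RabbitListener active, the second to running it with the annotation commented out.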