I have a Spring Boot + RabbitMQ app. The exchange used is a TopicExchange.
public static final String RPC_REQ_QUEUE = "req.queue";
public static final String RPC_RES_QUEUE = "res.queue";
public static final String RPC_EXCHANGE = "rpc_exchange";
@Bean
public Queue reqQueue() { return new Queue(RPC_REQ_QUEUE); }
@Bean
public Queue resQueue() { return new Queue(RPC_RES_QUEUE); }
@Bean
public TopicExchange exchange() {return new TopicExchange(RPC_EXCHANGE); }
@Bean
public Binding requestBinding(TopicExchange exchange, Queue reqQueue) {
return BindingBuilder.bind(reqQueue).to(exchange)
.with(RPC_REQ_QUEUE);
}
@Bean
public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
return BindingBuilder.bind(resQueue).to(exchange)
.with(RPC_RES_QUEUE);
}
Client and Server
The client
@Profile("client")
@Component
public class MyConsumerRequestor {
.....
public void send() {
System.out.println(" [x] Requesting fib(" + start + ")");
Integer response = (Integer) template.convertSendAndReceive
(exchange.getName(), RPC_REQ_QUEUE,
String.valueOf(start++),
message -> {
message.getMessageProperties().setReplyTo(RPC_RES_QUEUE);
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
});
System.out.println(" [.] Got '" + response + "'");
}
The server
@Profile("server")
@Component
public class MyProducerBackend {
@RabbitListener(queues = RPC_REQ_QUEUE, concurrency = "2")
public void fibonacci(Message message) {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
int n = Integer.parseInt(body);
int result = fib(n);
System.out.println(" [.] Returning " + result);
// Send response to RPC_RES_QUEUE
rabbitTemplate.convertAndSend(exchange.getName(), RPC_RES_QUEUE, String.valueOf(result),
msg -> {
msg.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());
return msg;
});
}
Observations:
The client sends messages, and the server receives them and returns results. But the correlation IDs on the server are 1, 2, 3, ... and not the ones sent from the client side.
The application is started with client and server profiles.
The RabbitMQ example uses a direct exchange and doesn't demonstrate correlation IDs or concurrency/async.
Issue:
[x] Requesting fib(1)
[.] Got 'null'
Why is there a correlation ID mismatch? Do I need to split the beans in the config for client and server?
Note: I am using a fixed reply queue.
Edit:
Source code : source code git
You showed this in your question:
@Bean
public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
return BindingBuilder.bind(resQueue).to(exchange)
.with(RPC_RES_QUEUE);
}
But that part is missing from your code.
When I added this to your Config:
@Bean
public Binding responseBinding(TopicExchange rpcExchange, Queue replyQueue) {
return BindingBuilder.bind(replyQueue).to(rpcExchange).with(RPC_REPLY_QUEUE);
}
it started working.
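To illustrate why the missing binding led to a null reply, here is a minimal plain-Java sketch of topic-exchange routing. It is a toy model under stated assumptions, not RabbitMQ or Spring AMQP code: the class TopicRoutingSketch and its methods are hypothetical, and the matching only covers literal words plus the `*` and `#` wildcards. The point is that a message whose routing key matches no binding is dropped, so the client waiting on the reply queue never sees an answer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of a topic exchange; NOT the RabbitMQ implementation.
class TopicRoutingSketch {
    // Each binding is a {pattern, queueName} pair.
    private final List<String[]> bindings = new ArrayList<>();
    // queue name -> message bodies delivered to it
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queue, String pattern) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.add(new String[] {pattern, queue});
    }

    /** Returns the number of queues the message reached; 0 means it was dropped. */
    int publish(String routingKey, String body) {
        Set<String> targets = new LinkedHashSet<>();
        String[] key = routingKey.split("\\.");
        for (String[] binding : bindings) {
            if (matches(binding[0].split("\\."), 0, key, 0)) {
                targets.add(binding[1]);
            }
        }
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
        return targets.size();
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }

    // '*' matches exactly one word, '#' matches zero or more words.
    static boolean matches(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            for (int next = j; next <= key.length; next++) {
                if (matches(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false;
        }
        return (pattern[i].equals("*") || pattern[i].equals(key[j]))
                && matches(pattern, i + 1, key, j + 1);
    }

    public static void main(String[] args) {
        TopicRoutingSketch exchange = new TopicRoutingSketch();
        exchange.bind("req.queue", "req.queue");
        // No binding for the reply queue yet: the reply is dropped.
        System.out.println(exchange.publish("res.queue", "8"));
        exchange.bind("res.queue", "res.queue");
        // With the binding in place the reply reaches the queue.
        System.out.println(exchange.publish("res.queue", "8"));
    }
}
```

In this model the first publish reaches no queue and the second reaches one, which mirrors the before and after of adding the responseBinding bean.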
The discrepancy with correlationId arises because the RabbitTemplate changes it internally for its own logic.
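The idea can be sketched in plain Java. This is a simplified model under my own assumptions, not the actual RabbitTemplate source: the class CorrelationSketch, its Msg type, and the method names are hypothetical. It shows how a request/reply client can overwrite the caller's correlation ID with an internal counter tag and use that tag to match replies, which is consistent with the server seeing 1, 2, 3, ... instead of the UUIDs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of request/reply correlation; NOT Spring AMQP code.
class CorrelationSketch {
    /** Minimal stand-in for a message: a body plus a correlation ID. */
    static final class Msg {
        final String body;
        String correlationId;

        Msg(String body, String correlationId) {
            this.body = body;
            this.correlationId = correlationId;
        }
    }

    private final AtomicInteger tagProvider = new AtomicInteger();
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Client side: replace the caller's correlation ID with an internal tag and track the pending reply. */
    CompletableFuture<String> send(Msg request) {
        String tag = String.valueOf(tagProvider.incrementAndGet());
        request.correlationId = tag; // the caller's UUID is overwritten here
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(tag, reply);
        return reply;
    }

    /** Client side: match an incoming reply to its request by correlation ID. */
    boolean onReply(Msg reply) {
        CompletableFuture<String> waiting = pending.remove(reply.correlationId);
        if (waiting == null) {
            return false; // unknown or late reply: nothing is waiting for it
        }
        waiting.complete(reply.body);
        return true;
    }
}
```

In this model a reply is only matched if it carries the tag the client generated, so a server that copies the request's correlation ID onto the reply stays compatible without either side caring about the original UUID.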
Technically, you don't need to worry about that at all. See how @RabbitListener can handle replies: https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven/reply.html
So, this is the change you should make on the server side:
@RabbitListener(queues = RPC_REQUEST_QUEUE, concurrency = "2")
@SendTo(RPC_EXCHANGE + '/' + RPC_REPLY_QUEUE)
public int processRequest(Message message) {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
int number = Integer.parseInt(body);
System.out.println(message);
System.out.println("[Server] Received request: " + number);
int result = fibonacci(number);
System.out.println("[Server] Computed result: " + result);
return result;
}
And this is on the client:
public int sendRpcRequest() {
Integer response = (Integer) rabbitTemplate.convertSendAndReceive(
RPC_EXCHANGE,
"rpc.request.key",
String.valueOf(number++));
if (response != null) {
System.out.println("[Client] Received response: " + response);
} else {
System.err.println("[Client] Response is null!");
}
return response == null ? -1 : response;
}
As you can see, there is no correlation headache, and the @RabbitListener is as simple as possible.