springspring-bootrabbitmqamqpspring-amqp

Spring RabbitMQ - RPC - CorrelationId not matching - TopicExchange - Client- Server - model


I have Spring boot - RabbitMQ app. The exchange used is topicexchange.

public static final String RPC_REQ_QUEUE = "req.queue";
public static final String RPC_RES_QUEUE = "res.queue";
public static final String RPC_EXCHANGE = "rpc_exchange";


@Bean
public Queue reqQueue() { return new Queue(RPC_REQ_QUEUE);  }
@Bean
public Queue resQueue() { return new Queue(RPC_RES_QUEUE);  }

@Bean
public TopicExchange exchange() {return new TopicExchange(RPC_EXCHANGE); }

@Bean
public Binding requestBinding(TopicExchange exchange,  Queue reqQueue) {
    return BindingBuilder.bind(reqQueue).to(exchange)
            .with(RPC_REQ_QUEUE);
}
@Bean
public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
    return BindingBuilder.bind(resQueue).to(exchange)
            .with(RPC_RES_QUEUE);
}

Client and Server @Profile("client") @Component

public class MyConsumerRequestor {
.....

public void send() {
        System.out.println(" [x] Requesting fib(" + start + ")");
        Integer response = (Integer) template.convertSendAndReceive
                (exchange.getName(), RPC_REQ_QUEUE,
                        String.valueOf(start++),
                        message -> {
                            message.getMessageProperties().setReplyTo(RPC_RES_QUEUE);
                            message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                            return message;
                        });

        System.out.println(" [.] Got '" + response + "'");
    }

The server

@Profile("server")
@Component
public class MyProducerBackend {
 @RabbitListener(queues = RPC_REQ_QUEUE, concurrency = "2")

    public void fibonacci(Message message) {

        String body = new String(message.getBody(), StandardCharsets.UTF_8);
       
        int n = Integer.parseInt(body);
        
        int result = fib(n);
        System.out.println(" [.] Returning " + result);
        // Send response to RPC_RES_QUEUE
        rabbitTemplate.convertAndSend(exchange.getName(), RPC_RES_QUEUE, String.valueOf(result),
                msg -> {
                        msg.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());
                        return msg;
                    });
    } 

Observations:

The client is sending messages, and server is receiving and returning. But the correlation Ids on server are like 1, 2 , 3 ...and not the ones sent from client side.

The application is started with client and server profiles.

The RabbitMq example Rabbit uses Direct exchange and can't demonstrate correlation id and concurrency /async.

Issue:

[x] Requesting fib(1)

[.] Got 'null'

why is correlation id mismatch ? Do I need to split the beans in the config for client and server ?

Note: I am using a fixed reply queue.

Edit:

Source code : source code git


Solution

  • You showed this in your question:

    @Bean
    public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
        return BindingBuilder.bind(resQueue).to(exchange)
                .with(RPC_RES_QUEUE);
    }
    

    But that part is missed from your code. When I added this into your Config:

    @Bean
    public Binding responseBinding(TopicExchange rpcExchange, Queue replyQueue) {
        return BindingBuilder.bind(replyQueue).to(rpcExchange).with(RPC_REPLY_QUEUE);
    }
    

    It has started working.

    The discrepancy with correlationId that it is changed internally in the RabbitTemplate for its own logic.

    Technically you must not worry about that one at all. See how @RabbitListener can handle replies: https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven/reply.html

    So, this is a change you should do on the server side:

    @RabbitListener(queues = RPC_REQUEST_QUEUE, concurrency = "2")
    @SendTo(RPC_EXCHANGE + '/' + RPC_REPLY_QUEUE)
    public int processRequest(Message message) {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        int number = Integer.parseInt(body);
    
        System.out.println(message);
    
        System.out.println("[Server] Received request: " + number);
    
        int result = fibonacci(number);
        System.out.println("[Server] Computed result: " + result);
    
        return result;
    }
    

    And this is on the client:

    public int sendRpcRequest() {
        Integer response = (Integer) rabbitTemplate.convertSendAndReceive(
                RPC_EXCHANGE,
                "rpc.request.key",
                String.valueOf(number++));
    
        if (response != null) {
            System.out.println("[Client] Received response: " + response);
        } else {
            System.err.println("[Client] Response is null!");
        }
        return response == null ? -1 : response;
    }
    

    You see, no any correlation headache and @RabbitListener is as simple as possible.