I have a class that needs to send a message to a NATS topic, receive the response, and then process it. Here is the service method from my Java Spring app:
    public CreateRenewRsEvent getOrderCreationResponse(CreateRenewRqEvent createRenewRqEvent) {
        try {
            log.info("Sending order.create event: {}", objectMapper.writeValueAsString(createRenewRqEvent));
            final Message response = natsConnection.request(
                    natsProperties.getSendTopic(),
                    objectMapper.writeValueAsBytes(createRenewRqEvent),
                    natsProperties.getTimeout()
            );
            log.info("Received order.create response: {}", new String(response.getData(), StandardCharsets.UTF_8));
            response.ack();
            return objectMapper.readValue(response.getData(), CreateRenewRsEvent.class);
        } catch (Exception e) {
            log.warn("Error sending order.create message", e);
        }
        return null;
    }
I'm running this server locally in Docker, so a method in another bean is supposed to handle the message:
    @PostConstruct
    fun setUp() {
        natsConnection.createDispatcher { handleMessage(it) }
            .subscribe(natsProperties.sendTopic)
    }

    private fun handleMessage(msg: Message) {
        val receivedMessage = String(msg.data, StandardCharsets.UTF_8)
        val jsonNode = objectMapper.readTree(receivedMessage)
        if (!jsonNode.has("response") || !jsonNode.has("err")) {
            logger.info("Received order.create message: $receivedMessage")
            val responseEvent = getOrderCreateResponse(receivedMessage)
            natsConnection.publish(
                natsProperties.sendTopic,
                objectMapper.writeValueAsBytes(responseEvent)
            )
            logger.info("Sending order.create response: $responseEvent")
        }
    }
The problem is that when I try to read the message in my first method, I get an NPE: the message is null, and so is its body.
I tried using a dispatcher with the nextMessage() method, and it worked, but what I really need is the request() method, since this is an integration with an external system.
What am I doing wrong?
P.S. Some more facts:
order.create. But after handling the message, there is another one: "_INBOX.rel468mZPCUYu9tUu48JRa.rel468mZPCUYu9tUu48JhC". It is the value of the msg.replyTo() field, but sending my response to it doesn't help either :(

I was both dumb and blind.
All I needed was to handle the response asynchronously, with CompletableFuture<Message>. Everything else works.
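
For anyone landing here later, here is a rough sketch of the asynchronous handling. As far as I know, the jnats overload natsConnection.request(subject, body) without a timeout returns a CompletableFuture<Message>, while the blocking overload with a Duration returns null when no reply arrives in time. To keep the sketch runnable without a NATS server, the Requester interface below is a hypothetical stand-in for that call and works on raw byte arrays; it is not a jnats type. The class and method names are made up for illustration as well.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncRequestSketch {

    /** Hypothetical stand-in for the async request call; NOT a jnats type. */
    interface Requester {
        CompletableFuture<byte[]> request(String subject, byte[] body);
    }

    /**
     * Sends the JSON payload and waits up to the given timeout for the reply.
     * Returns an empty Optional instead of null when no usable reply arrives.
     */
    static Optional<String> requestReply(Requester requester, String subject,
                                         String json, Duration timeout) {
        CompletableFuture<byte[]> future =
                requester.request(subject, json.getBytes(StandardCharsets.UTF_8));
        try {
            byte[] data = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return Optional.ofNullable(data)
                    .map(d -> new String(d, StandardCharsets.UTF_8));
        } catch (TimeoutException e) {
            // Nobody replied in time: stop waiting and report "no response".
            future.cancel(true);
            return Optional.empty();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see it.
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            // The request itself failed.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Fake responder that echoes the request body back immediately.
        Requester echo = (subject, body) -> CompletableFuture.completedFuture(body);
        // Fake responder that never replies, to exercise the timeout path.
        Requester silent = (subject, body) -> new CompletableFuture<byte[]>();

        System.out.println(requestReply(echo, "order.create", "{\"id\":1}", Duration.ofMillis(200)));
        System.out.println(requestReply(silent, "order.create", "{\"id\":1}", Duration.ofMillis(200)));
    }
}
```

In the real service, the lambda would be replaced by the call on natsConnection, and the reply bytes would come from the returned message's getData(). Returning an Optional (or throwing) instead of null makes the "no reply" case explicit, so the caller does not run into the same NPE.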