I hope you are well! First, I am new to the EIP world. I am trying to do a simple request reply with:
I have read all the docs I could and searched for answers, but I couldn't find anything. I am basically desperate. Mainly I saw this, and nothing has worked yet.
My goal is to do a synchronous request-reply, as in the image.
My Golang client looks like this:
func (r *RabbitMQConn) GetQueue(name string) *amqp.Queue {
	ch := r.GetChannel()
	defer ch.Close()
	q, err := ch.QueueDeclare(
		name,
		false, // durable
		false, // autoDelete
		true,  // exclusive
		false, // noWait
		nil,   // args
	)
	if err != nil {
		panic(err)
	}
	return &q
}
func (r *RabbitMQConn) PublishAndWait(routingKey string, correlationId string, event domain.SyncEventExtSend) (domain.SyncEventExtReceive, error) {
	message, err := json.Marshal(event)
	if err != nil {
		return domain.SyncEventExtReceive{}, apperrors.ErrInternal
	}
	ch := r.GetChannel()
	defer ch.Close()
	q := r.GetQueue("response")
	h, err := ch.Consume(
		q.Name,
		"",    // consumer
		true,  // autoAck
		false, // exclusive
		false, // noLocal
		false, // noWait
		nil,   // args
	)
	if err != nil {
		return domain.SyncEventExtReceive{}, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	err = ch.PublishWithContext(
		ctx,
		"", // default exchange
		routingKey,
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType:   "application/json",
			Body:          message,
			CorrelationId: correlationId,
			ReplyTo:       q.Name,
		},
	)
	if err != nil {
		return domain.SyncEventExtReceive{}, err
	}
	for d := range h {
		fmt.Println("Received a message:", string(d.Body))
		if d.CorrelationId == correlationId {
			var event domain.SyncEventExtReceive
			err = json.Unmarshal(d.Body, &event)
			return event, err
		}
	}
	return domain.SyncEventExtReceive{}, apperrors.ErrInternal
}
Basically, I just consume from a named response queue on the default exchange. I also send the queue name as the ReplyTo property and give the message a correlation id. The routing key sent in this case is daily-weather.
On the server side, I tried to use the default exchange, but Apache Camel does not let me do anything with that exchange.
from("rabbitmq:?queue=daily-weather&autoAck=true&autoDelete=false")
So, I assigned it the amq.direct exchange. However, that didn't work either.
"rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false"
Then I added a second RabbitMQ endpoint to see if it would send the reply, but nothing happened.
from("rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false")
.log(LoggingLevel.INFO, "weather-daily", "Received message: \${body}")
.to("rabbitmq:amq.direct?queue=response&autoAck=true&autoDelete=false")
Does anybody have a simple example of doing this with Apache Camel? I am completely lost. I can share any further details on request.
Thank you very much!!!! :)
SOLVED
Hi! After some time I decided to take a look at the spring-rabbitmq Camel component. I realised that Camel has exchange patterns, and rabbitmq, by default, sets it to inOut. This way, it automatically sends the result back to the queue named in the replyTo property.
val RABBIMQ_ROUTE =
"spring-rabbitmq:default?queues={{rabbitmq.weather.daily.routing_key}}"
Here, default refers to the default exchange.
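To make the inOut behaviour described above more concrete, here is a small in-memory model in Go. It is not Camel or RabbitMQ code: `message`, `broker`, `publish`, and `serveOnce` are all hypothetical names, and the broker only imitates the default exchange, where the routing key is the queue name. The sketch shows the one step that matters for request-reply: the consumer publishes its result to the queue named in ReplyTo and copies the CorrelationId.

```go
package main

import "fmt"

// message is a hypothetical stand-in for an AMQP message.
type message struct {
	Body          string
	ReplyTo       string
	CorrelationId string
}

// broker imitates the default exchange: a routing key is a queue name.
type broker struct {
	queues map[string]chan message
}

func newBroker(names ...string) *broker {
	b := &broker{queues: make(map[string]chan message)}
	for _, n := range names {
		b.queues[n] = make(chan message, 8)
	}
	return b
}

// publish routes m to the queue whose name equals routingKey.
func (b *broker) publish(routingKey string, m message) error {
	q, ok := b.queues[routingKey]
	if !ok {
		return fmt.Errorf("no queue named %q", routingKey)
	}
	q <- m
	return nil
}

// serveOnce plays the role of an inOut consumer: it takes one request,
// runs the handler, and publishes the result to the request's ReplyTo
// queue with the same correlation id.
func (b *broker) serveOnce(queue string, handle func(string) string) error {
	q, ok := b.queues[queue]
	if !ok {
		return fmt.Errorf("no queue named %q", queue)
	}
	req := <-q
	return b.publish(req.ReplyTo, message{
		Body:          handle(req.Body),
		CorrelationId: req.CorrelationId,
	})
}

func main() {
	b := newBroker("daily-weather", "response")

	// Client side: send the request with ReplyTo and a correlation id.
	req := message{Body: "Madrid", ReplyTo: "response", CorrelationId: "42"}
	if err := b.publish("daily-weather", req); err != nil {
		panic(err)
	}

	// Server side: handle the request and reply automatically.
	err := b.serveOnce("daily-weather", func(city string) string {
		return "sunny in " + city
	})
	if err != nil {
		panic(err)
	}

	reply := <-b.queues["response"]
	fmt.Println(reply.CorrelationId, reply.Body)
}
```

With this pattern the route needs no explicit second endpoint for the response, which matches what I observed once the exchange pattern was inOut.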