I deployed a RabbitMQ cluster on Kubernetes using the RabbitMQ Cluster Operator and enabled the rabbitmq_stream
plugin. This is my YAML:
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: rabbitmq-deployment
  namespace: rabbitmq-namespace
spec:
  replicas: 2
  image: rabbitmq:3.11.13
  persistence:
    storage: 20Gi
  service:
    type: LoadBalancer
  rabbitmq:
    additionalPlugins:
      - rabbitmq_stream
      - rabbitmq_stream_management
I also use the RabbitMQ Stream Java Client, and I connect to the cluster like this:
EnvironmentBuilder environmentBuilder = Environment.builder();
environmentBuilder.host(System.getenv("RABBITMQ_HOST"));
environmentBuilder.port(Integer.parseInt(System.getenv("RABBITMQ_STREAM_PORT")));
environmentBuilder.username(System.getenv("RABBITMQ_USERNAME"));
environmentBuilder.password(System.getenv("RABBITMQ_PASSWORD"));
mainConnection = environmentBuilder.build();
When I use this client to create the stream, it works flawlessly and no error is reported:
mainConnection.streamCreator().stream("mystream").maxAge(Duration.of(1, ChronoUnit.DAYS)).create()
But when I try to produce messages like this:
Producer producer = RabbitMQStreamConnection.mainConnection.producerBuilder().stream("mystream").build();
byte[] messagePayload = "hello".getBytes(StandardCharsets.UTF_8);
producer.send(
    producer.messageBuilder().addData(messagePayload).build(),
    confirmationStatus -> {
        if (confirmationStatus.isConfirmed()) {
            // the message made it to the broker
        } else {
            // the message did not make it to the broker
        }
    });
It throws this exception:
com.rabbitmq.stream.StreamException
Error while creating stream connection to rabbitmq-deployment-server-0.rabbitmq-deployment-nodes.rabbitmq-namespace:5552
I assume this happens because there are two nodes (replicas = 2) and the client gets redirected to an individual node's internal address instead of going through the load balancer.
What I want is to be able to produce and consume messages from the stream.
Right now, I have no clue what to try next to solve this problem.
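You should configure the client to always connect through the load balancer. The host in your exception looks like a node's cluster-internal hostname, which the client received as a metadata hint; if your application runs outside the cluster, it presumably cannot reach that address directly.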
A load balancer can misguide the client when it tries to connect to nodes that host stream leaders and replicas. The "Connecting to Streams" blog post covers why client applications must connect to the appropriate nodes in a cluster and how a load balancer can make things complicated for them.
The EnvironmentBuilder#addressResolver(AddressResolver) method allows intercepting the node resolution after metadata hints and before connection. Applications can use this hook to ignore metadata hints and always use the load balancer, as illustrated in the following snippet:
Using a custom address resolver to always use a load balancer
Address entryPoint = new Address("my-load-balancer", 5552);
Environment environment = Environment.builder()
    .host(entryPoint.host())
    .port(entryPoint.port())
    .addressResolver(address -> entryPoint)
    .build();
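To make the effect of the resolver more concrete, here is a toy model in plain Java with no RabbitMQ dependency. It is only a sketch: the `Address` class is a stand-in for the client's type, `resolve` and `attemptsUntilConnected` are hypothetical helpers, and the round-robin balancer is an assumption. It shows that an identity resolver keeps the internal hostname from the metadata hint, while a resolver that returns the entry point always yields the load balancer. The retry loop mimics what I understand the blog post to describe: the client keeps reconnecting through the load balancer until it lands on the node it wants.

```java
import java.util.List;
import java.util.function.UnaryOperator;

/** Toy model, NOT the RabbitMQ client: illustrates what an address resolver hook does. */
public class AddressResolverSketch {

    /** Hypothetical stand-in for the client's Address type (host plus port). */
    static final class Address {
        final String host;
        final int port;

        Address(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Applies the resolver to the address hinted by the broker metadata. */
    static Address resolve(Address hint, UnaryOperator<Address> resolver) {
        return resolver.apply(hint);
    }

    /**
     * Simulates reconnecting through a round-robin load balancer (an assumption)
     * until the connection lands on the wanted node. Returns the number of
     * attempts needed, or -1 if maxAttempts is exhausted.
     */
    static int attemptsUntilConnected(List<String> nodes, String wantedNode, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // The balancer picks nodes in turn, regardless of what the client wants.
            String reached = nodes.get((attempt - 1) % nodes.size());
            if (reached.equals(wantedNode)) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        Address entryPoint = new Address("my-load-balancer", 5552);
        Address hint = new Address(
            "rabbitmq-deployment-server-0.rabbitmq-deployment-nodes.rabbitmq-namespace", 5552);

        // Identity resolver: the client would dial the internal hostname.
        System.out.println(resolve(hint, address -> address));
        // Entry-point resolver: the client always dials the load balancer.
        System.out.println(resolve(hint, address -> entryPoint));
        // Retries needed to reach the second of two nodes behind the balancer.
        System.out.println(attemptsUntilConnected(List.of("server-0", "server-1"), "server-1", 10));
    }
}
```

In your setup, the entry point would be built from RABBITMQ_HOST and RABBITMQ_STREAM_PORT and passed to addressResolver on the same EnvironmentBuilder you already use.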