I'm fairly new to Java message listeners and Apache Pulsar.
Assume that I've defined a listener like so:
    private MessageListener<byte[]> generateListener() {
        MessageListener<byte[]> listener = (consumer, respMsg) -> {
            String respM = new String(respMsg.getValue(), StandardCharsets.UTF_8);
            logger.info(respM);
            consumer.acknowledgeAsync(respMsg);
        };
        return listener;
    }
And a Consumer instance like so,
    Consumer<byte[]> c = consumerBuilder
            .messageListener(generateListener())
            .topic(topicName)
            .subscriptionName("Consumer-" + i)
            .subscribeAsync()
            .get();
What I would like to know is how multiple incoming messages are handled by this listener. Will each message be handled in a separate thread, as is the case with JMS listeners? If so, how can I configure the number of threads to use? Is it through the ClientBuilder.listenerThreads() property?
Also, when maintaining multiple consumers, do I need a separate listener object for each consumer, i.e. something like this?
    consumerBuilder.clone().messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync()
The ClientBuilder#listenerThreads method configures the size of an internal thread pool that is shared across all Consumers and Readers created from that client.
The Pulsar client guarantees that the MessageListener for a single consumer is always invoked by the same thread, i.e. the provided MessageListener doesn't need to be thread-safe. So yes, it is preferable to use a dedicated MessageListener object per Consumer.
Note that this also ensures that each consumer's messages are handled in order.
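To make the "same thread per consumer" idea concrete, here is a small model of it using only the JDK. This is not Pulsar's code: the class name ListenerThreadModel, the thread names "listener-N", the consumer names "C0", "C1", ... and the modulo assignment of consumers to threads are all assumptions made for illustration. It only sketches why pinning each consumer to one single-threaded executor gives both thread confinement and in-order handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model (NOT Pulsar internals): a fixed set of single-threaded
// executors plays the role of the shared listener thread pool.
class ListenerThreadModel {

    // Returns, per consumer name, the "threadName:sequence" entries recorded
    // by that consumer's callback, in the order the callbacks actually ran.
    static Map<String, List<String>> run(int listenerThreads, int consumers, int messagesPerConsumer) {
        List<ExecutorService> pool = new ArrayList<>();
        for (int t = 0; t < listenerThreads; t++) {
            final String threadName = "listener-" + t;
            pool.add(Executors.newSingleThreadExecutor(r -> new Thread(r, threadName)));
        }

        Map<String, List<String>> seen = new ConcurrentHashMap<>();
        for (int c = 0; c < consumers; c++) {
            String consumerName = "C" + c;
            List<String> log = new CopyOnWriteArrayList<>();
            seen.put(consumerName, log);

            // Assumption for illustration: consumers are pinned round-robin.
            ExecutorService pinned = pool.get(c % listenerThreads);

            for (int m = 0; m < messagesPerConsumer; m++) {
                final int seq = m;
                // The "listener" callback: always runs on the pinned thread,
                // and a single-threaded executor runs tasks in FIFO order.
                pinned.execute(() -> log.add(Thread.currentThread().getName() + ":" + seq));
            }
        }

        // Wait for all callbacks to finish before returning the logs.
        for (ExecutorService executor : pool) {
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for listeners", e);
            }
        }
        return seen;
    }
}
```

Calling ListenerThreadModel.run(2, 4, 3) in this model puts C0 and C2 on listener-0 and C1 and C3 on listener-1, and every consumer sees its sequence numbers 0, 1, 2 in order. With more consumers than threads, consumers share a thread, so a slow callback in one consumer delays the others pinned to the same thread.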
So basically, if you only use a single Consumer, you can keep listenerThreads at 1 (which is the default).
Here is a complete example that can be used to observe the behaviour:
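    import java.util.ArrayList;
    import java.util.List;

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.MessageListener;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    import org.apache.pulsar.client.api.SubscriptionMode;
    import org.apache.pulsar.client.api.SubscriptionType;

    public class PulsarConsumerListenerExample {

        public static void main(String[] args) throws PulsarClientException {
            // Size of the listener thread pool shared by all consumers of this client.
            int numListenerThread = 2;
            PulsarClient client = PulsarClient
                    .builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .listenerThreads(numListenerThread)
                    .build();

            // Create one consumer (with its own listener) per listener thread.
            final List<Consumer<?>> consumers = new ArrayList<>();
            for (int i = 0; i < numListenerThread; i++) {
                consumers.add(createConsumerWithListener(client, "my-topic", "my-subscription", "C" + i));
            }

            // Close the consumers when the JVM shuts down.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                for (Consumer<?> consumer : consumers) {
                    try {
                        consumer.close();
                    } catch (PulsarClientException e) {
                        e.printStackTrace();
                    }
                }
            }));
        }

        private static Consumer<String> createConsumerWithListener(final PulsarClient client,
                                                                   final String topic,
                                                                   final String subscription,
                                                                   final String consumerName) throws PulsarClientException {
            return client.newConsumer(Schema.STRING)
                    .topic(topic)
                    .consumerName(consumerName)
                    .subscriptionName(subscription)
                    .subscriptionMode(SubscriptionMode.Durable)
                    .subscriptionType(SubscriptionType.Failover)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .messageListener((MessageListener<String>) (consumer, msg) -> {
                        // Print the consumer name and the thread that invoked the listener.
                        System.out.printf(
                                "[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
                                consumerName,
                                Thread.currentThread().getName(),
                                msg.getKey(),
                                msg.getValue(),
                                msg.getTopicName(),
                                msg.getMessageId().toString());
                        consumer.acknowledgeAsync(msg);
                    })
                    .subscribe();
        }
    }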