javamultithreadingapache-pulsarmessage-listener

Multithreading with pulsar message listeners


I'm fairly new to java message listeners and apache pulsar. Assume that I've maintained a listener like so,

private MessageListener<byte[]> generateListener() {
        MessageListener<byte[]> listener = (consumer, respMsg) -> {
            String respM = new String(respMsg.getValue(), StandardCharsets.UTF_8);
            logger.info(respM);
            consumer.acknowledgeAsync(respMsg);
        };
        return listener;
    }

And a Consumer instance like so,

Consumer<byte[]> c = consumerBuilder.messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync().get();

What I would like to know is how multiple incoming messages would be handled by this listener? Will each message be handled in a seperate thread as in the case of JMS listeners? If so, then how can I configure the number of threads to use - is it by using the ClientBuilder.listenerThreads() property?

Is there a need to maintain multiple listener objects respective to each consumer, when maintaining multiple consumers i.e, something like this -

consumerBuilder.clone().messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync() ?


Solution

  • The ClientBuilder#listenerThreads method allows configuring the size of an internal thread-pool that will be shared and used across all Consumers or Readers that will be created from that client.

    Pulsar Client will give you the guarantee that a MessageListener for a single consumer will always be invoked by the same thread, i.e that the provided MessageListener don't need to be thread-safe. So, YES it is prefered to use a dedicated MessageListener object per Consumer.

    Note that this also ensure ordering.

    So basically, if you only use a single Consumer then you can keep the listenerThreads to 1 (that is the default).

    Here is a complete example that can used to observe the behaviour :

    public class PulsarConsumerListenerExample {
    
        public static void main(String[] args) throws PulsarClientException {
    
            int numListenerThread = 2;
    
            PulsarClient client = PulsarClient
                    .builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .listenerThreads(numListenerThread)
                    .build();
    
            final List<Consumer<?>> consumers = new ArrayList<>();
            for (int i = 0; i < numListenerThread; i++) {
                consumers.add(createConsumerWithLister(client, "my-topic", "my-subscription", "C" + i));
            }
    
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                for (Consumer<?> consumer : consumers) {
                    try {
                        consumer.close();
                    } catch (PulsarClientException e) {
                        e.printStackTrace();
                    }
                }
            }));
        }
    
        private static Consumer<String> createConsumerWithLister(final PulsarClient client,
                                                                 final String topic,
                                                                 final String subscription,
                                                                 final String consumerName) throws PulsarClientException {
            return client.newConsumer(Schema.STRING)
                .topic(topic)
                .consumerName(consumerName)
                .subscriptionName(subscription)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionType(SubscriptionType.Failover)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .messageListener((MessageListener<String>) (consumer, msg) -> {
                    System.out.printf(
                        "[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
                        consumerName,
                        Thread.currentThread().getName(),
                        msg.getKey(),
                        msg.getValue(),
                        msg.getTopicName(),
                        msg.getMessageId().toString());
                    consumer.acknowledgeAsync(msg);
                })
                .subscribe();
        }
    }