java · azure · azureservicebus · azure-servicebus-topics

Can we send an acknowledgement programmatically when the subscription is in receive-and-delete mode? - JMS 1.0


I am trying to send an acknowledgement programmatically (Java 17) using JMS 1.0 against Azure Service Bus with Spring Boot 2.5.0, where the topic subscription is in RECEIVE AND DELETE mode. In this scenario I get the error below.

"Failed to connect after: 10 attempt(s) continuing to retry."

The code I tried is below.

// Build the Service Bus JMS connection factory from the connection string.
// NOTE(review): the second argument is another ServiceBusJmsConnectionFactory; presumably a
// ServiceBusJmsConnectionFactorySettings instance was intended - confirm against the library API.
ConnectionFactory factory   = new ServiceBusJmsConnectionFactory(<CONNECTION_STRING>, new ServiceBusJmsConnectionFactory());
Connection connection       = factory.createConnection();
// NOTE(review): the javax.jms.Connection method is presumably setClientID (capital "ID") - confirm.
connection.setClientId(<CLIENT_ID>);
// Register MyExceptionHandler as the connection's exception listener.
connection.setExceptionListener(new MyExceptionHandler());
connection.start();
// NOTE(review): `session.CLIENT_ACKNOWLEDGE` refers to the variable being declared on this line;
// presumably Session.CLIENT_ACKNOWLEDGE was intended. The first argument (true) requests a
// transacted session - TODO confirm whether the acknowledge mode is honoured for transacted sessions.
Session session = connection.createSession(true, session.CLIENT_ACKNOWLEDGE);

Please Help.


Solution

  • I tried the below Spring Boot application to send an acknowledgment to an Azure Service Bus topic subscription configured in RECEIVE_AND_DELETE mode with JMS.

    Code :

    ServiceBusService.java :

    import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactory;
    import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactorySettings;
    import javax.jms.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Service;
    
    /**
     * Command-line runner that exercises an Azure Service Bus topic over JMS.
     *
     * <p>On startup it opens a JMS connection and a non-transacted
     * {@code CLIENT_ACKNOWLEDGE} session, publishes one text message to the configured
     * topic, registers a listener on a durable subscriber that logs and acknowledges
     * incoming text messages, blocks until a byte is read from standard input, and then
     * closes the consumer, session and connection.
     */
    @Service
    public class ServiceBusService implements CommandLineRunner {

        private static final Logger logger = LoggerFactory.getLogger(ServiceBusService.class);
        @Value("${azure.servicebus.connection-string}")
        private String connectionString;
        @Value("${azure.servicebus.topic-name}")
        private String topicName;
        @Value("${azure.servicebus.subscription-name}")
        private String subscriptionName;

        private Connection connection;
        private Session session;
        private MessageConsumer consumer;

        /**
         * Runs the send/receive demo.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the connection or session cannot be created
         */
        @Override
        public void run(String... args) throws Exception {
            try {
                initializeConnection();
                sendMessage("Hello, Kamali!");
                receiveMessages();
            } finally {
                // Always release JMS resources, including when an earlier step threw.
                closeResources();
            }
        }

        /**
         * Creates the connection, assigns its client id, opens a non-transacted
         * {@code CLIENT_ACKNOWLEDGE} session and starts the connection.
         *
         * @throws JMSException if any of the JMS calls fails
         */
        private void initializeConnection() throws JMSException {
            ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory(
                    connectionString, new ServiceBusJmsConnectionFactorySettings());
            connection = factory.createConnection();
            // Placeholder value: replace "<clientID>" with the real client id before running.
            connection.setClientID("<clientID>");
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            connection.start();
        }

        /**
         * Publishes a single text message to the configured topic. JMS failures are logged
         * and not rethrown.
         *
         * @param messageContent text body of the message to send
         */
        public void sendMessage(String messageContent) {
            try {
                Destination destination = session.createTopic(topicName);
                MessageProducer producer = session.createProducer(destination);
                try {
                    TextMessage message = session.createTextMessage(messageContent);
                    producer.send(message);
                    logger.info("Sent a single message to the topic: {}", topicName);
                } finally {
                    // Close the producer even when creating or sending the message fails.
                    producer.close();
                }
            } catch (JMSException e) {
                // Trailing throwable argument keeps the stack trace and cause in the log.
                logger.error("Error sending message: {}", e.getMessage(), e);
            }
        }

        /**
         * Registers a listener on a durable subscriber for the configured topic and
         * subscription, then blocks until a byte is available on standard input.
         * Failures are logged and not rethrown.
         */
        public void receiveMessages() {
            try {
                Topic topic = session.createTopic(topicName);
                consumer = session.createDurableSubscriber(topic, subscriptionName);
                consumer.setMessageListener(this::handleMessage);
                logger.info("Waiting for messages... Press enter to stop.");
                System.in.read();
            } catch (Exception e) {
                logger.error("Error receiving messages: {}", e.getMessage(), e);
            }
        }

        /**
         * Logs the body of a text message and acknowledges it; any other message type is
         * reported and skipped without an acknowledge call.
         *
         * @param message the message delivered to the listener
         */
        private void handleMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    String messageBody = ((TextMessage) message).getText();
                    logger.info("Received: {}", messageBody);
                    message.acknowledge();
                } else {
                    // Make skipped messages visible instead of dropping them silently.
                    logger.warn("Ignoring non-text message of type {}", message.getClass().getName());
                }
            } catch (JMSException e) {
                logger.error("Error processing message: {}", e.getMessage(), e);
            }
        }

        /**
         * Closes the consumer, session and connection in that order. Each close is attempted
         * independently so that a failure in one does not prevent the others from closing.
         */
        private void closeResources() {
            if (consumer != null) {
                closeAndLog(consumer::close);
            }
            if (session != null) {
                closeAndLog(session::close);
            }
            if (connection != null) {
                closeAndLog(connection::close);
            }
        }

        /** Runs one close action and logs, rather than propagates, a JMS failure. */
        private void closeAndLog(JmsCloseAction action) {
            try {
                action.close();
            } catch (JMSException e) {
                logger.error("Error closing resources: {}", e.getMessage(), e);
            }
        }

        /** A close operation that may fail with a {@link JMSException}. */
        @FunctionalInterface
        private interface JmsCloseAction {
            void close() throws JMSException;
        }
    }
    

    application.properties :

    azure.servicebus.connection-string=<connecString>
    azure.servicebus.topic-name=<TopicName>
    azure.servicebus.subscription-name=<SubName>
    logging.level.org.apache.qpid.jms=DEBUG
    

    pom.xml :

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure-servicebus-jms</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    

    Output :

    The Spring Boot application above ran successfully and sent the message acknowledgement to the Azure Service Bus topic.

    enter image description here