java jms activemq-artemis

Jakarta JMS with Spring Boot: sending messages immediately


I need to send some messages immediately, without waiting for the transaction to end. However, none of the settings I tried, such as template.setSessionTransacted(false); and different acknowledge modes, had any effect.

These are the JMS libraries I use. Currently all settings are left at their defaults, but I have tried every acknowledge mode, with transacted mode both enabled and disabled.

    implementation 'org.apache.activemq:artemis-jakarta-client:2.38.0'
    implementation 'org.apache.activemq:artemis-core-client:2.38.0'
    implementation 'org.apache.activemq:artemis-commons:2.38.0'
    implementation 'org.springframework:spring-jms:6.1.14'

My JMS Configuration

/**
 * Creates the JMS connection factory used to reach the broker.
 *
 * <p>The broker URL is built as {@code tcp://<host>:<port>}; {@code host},
 * {@code port}, {@code user} and {@code pass} are declared outside this
 * snippet (presumably injected configuration values — confirm).
 *
 * @return the configured connection factory
 */
@Bean
    public ConnectionFactory connectionFactory() {
        final String brokerUrl = format("tcp://%s:%s", host, port);

        final ActiveMQConnectionFactory artemisFactory = new ActiveMQConnectionFactory();
        // The URL is applied before the credentials, in the same order as before.
        artemisFactory.setBrokerURL(brokerUrl);
        artemisFactory.setUser(user);
        artemisFactory.setPassword(pass);
        return artemisFactory;
    }

    /**
     * Builds the listener container factory used for JMS listeners.
     *
     * <p>As a side effect, the {@code org.springframework.jms} logger is
     * restricted to {@code ERROR} level.
     *
     * @param connectionFactory the connection factory handed to the configurer
     * @param configurer        applies its configuration to the new factory
     * @return the configured listener container factory
     */
    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        // Silence everything below ERROR coming from Spring's JMS packages.
        ((Logger) LoggerFactory.getLogger("org.springframework.jms")).setLevel(Level.ERROR);

        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        // The error handler is installed before the configurer runs, exactly as before.
        containerFactory.setErrorHandler(new CustomErrorHandler());
        configurer.configure(containerFactory, connectionFactory);
        return containerFactory;
    }

    /**
     * Exposes a {@link JmsTemplate} bound to the given connection factory.
     *
     * <p>No further settings are applied here, so the template keeps whatever
     * its constructor sets up.
     *
     * @param connectionFactory the connection factory the template sends through
     * @return a new template instance
     */
    @Bean
    public JmsTemplate jmsAnycastTemplate(ConnectionFactory connectionFactory) {
        final JmsTemplate anycastTemplate = new JmsTemplate(connectionFactory);
        return anycastTemplate;
    }

Worker client code

    /**
     * Runs one full work cycle for a module: announces that the work was
     * accepted, performs the work, then reports that it ended.
     *
     * @param moduleId   identifier of the module doing the work
     * @param mqProducer producer used for every outgoing message
     * @param message    the message that triggered this work cycle
     */
    public static void doAllWork(int moduleId, MqProducer mqProducer, MqMessage message){
        // Step 1: tell the other side the work has been accepted.
        mqProducer.sendAcceptToWork(moduleId, message);

        // Step 2: the work itself (emits its own progress messages).
        Work.doSomeWork(moduleId, mqProducer);

        // Step 3: report the end of the work.
        final long endWorkValue = 200L; // presumably a status code — confirm against MqProducer
        mqProducer.sendEndWork(moduleId, message, endWorkValue);
    }

    /**
     * Simulates a unit of work that lasts a random whole number of seconds
     * (1 to 9 inclusive), reporting progress roughly once per second.
     *
     * <p>On every iteration the method sleeps for one second, prints a
     * timestamped {@code Working...} line and sends an {@code IN_PROGRESS}
     * message to the {@code "api"} destination.
     *
     * @param moduleId   identifier of the module doing the work; copied into each progress message
     * @param mqProducer producer used to send the progress messages
     */
    @SneakyThrows
    public static void doSomeWork(int moduleId, MqProducer mqProducer){

        int secToWork = ThreadLocalRandom.current().nextInt(1, 10);

        // Elapsed time is measured with the monotonic clock so that wall-clock
        // adjustments (DST, NTP corrections) cannot stretch or cut the loop.
        long workStartedNanos = System.nanoTime();
        long workDurationNanos = secToWork * 1_000_000_000L;

        while (System.nanoTime() - workStartedNanos < workDurationNanos) {
            Thread.sleep(1000);
            // Progress line of the worker itself (previously a copy-pasted API-side message).
            System.out.printf("%s Working...%n", LocalDateTime.now());
            mqProducer.sendMessage("api", new MqMessage(IN_PROGRESS, moduleId, 200));
        }
    }

And here is the output from the worker client:

sendAcceptToWork 3 0
2024-12-19T13:03:45.046627435 Working...
2024-12-19T13:03:46.057809869 Working...
2024-12-19T13:03:47.065647593 Working...
2024-12-19T13:03:48.074799444 Working...
2024-12-19T13:03:49.080706764 Working...
2024-12-19T13:03:50.088089181 Working...
2024-12-19T13:03:51.095799844 Working...
2024-12-19T13:03:52.104492990 Working...
2024-12-19T13:03:53.113162971 Working...
Work done

Here is the API client code:

    /**
     * Prints a timestamped line to standard output for a received message.
     *
     * <p>{@code WORK_STARTED}, {@code IN_PROGRESS} and {@code WORK_ENDED} each
     * get a fixed one-line notice ({@code WORK_STARTED} is preceded by a blank
     * line); any other operation code prints the whole message.
     *
     * @param message the received message
     */
    @SneakyThrows
    @Override
    public void run(MqMessage message) {
        var operationCode = message.getMqOperationCode();

        if (operationCode == MqOperationCode.WORK_STARTED) {
            // A blank line visually separates consecutive work cycles.
            System.out.println();
            System.out.printf("%s [api] WORK_STARTED message received%n", LocalDateTime.now());
        } else if (operationCode == MqOperationCode.IN_PROGRESS) {
            System.out.printf("%s [api] IN_PROGRESS message received%n", LocalDateTime.now());
        } else if (operationCode == MqOperationCode.WORK_ENDED) {
            System.out.printf("%s [api] WORK_ENDED message received%n", LocalDateTime.now());
        } else {
            // Unrecognised operation code: dump the full message.
            System.out.printf("%s [api] message received : \n%s%n", LocalDateTime.now(), message);
        }
    }

And its output for the same iteration:

2024-12-19T13:03:53.141036336 [api] WORK_STARTED message received
2024-12-19T13:03:53.146047337 [api] IN_PROGRESS message received
2024-12-19T13:03:53.150053390 [api] IN_PROGRESS message received
2024-12-19T13:03:53.153668788 [api] IN_PROGRESS message received
2024-12-19T13:03:53.157834576 [api] IN_PROGRESS message received
2024-12-19T13:03:53.161060836 [api] IN_PROGRESS message received
2024-12-19T13:03:53.163221293 [api] IN_PROGRESS message received
2024-12-19T13:03:53.165163888 [api] IN_PROGRESS message received
2024-12-19T13:03:53.166759323 [api] IN_PROGRESS message received
2024-12-19T13:03:53.168365120 [api] IN_PROGRESS message received
2024-12-19T13:03:53.169784977 [api] WORK_ENDED message received

As you can see, the worker client sends all messages at once, but I need each message to be sent immediately, without waiting for the transaction to end.


Solution

  • I don't really understand why it didn't work correctly, because I had set up JMS in non-transactional mode. But I found a solution that is pretty stable for me, and it also solved another problem: synchronous sends blocking without a timeout.

    @Service
    @Slf4j
    public class MqProducerImpl implements MqProducer {
        @Qualifier("jmsMulticastTemplate")
        @Autowired()
        JmsTemplate jmsMulticastTemplate;
    
        @Qualifier("jmsAnycastTemplate")
        @Autowired()
        JmsTemplate jmsAnycastTemplate;
    
        private final long jmsTimeout = 10_000;
    
        @Override
        @SneakyThrows
        public void sendUpdateWork(long id) {
            MqMessage message = new MqMessage(MqOperationCode.IN_PROGRESS, id, myEnv.getModuleId(), Constants.IN_PROGRESS);
            String json = ow.writeValueAsString(message);
            log.debug("Sending information to the API about updating the processing of the id={}", id);
            sendWithTimeout(jmsMulticastTemplate, myEnv.getApiQueueName(), json, true);
        }
    
        @SneakyThrows
        public void sendEndWork(long id, long statusCode) {
            MqMessage message = new MqMessage(MqOperationCode.WORK_ENDED, id, myEnv.getModuleId(), statusCode);
            String json = ow.writeValueAsString(message);
            log.debug("Sending information to the API about the completion of processing of the id={}", id);
            sendWithTimeout(jmsMulticastTemplate, myEnv.getApiQueueName(), json, false);
        }
    
    
        public void commit(JmsTemplate jmsTemplate) {
            jmsTemplate.execute((SessionCallback<Void>) session -> {
                try {
                    JmsUtils.commitIfNecessary(session);
                } catch (JMSException e) {
                    threadNameUtil.updateThreadNameByModule();
                    log.warn("Error sending message to MQ queue. Reason: {}", e.getMessage());
                }
                return null;
            });
        }
    
        private void sendWithTimeout(JmsTemplate jmsTemplate, String destination, Object message, boolean sendNow) {
    
            Future<?> future = executor.submit(() -> {
                jmsTemplate.convertAndSend(destination, message);
                if (sendNow && jmsTemplate.isSessionTransacted()) {
                    commit(jmsTemplate);
                }
            });
    
            try {
                future.get(jmsTimeout, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                log.warn("Timeout exceeded sending message to mq. Broker may have blocked sending or is unavailable.");
                future.cancel(true);
            } catch (InterruptedException e) {
                log.warn("Program execution was interrupted");
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.warn("Error sending message via MQ: " + e.getMessage());
                future.cancel(true);
            }
        }
    }