I need to send some messages immediately, without waiting for the transaction to end.
But none of the settings I tried, such as
template.setSessionTransacted(false);
or different acknowledge modes, made any difference.
These are the JMS libraries I use. Everything is currently at its default settings, but I have tried every acknowledge mode with transacted mode both enabled and disabled:
implementation 'org.apache.activemq:artemis-jakarta-client:2.38.0'
implementation 'org.apache.activemq:artemis-core-client:2.38.0'
implementation 'org.apache.activemq:artemis-commons:2.38.0'
implementation 'org.springframework:spring-jms:6.1.14'
My JMS Configuration
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(format("tcp://%s:%s", host, port));
connectionFactory.setUser(user);
connectionFactory.setPassword(pass);
return connectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
Logger logger = (Logger) LoggerFactory.getLogger("org.springframework.jms");
logger.setLevel(Level.ERROR);
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setErrorHandler(new CustomErrorHandler());
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public JmsTemplate jmsAnycastTemplate(ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
Worker client code
public static void doAllWork(int moduleId, MqProducer mqProducer, MqMessage message){
mqProducer.sendAcceptToWork(moduleId, message);
Work.doSomeWork(moduleId, mqProducer);
mqProducer.sendEndWork(moduleId, message, 200L);
}
@SneakyThrows
public static void doSomeWork(int moduleId, MqProducer mqProducer){
int secToWork = ThreadLocalRandom.current().nextInt(1, 10);
LocalDateTime workstarted = LocalDateTime.now();
while (Duration.between(workstarted, LocalDateTime.now()).getSeconds() < secToWork) {
Thread.sleep(1000);
System.out.printf("%s Working...%n", LocalDateTime.now());
mqProducer.sendMessage("api", new MqMessage(IN_PROGRESS, moduleId, 200));
}
}
Here is the output from the worker client:
sendAcceptToWork 3 0
2024-12-19T13:03:45.046627435 Working...
2024-12-19T13:03:46.057809869 Working...
2024-12-19T13:03:47.065647593 Working...
2024-12-19T13:03:48.074799444 Working...
2024-12-19T13:03:49.080706764 Working...
2024-12-19T13:03:50.088089181 Working...
2024-12-19T13:03:51.095799844 Working...
2024-12-19T13:03:52.104492990 Working...
2024-12-19T13:03:53.113162971 Working...
Work done
Here is the API client code:
@SneakyThrows
@Override
public void run(MqMessage message) {
if (message.getMqOperationCode() == MqOperationCode.WORK_STARTED){
System.out.println();
System.out.printf("%s [api] WORK_STARTED message received%n", LocalDateTime.now());
return;
}
if (message.getMqOperationCode() == MqOperationCode.IN_PROGRESS){
System.out.printf("%s [api] IN_PROGRESS message received%n", LocalDateTime.now());
return;
}
if (message.getMqOperationCode() == MqOperationCode.WORK_ENDED){
System.out.printf("%s [api] WORK_ENDED message received%n", LocalDateTime.now());
return;
}
System.out.printf("%s [api] message received : \n%s%n", LocalDateTime.now(), message);
}
And here is its output for the same iteration:
2024-12-19T13:03:53.141036336 [api] WORK_STARTED message received
2024-12-19T13:03:53.146047337 [api] IN_PROGRESS message received
2024-12-19T13:03:53.150053390 [api] IN_PROGRESS message received
2024-12-19T13:03:53.153668788 [api] IN_PROGRESS message received
2024-12-19T13:03:53.157834576 [api] IN_PROGRESS message received
2024-12-19T13:03:53.161060836 [api] IN_PROGRESS message received
2024-12-19T13:03:53.163221293 [api] IN_PROGRESS message received
2024-12-19T13:03:53.165163888 [api] IN_PROGRESS message received
2024-12-19T13:03:53.166759323 [api] IN_PROGRESS message received
2024-12-19T13:03:53.168365120 [api] IN_PROGRESS message received
2024-12-19T13:03:53.169784977 [api] WORK_ENDED message received
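As you can see, the API client receives all of the worker's messages at once, within about 30 ms of the worker finishing, even though the worker sent them one second apart. I need each message to be delivered as soon as it is sent, without waiting for the transaction to end.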
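The timestamps above match what a transacted JMS session would produce: messages sent in a transacted session are only released to consumers on commit, so they arrive together. One possible cause, assuming the worker code runs inside a Spring `@JmsListener` method in a Spring Boot application: `DefaultJmsListenerContainerFactoryConfigurer` enables `sessionTransacted` on the listener container by default when no transaction manager is present, and a `JmsTemplate` used on the listener thread with the same `ConnectionFactory` may join that session regardless of the template's own `setSessionTransacted(false)`. The toy model below only illustrates the buffering behaviour in plain Java; `ToyQueue` and `ToySession` are hypothetical names, not Artemis or Spring classes.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactedSendDemo {

    /** Hypothetical stand-in for a broker queue (not an Artemis class). */
    static final class ToyQueue {
        final List<String> delivered = new ArrayList<>();
    }

    /** Hypothetical stand-in for a JMS session (not the jakarta.jms.Session API). */
    static final class ToySession {
        private final ToyQueue queue;
        private final boolean transacted;
        private final List<String> pending = new ArrayList<>();

        ToySession(ToyQueue queue, boolean transacted) {
            this.queue = queue;
            this.transacted = transacted;
        }

        /** A transacted session buffers the message; a non-transacted one delivers it at once. */
        void send(String body) {
            if (transacted) {
                pending.add(body);
            } else {
                queue.delivered.add(body);
            }
        }

        /** Commit releases everything buffered so far in one go. */
        void commit() {
            queue.delivered.addAll(pending);
            pending.clear();
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        ToySession session = new ToySession(queue, true);

        session.send("WORK_STARTED");
        session.send("IN_PROGRESS");
        System.out.println("before commit: " + queue.delivered); // before commit: []

        session.commit();
        System.out.println("after commit: " + queue.delivered);  // after commit: [WORK_STARTED, IN_PROGRESS]
    }
}
```

Under that assumption, calling `factory.setSessionTransacted(false)` after `configurer.configure(factory, connectionFactory)`, or sending from a thread other than the listener thread, would be worth trying.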
I don't really understand why it didn't work as expected, since I had configured JMS in non-transactional mode. However, I found a solution that is stable enough for me, and it also solves another problem: synchronous sends that block or time out.
@Service
@Slf4j
public class MqProducerImpl implements MqProducer {
@Qualifier("jmsMulticastTemplate")
@Autowired()
JmsTemplate jmsMulticastTemplate;
@Qualifier("jmsAnycastTemplate")
@Autowired()
JmsTemplate jmsAnycastTemplate;
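private final long jmsTimeout = 10_000;
// Not shown here: executor (an ExecutorService), ow (presumably a Jackson ObjectWriter),
// myEnv and threadNameUtil are fields declared or injected elsewhere in this class.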
@Override
@SneakyThrows
public void sendUpdateWork(long id) {
MqMessage message = new MqMessage(MqOperationCode.IN_PROGRESS, id, myEnv.getModuleId(), Constants.IN_PROGRESS);
String json = ow.writeValueAsString(message);
log.debug("Sending information to the API about updating the processing of the id={}", id);
sendWithTimeout(jmsMulticastTemplate, myEnv.getApiQueueName(), json, true);
}
@SneakyThrows
public void sendEndWork(long id, long statusCode) {
MqMessage message = new MqMessage(MqOperationCode.WORK_ENDED, id, myEnv.getModuleId(), statusCode);
String json = ow.writeValueAsString(message);
log.debug("Sending information to the API about the completion of processing of the id={}", id);
sendWithTimeout(jmsMulticastTemplate, myEnv.getApiQueueName(), json, false);
}
public void commit(JmsTemplate jmsTemplate) {
jmsTemplate.execute((SessionCallback<Void>) session -> {
try {
JmsUtils.commitIfNecessary(session);
} catch (JMSException e) {
threadNameUtil.updateThreadNameByModule();
log.warn("Error sending message to MQ queue. Reason: {}", e.getMessage());
}
return null;
});
}
private void sendWithTimeout(JmsTemplate jmsTemplate, String destination, Object message, boolean sendNow) {
Future<?> future = executor.submit(() -> {
jmsTemplate.convertAndSend(destination, message);
if (sendNow && jmsTemplate.isSessionTransacted()) {
commit(jmsTemplate);
}
});
try {
future.get(jmsTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.warn("Timeout exceeded sending message to mq. Broker may have blocked sending or is unavailable.");
future.cancel(true);
} catch (InterruptedException e) {
log.warn("Program execution was interrupted");
Thread.currentThread().interrupt();
} catch (Exception e) {
log.warn("Error sending message via MQ: {}", e.getMessage());
future.cancel(true);
}
}
}
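The timeout part of `sendWithTimeout` can be tried in isolation. The sketch below reproduces only the bounded-wait pattern with `java.util.concurrent`; `BoundedSend` and `runWithTimeout` are hypothetical names, and the `Runnable` stands in for the real `jmsTemplate.convertAndSend(...)` call. A possible reason the approach also fixes the delayed delivery: the send runs on an executor thread, so it would not share any session bound to the calling thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedSend {

    /**
     * Runs the task on the executor and waits at most timeoutMillis for it.
     * Returns true only if the task finished normally within the timeout.
     */
    static boolean runWithTimeout(ExecutorService executor, Runnable task, long timeoutMillis) {
        Future<?> future = executor.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task that is still running
            return false;
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        } catch (ExecutionException e) {
            return false; // the task itself threw an exception
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for a send that returns promptly.
            boolean fast = runWithTimeout(executor, () -> { }, 1_000);

            // Stand-in for a send that blocks, e.g. because the broker is unavailable.
            boolean slow = runWithTimeout(executor, () -> {
                try {
                    Thread.sleep(5_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 100);

            System.out.println(fast + " " + slow); // true false
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that `future.cancel(true)` only interrupts the worker thread; a send that ignores interruption would keep occupying that thread, so a single-thread executor could stall behind it.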