We need to use queues in our Java EE application and since it is a cloud base application (deployed on OpenShift Online), we like to use amazon sqs.
If I understand the theorie of the receiving part of JMS / Java EE correctly, a @MessageDriven
bean is managed by the Java EE container so that a lot of bean instances are created in parallel (according max pool size), if the number of the incoming messages is high. This is of course a big benefit to process high loads.
However, I do not see how we can integrate aws sqs this way in a Java EE application. I know the asynchronous receiver examples from http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html:
class MyListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
// Cast the received message as TextMessage and print the text to screen.
if (message != null) {
System.out.println("Received: " + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
and then:
// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);
// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());
// Start receiving incoming messages.
connection.start();
This is the official asynchronous receiver example - which is not a @MessageDriven
bean. It is obvious, that we need somewhere the credentials to authenticate (by creating an SQSConnectionFactory, then a connection, then a session - which is also well described in the example).
But I strongly suppose that this example will not process the messages in parallel - i.e. only one bean instance is processing the queue and this is not a good solution for scalable, high loaded applications.
a) How can we go the real Java EE way with Amazon SQS? I just find a planty of Spring examples. But it must be Java EE 7.
b) We use Wildfly (currently 8.2.1). Would it be also possible to let Wildfly manage the connection to AWS and application internally, we could use the queue as if it were an application server managed queue (same approach like data sources for DB access)?
Conclusion after got an answer from stdunbar:
It seems not to be possible in a 'proper way', what I like to do. So what should I do? Implement a ManagedExecutorService
as stdunbar described to 'wrap' the queue? - However this implies to have a local queue as well and this is not a good situation for an application, which should be scaleable?!
What is about alternatives? We are running the application on OpenShift Online. It would probably be bether to instantiate an own gear with e.g. ApacheMQ Cartridge... there are of course a lot of disadventages like costs and that we are responsible for the 'infrastructure'.
To be honest, I am really disappointed of AWS in this case...
I don't think that my solution is proper JAVA EE, but in my case it works.
Configuration:
@Singleton
public class SqsMessageManager
{
private Integer numberOfReceivers = 3;
public static SQSConnection connection = null;
public static Queue queue = null;
@Inject
SqsMessageReceiver sqsMessageReceiver;
public void init()
{
try
{
SQSConnectionFactory connectionFactory =
SQSConnectionFactory.builder()
.withRegion(Region.getRegion(Regions.EU_WEST_1))
.withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
.build();
connection = connectionFactory.createConnection();
queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");
for (int i = 0; i < numberOfReceivers; i++)
connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);
connection.start();
}
catch (JMSException e)
{
e.getStackTrace();
}
}
}
Then the sender:
@Dependent
public class SqsMessageSender
{
MessageProducer producer = null;
Session senderSession = null;
@PostConstruct
public void createProducer(){
try
{
// open new session and message producer
senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = senderSession.createProducer(SqsMessageManager.queue);
}catch(JMSException | NullPointerException e){
;
}
}
@PreDestroy
public void destroy(){
try
{
// close session
producer.close();
senderSession.close();
}catch(JMSException e){
}
}
// sends a message to aws sqs queue
public void sendMessage(String txt)
{
try
{
TextMessage textMessage = senderSession.createTextMessage(txt);
producer.send(textMessage);
}
catch (JMSException e)
{
e.getStackTrace();
}
}
}
And the receiver:
@Dependent
public class SqsMessageReceiver implements MessageListener
{
public void onMessage(Message inMessage) {
...
}
}