jakarta-ee ejb jms amazon-sqs message-driven-bean

Using Amazon SQS in a @MessageDriven bean - pooling / parallel processing


We need to use queues in our Java EE application, and since it is a cloud-based application (deployed on OpenShift Online), we would like to use Amazon SQS.

If I understand the theory of the receiving part of JMS / Java EE correctly, a @MessageDriven bean is managed by the Java EE container so that many bean instances are created in parallel (up to the max pool size) if the number of incoming messages is high. This is of course a big benefit when processing high loads.
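
For illustration, this is roughly how the container-managed approach looks with a "normal" JMS provider inside the application server; the destination lookup name is just a placeholder:

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

// The container creates and pools instances of this bean and calls
// onMessage() concurrently, up to the configured pool size.
@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MyQueue"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MyQueueMdb implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("Received: " + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}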

However, I do not see how we can integrate AWS SQS this way into a Java EE application. I know the asynchronous receiver example from http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

class MyListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

and then:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();

This is the official asynchronous receiver example - which is not a @MessageDriven bean. Obviously, we need to provide the credentials somewhere in order to authenticate (by creating an SQSConnectionFactory, then a connection, then a session - which is also well described in the example).
But I strongly suspect that this example will not process the messages in parallel: a single consumer with a single listener on one session is fed serially, i.e. only one listener instance is processing the queue, and this is not a good solution for scalable, highly loaded applications.

a) How can we do this the real Java EE way with Amazon SQS? I only find plenty of Spring examples, but it must be Java EE 7.

b) We use WildFly (currently 8.2.1). Would it also be possible to let WildFly manage the connection to AWS, so that inside the application we could use the queue as if it were an application-server-managed queue (the same approach as data sources for DB access)?
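
To make (b) more concrete: what we would ideally like to write is something like the sketch below, where the connection factory and queue are configured in WildFly and only looked up via JNDI. The JNDI names are made up and only illustrate the idea - this only works if the server (or a resource adapter) actually manages the SQS connection:

import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.Queue;

@Stateless
public class DesiredSqsClient {

    // Hypothetical JNDI names - the server would have to provide these resources.
    @Resource(lookup = "java:/jms/SqsConnectionFactory")
    private ConnectionFactory connectionFactory;

    @Resource(lookup = "java:/jms/queue/MyQueue")
    private Queue queue;

    public void send(String text) {
        // JMS 2.0 style, analogous to using a container-managed data source
        try (JMSContext context = connectionFactory.createContext()) {
            context.createProducer().send(queue, text);
        }
    }
}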

Conclusion after getting an answer from stdunbar:
What I would like to do does not seem to be possible in a 'proper' way. So what should I do? Implement a ManagedExecutorService as stdunbar described, to 'wrap' the queue? However, this implies having a local queue as well, which is not a good situation for an application that should be scalable. What about alternatives? We are running the application on OpenShift Online. It would probably be better to set up a separate gear with e.g. an ActiveMQ cartridge... but that of course has a lot of disadvantages, like the costs and the fact that we would be responsible for the 'infrastructure' ourselves.
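
For reference, a minimal sketch of what 'wrapping' the queue polling with a ManagedExecutorService could look like when using the plain AWS SDK directly (region, queue URL and number of pollers are placeholders, and this is not necessarily exactly what stdunbar proposed):

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.concurrent.ManagedExecutorService;

import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

@Singleton
@Startup
public class SqsPoller {

    // hypothetical queue URL - replace with the real one
    private static final String QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/myQueue";

    @Resource
    private ManagedExecutorService executor;

    private AmazonSQS sqs;

    @PostConstruct
    public void start() {
        // plain SDK client, authenticated via environment variables
        AmazonSQSClient client = new AmazonSQSClient(new EnvironmentVariableCredentialsProvider());
        client.setRegion(Region.getRegion(Regions.EU_WEST_1));
        sqs = client;

        // several poller tasks give us parallel processing, but the pool size
        // is now managed by us, not by the container
        for (int i = 0; i < 3; i++) {
            executor.submit(this::poll);
        }
    }

    private void poll() {
        // long polling loop; each task blocks one managed thread permanently
        while (true) {
            ReceiveMessageRequest request = new ReceiveMessageRequest(QUEUE_URL).withWaitTimeSeconds(20);
            for (Message m : sqs.receiveMessage(request).getMessages()) {
                process(m.getBody());
                sqs.deleteMessage(QUEUE_URL, m.getReceiptHandle());
            }
        }
    }

    private void process(String body) {
        // application-specific message handling
    }
}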

To be honest, I am really disappointed in AWS in this case...


Solution

  • I don't think that my solution is proper Java EE, but in my case it works.

    Configuration:

    import javax.ejb.Singleton;
    import javax.inject.Inject;
    import javax.jms.JMSException;
    import javax.jms.Queue;
    import javax.jms.Session;

    import com.amazon.sqs.javamessaging.SQSConnection;
    import com.amazon.sqs.javamessaging.SQSConnectionFactory;
    import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
    import com.amazonaws.regions.Region;
    import com.amazonaws.regions.Regions;

    @Singleton
    public class SqsMessageManager
    {
        private Integer numberOfReceivers = 3;

        public static SQSConnection connection = null;
        public static Queue queue = null;

        @Inject
        SqsMessageReceiver sqsMessageReceiver;

        public void init()
        {
            try
            {
                // Build the JMS connection factory on top of SQS, authenticating
                // via environment variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY).
                SQSConnectionFactory connectionFactory =
                        SQSConnectionFactory.builder()
                                .withRegion(Region.getRegion(Regions.EU_WEST_1))
                                .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
                                .build();

                connection = connectionFactory.createConnection();

                queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");

                // One session/consumer per receiver: each session delivers messages
                // on its own thread, so the listener is called in parallel.
                for (int i = 0; i < numberOfReceivers; i++)
                    connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);

                connection.start();
            }
            catch (JMSException e)
            {
                e.printStackTrace();
            }
        }
    }
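
    Note that init() has no lifecycle annotation, so it has to be triggered explicitly. A minimal sketch of one way to do that at deployment time (the @Startup bean below is only an illustration):

    import javax.annotation.PostConstruct;
    import javax.ejb.Singleton;
    import javax.ejb.Startup;
    import javax.inject.Inject;

    @Singleton
    @Startup
    public class SqsBootstrap
    {
        @Inject
        SqsMessageManager sqsMessageManager;

        @PostConstruct
        public void onStartup()
        {
            // build the SQS connection and register the listeners once at deployment
            sqsMessageManager.init();
        }
    }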
    

    Then the sender:

    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import javax.enterprise.context.Dependent;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    @Dependent
    public class SqsMessageSender
    {
        MessageProducer producer = null;
        Session senderSession = null;

        @PostConstruct
        public void createProducer(){
            try
            {
                // open a new session and message producer on the shared connection
                senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                producer = senderSession.createProducer(SqsMessageManager.queue);
            }catch(JMSException | NullPointerException e){
                e.printStackTrace();
            }
        }

        @PreDestroy
        public void destroy(){
            try
            {
                // close producer and session
                producer.close();
                senderSession.close();
            }catch(JMSException e){
                e.printStackTrace();
            }
        }

        // sends a message to the AWS SQS queue
        public void sendMessage(String txt)
        {
            try
            {
                TextMessage textMessage = senderSession.createTextMessage(txt);
                producer.send(textMessage);
            }
            catch (JMSException e)
            {
                e.printStackTrace();
            }
        }
    }
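
    Usage is then straightforward: since SqsMessageSender is @Dependent, it can simply be injected wherever a message has to be sent. A small, made-up example:

    import javax.ejb.Stateless;
    import javax.inject.Inject;

    @Stateless
    public class OrderService
    {
        @Inject
        SqsMessageSender sender;

        public void placeOrder(String orderId)
        {
            // hypothetical business method that pushes the order id onto the SQS queue
            sender.sendMessage(orderId);
        }
    }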
    

    And the receiver:

    import javax.enterprise.context.Dependent;
    import javax.jms.Message;
    import javax.jms.MessageListener;

    @Dependent
    public class SqsMessageReceiver implements MessageListener
    {
        @Override
        public void onMessage(Message inMessage) {
            ...
        }
    }
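
    One thing to be aware of with this setup: the same SqsMessageReceiver instance is registered on all sessions, so onMessage() can be invoked by several threads concurrently and therefore has to be implemented thread-safe.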