spring-boot amazon-sqs spring-cloud-aws

Read messages from different AWS account using @SqsListener


I have an SQS standard queue provided by a third-party vendor, who has given our IAM user access to read messages from it. The queue's AWS account ID is therefore different from my user's.

I'm trying to use Spring's @SqsListener annotation to consume these messages, but I'm having trouble specifying the account ID that the messages should be consumed from.

My bean configuration for the client looks like this:

@Bean
fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
    .withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))
    .withEndpointConfiguration(AwsClientBuilder.EndpointConfiguration(awsProperties.url, awsProperties.region))
    .build()

I see no way of specifying the account Id in the credentials, and I also could not find any properties that can be used to define an accountId.

I tried setting the awsProperties.url shown above to something like https://sqs.us-east-1.amazonaws.com/<accountId> but this does not seem to be working. It is still trying to look for the queue in my own account Id and throwing a queue not found error.

Any ideas on how to fix this and force the Spring AWS bean to consume from a specific AWS account?


Solution

  • You have a user that can access the queue in another account. That means you can run code as that user in your own account, and that code can access the queue in the other account.

    Initializing an SQS client always uses the account it is running under. You don't have to adjust this.

    @Bean
    fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))
        .build()
    

    You need to make sure the code can access the queue.

    In the code you should set your queue URL like this: https://sqs.<region>.amazonaws.com/<account>/<queuename>
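
    As a minimal sketch of that URL format, the helper below assembles a queue URL from its three parts. The class name QueueUrls and the sample region, account ID, and queue name are placeholders for illustration, not values from the question, and the pattern assumes the standard public endpoint shown above.

```java
// Hypothetical helper: builds a queue URL in the form
// https://sqs.<region>.amazonaws.com/<account>/<queuename>
final class QueueUrls {

    private QueueUrls() {
    }

    static String queueUrl(String region, String accountId, String queueName) {
        return String.format("https://sqs.%s.amazonaws.com/%s/%s", region, accountId, queueName);
    }

    public static void main(String[] args) {
        // Placeholder values; substitute the vendor's region, account ID, and queue name.
        System.out.println(queueUrl("us-east-1", "123456789012", "queue-name"));
    }
}
```

    The resulting string is what you would pass as the listener destination in place of the bare queue name.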

    I quickly tried to access a queue from another account. If the permissions on the queue are set correctly, you have two options. The first is to use the queue URL instead of the queue name (I checked, it works). The second is to create your own DestinationResolver and provide it to the SimpleMessageListenerContainer. I created a small Spring Boot app and it worked well; the code is pasted below.

    In a future release I'll figure out a better way to support this use case.

    package demo;
    
    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
    import com.amazonaws.services.sqs.model.GetQueueUrlResult;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.aws.core.env.ResourceIdResolver;
    import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
    import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.core.DestinationResolutionException;
    import org.springframework.messaging.core.DestinationResolver;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.util.Assert;
    
    @SpringBootApplication
    public class DemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    
        @Bean
        public MessageListener messageListener() {
            return new MessageListener();
        }
    
        @Bean
        public SimpleMessageListenerContainerFactory simpleMessageListenerFactory(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
            SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
            factory.setDestinationResolver(new DynamicAccountAwareQueueUrlDestinationResolver(amazonSqs, resourceIdResolver));
    
            return factory;
        }
    
        public static class DynamicAccountAwareQueueUrlDestinationResolver implements DestinationResolver<String> {
    
            public static final String ACCOUNT_QUEUE_SEPARATOR = ":";
            private final AmazonSQS amazonSqs;
            private final DynamicQueueUrlDestinationResolver dynamicQueueUrlDestinationResolverDelegate;
    
            public DynamicAccountAwareQueueUrlDestinationResolver(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
                Assert.notNull(amazonSqs, "amazonSqs must not be null");
    
                this.amazonSqs = amazonSqs;
                this.dynamicQueueUrlDestinationResolverDelegate = new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver);
            }
    
            @Override
            public String resolveDestination(String queue) throws DestinationResolutionException {
                // Destinations written as "<accountId>:<queueName>" are looked up in the given account.
                // Note: a full queue URL also contains ':' and would take this branch as well.
                if (queue.contains(ACCOUNT_QUEUE_SEPARATOR)) {
                    String account = queue.substring(0, queue.indexOf(ACCOUNT_QUEUE_SEPARATOR));
                    String queueName = queue.substring(queue.indexOf(ACCOUNT_QUEUE_SEPARATOR) + 1);
                    GetQueueUrlResult queueUrlResult = this.amazonSqs.getQueueUrl(new GetQueueUrlRequest()
                            .withQueueName(queueName)
                            .withQueueOwnerAWSAccountId(account));
                    return queueUrlResult.getQueueUrl();
                } else {
                    // Everything else falls back to the default resolver.
                    return this.dynamicQueueUrlDestinationResolverDelegate.resolveDestination(queue);
                }
            }
        }
    
        public static class MessageListener {
    
            private static final Logger LOG = LoggerFactory.getLogger(MessageListener.class);
    
            @MessageMapping("633332177961:queue-name")
            public void listen(String message) {
                LOG.info("Received message: {}", message);
            }
    
        }
    
    }
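
    The resolver above treats any destination containing ":" as "<accountId>:<queueName>", which would also catch a full queue URL such as https://sqs.... One possible way to tighten this, sketched below with only the JDK, is to accept the prefix only when it is a 12-digit AWS account ID. The class AccountQueueName is a hypothetical helper, not part of Spring Cloud AWS; a resolver could call parse first and fall back to the delegate when it returns empty.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits "<accountId>:<queueName>" destinations.
final class AccountQueueName {

    // A 12-digit AWS account ID, then ':', then a non-empty queue name.
    private static final Pattern ACCOUNT_PREFIXED = Pattern.compile("^(\\d{12}):(.+)$");

    final String accountId;
    final String queueName;

    private AccountQueueName(String accountId, String queueName) {
        this.accountId = accountId;
        this.queueName = queueName;
    }

    // Returns empty for plain queue names and for queue URLs,
    // so the caller can hand those to the default resolver.
    static Optional<AccountQueueName> parse(String destination) {
        Matcher matcher = ACCOUNT_PREFIXED.matcher(destination);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        return Optional.of(new AccountQueueName(matcher.group(1), matcher.group(2)));
    }
}
```

    With this in place, "633332177961:queue-name" is split into account and queue name, while a bare queue name or a queue URL is left for the delegate to resolve.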