spring-cloud-streamamazon-kinesisamazon-kcl

Different credentials for Kinesis Stream, DynamoDB and CloudWatch inside Spring Cloud Stream


I am using Spring Cloud Stream Kinesis binder (version 2.1.0)

Because of security reasons, I must have one set of credentials for Kinesis and another set of credentials for DynamoDB and CloudWatch.

Everything works fine if spring.cloud.stream.kinesis.binder.kplKclEnabled is set to false. But if it is set to true I have the exception

com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream {my_stream} under account {my_account} not found

Whole stack trace available at https://pastebin.com/bjvKSzrg

I would like to have KCL enabled so does anybody know how to avoid this error?

I know that error happens because user credentials for cloudwatch and dynamodb don't "see" mentioned Kinesis stream. But why is there a need from them to see it? Also, if KCL is disabled it works as expected. so don't see why it wouldn't work with enabled KCL

Here is my properties file

spring.main.allow-bean-definition-overriding=true
spring.cloud.stream.bindings.input.destination=streamName
spring.cloud.stream.bindings.input.group=worker
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.kinesis.bindings.input.consumer.listener-mode=batch
spring.cloud.stream.bindings.input.binder=kinesisConsumer



spring.cloud.stream.binders.kinesisConsumer.type=kinesis
spring.cloud.stream.binders.kinesisConsumer.defaultCandidate=false
spring.cloud.stream.binders.kinesisConsumer.environment.spring.main.sources=com.philips.ka.oneka.kinesis.config.KinesisOutputConfiguration

cloud.aws.stack.auto=false
cloud.aws.credentials.useDefaultAwsCredentialsChain=false
cloud.aws.credentials.instanceProfile=true

spring.cloud.stream.kinesis.binder.kplKclEnabled=true

Mentioned configuration class

@Configuration
@EnableConfigurationProperties(AwsProperties.class)
public class KinesisOutputConfiguration {
    AwsProperties.Properties properties;

    public KinesisOutputConfiguration(AwsProperties awsProperties) {
        this.properties = awsProperties.getStreamType().get(AwsProperties.StreamType.SPECTRE);
    }

    @Bean(destroyMethod = "shutdown")
    public AmazonKinesisAsync amazonKinesis() {
        RefreshingCredentials refreshingCredentials = new RefreshingCredentials(this.properties.getRefreshed.getUrl(), this.properties.getHsdp().getClientId(),
                this.properties.getRefreshed().getClientSecret(), this.properties.getRefreshed().getUsername(), this.properties.getRefreshed().getPassword(),
                this.properties.getRefreshed().getDiscoveryUrl(), new UriTemplate("{databroker_url}/Stream/$getaccessdetails"),
                new RestTemplate());
        return AmazonKinesisAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("eu-west-1").build();
    }


    @Bean(destroyMethod = "shutdown")
    public AmazonCloudWatchAsync cloudWatch() {
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
                this.properties.getSecretKey()));
        return AmazonCloudWatchAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
    }

    @Bean(destroyMethod = "shutdown")
    @Primary
    public AmazonDynamoDBAsync dynamoDBAsync() {
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
                this.properties.getSecretKey()));
        return AmazonDynamoDBAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
    }

}

Solution

  • Your configuration is correct: if you need to use different credentials for those services, you definitely need to declare custom beans for them. The DynamoDB and CloudWatch are required services for Kinesis Client Library. It is used from one hand to manage an offset from stream shards, and on the other - to handle consumer instances changes in the cluster for shards exclusive access. So, it's indeed the fact that Kinesis resource must be available for DynamoDB and CloudWatch users.

    See more info in Kinesis Client Library or ask AWS support: nothing Kinesis Binder can do for you on the matter...

    https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html