
spring-integration-aws S3StreamingMessageSource: how do I delete the remote file?


I am currently working with the S3StreamingMessageSource from Spring Integration AWS and pass the stream on to an integration flow.

public MessageSource<InputStream> s3InboundStreamingMessageSource() {
    S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
    messageSource.setRemoteDirectory(bucketName);
    messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
            "streaming"));
    return messageSource;
}

@Bean
public IntegrationFlow s3IntegrationFlow() {
    return IntegrationFlows.from(s3InboundStreamingMessageSource(), spec -> spec.poller(Pollers.fixedDelay(10, TimeUnit.SECONDS)))
            .transform(new S3ObjectInputStreamToStringTransformer())
            .transform(Transformers.toJson())
            .handle(Http.outboundGateway("http://localhost:8099/create").httpMethod(HttpMethod.POST).extractPayload(true))
            .channel("nullChannel")
            .get();
}

How can I delete the retrieved remote file from S3?

In the S3InboundFileSynchronizer there is a method for this.

Like this:

@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
    S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(factory);
    synchronizer.setDeleteRemoteFiles(true);
    synchronizer.setPreserveTimestamp(true);
    synchronizer.setRemoteDirectory(bucketName);
    return synchronizer;
}

Can anyone help me or tell me a good workaround?


Solution

  • With the streaming channel adapter there is no local copy of the remote file, so we can't guess what you are going to do with the InputStream for the remote file, or how. That is why there is no setDeleteRemoteFiles() on the S3StreamingMessageSource.

    I see you use an S3ObjectInputStreamToStringTransformer. What is the reason for this custom transformer? There is already a StreamTransformer, and with its charset option the InputStream for the remote file is converted to a String:

    /**
     * Construct an instance with the charset to convert the stream to a
     * String; if null a {@code byte[]} will be produced instead.
     * @param charset the charset.
     */
    public StreamTransformer(String charset) {
    

    Also keep in mind that the resource returned by StaticMessageHeaderAccessor.getCloseableResource(message) must be closed after reading the InputStream to avoid a resource leak.

    Instead of .channel("nullChannel"), consider using a handle() that calls the AmazonS3.deleteObject(String bucketName, String key) API. The bucketName is stored in the FileHeaders.REMOTE_DIRECTORY header and the key in the FileHeaders.REMOTE_FILE header.
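The header lookup and delete call described above can be sketched in plain Java. This is only an illustration under assumptions: RemoteFileCleanup and ObjectDeleter are hypothetical names, ObjectDeleter stands in for the real AmazonS3 client so the snippet compiles without the AWS SDK, and the two header keys are, to my knowledge, the string values behind FileHeaders.REMOTE_DIRECTORY and FileHeaders.REMOTE_FILE.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: delete the remote S3 object named by the file headers of a message. */
final class RemoteFileCleanup {

    // Assumed string values of FileHeaders.REMOTE_DIRECTORY and FileHeaders.REMOTE_FILE.
    static final String REMOTE_DIRECTORY = "file_remoteDirectory";
    static final String REMOTE_FILE = "file_remoteFile";

    /** Hypothetical stand-in for AmazonS3.deleteObject(String bucketName, String key). */
    @FunctionalInterface
    interface ObjectDeleter {
        void deleteObject(String bucketName, String key);
    }

    /**
     * Reads the bucket and key from the headers and deletes that object.
     * Fails fast when a header is missing so nothing unintended is removed.
     */
    static void deleteRemoteFile(Map<String, Object> headers, ObjectDeleter deleter) {
        Object bucket = headers.get(REMOTE_DIRECTORY);
        Object key = headers.get(REMOTE_FILE);
        if (!(bucket instanceof String) || !(key instanceof String)) {
            throw new IllegalStateException(
                    "Missing " + REMOTE_DIRECTORY + " or " + REMOTE_FILE + " header");
        }
        deleter.deleteObject((String) bucket, (String) key);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(REMOTE_DIRECTORY, "my-bucket");      // hypothetical bucket name
        headers.put(REMOTE_FILE, "incoming/data.json");  // hypothetical object key

        List<String> deleted = new ArrayList<>();
        // In a real flow this lambda would be amazonS3::deleteObject.
        deleteRemoteFile(headers, (b, k) -> deleted.add(b + "/" + k));
        System.out.println(deleted);
    }
}
```

In the flow from the question, the final .channel("nullChannel") could then probably become something like .handle(m -> RemoteFileCleanup.deleteRemoteFile(m.getHeaders(), amazonS3::deleteObject)), assuming an AmazonS3 bean named amazonS3 is available and the file headers survive the HTTP gateway reply. Deleting only at the end of the flow means a failed HTTP call leaves the object in the bucket.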