spring-boot amazon-s3 spring-integration spring-integration-aws

Sync S3 Bucket and listen for changes


I have an AWS S3 bucket to which I upload a new ZIP file every week.

I want to add functionality to my existing web service, written with Spring Boot: synchronize the bucket locally and watch for changes.

For the time being, synchronization works well: whenever a new file is added to the bucket, it gets downloaded locally. However, I don't know how to listen for file updates, that is, how to have a method fire when a new file is downloaded locally. Can it be done?

This is the code I have:

#  --------
# | AWS S3 |
#  --------
s3.credentials-access-key=***
s3.credentials-secret-key=****
s3.bucket = my-bucket
s3.remote-dir = zips
s3.local-dir = D:/s3-bucket/

@Log4j2
@Configuration
public class S3Config {

    public static final String OUT_CHANNEL_NAME = "s3filesChannel";

    @Value("${s3.credentials-access-key}") private String accessKey;
    @Value("${s3.credentials-secret-key}") private String secretKey;
    @Value("${s3.remote-dir}") private String remoteDir;
    @Value("${s3.bucket}") private String s3bucket;
    @Value("${s3.local-dir}") private String localDir;

    /*
     * AWS S3
     */
    @Bean
    public AmazonS3 getAmazonS3() {
        BasicAWSCredentials creds = new BasicAWSCredentials(accessKey, secretKey);
        AmazonS3 s3client = AmazonS3ClientBuilder
                .standard()
                .withRegion(Regions.EU_WEST_1)
                .withCredentials(new AWSStaticCredentialsProvider(creds))
                .build();
        return s3client;        
    }

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
        S3InboundFileSynchronizer sync = new S3InboundFileSynchronizer(pS3SessionFactory);
        sync.setPreserveTimestamp(true);
        sync.setDeleteRemoteFiles(false);
        String fullRemotePath = s3bucket.concat("/").concat(remoteDir);
        sync.setRemoteDirectory(fullRemotePath);
        sync.setFilter(new S3RegexPatternFileListFilter(".*\\.zip$"));
        return sync;
    }

    @Bean
    @InboundChannelAdapter(value = OUT_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
        S3InboundFileSynchronizer pS3InboundFileSynchronizer
    ) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File(localDir));
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow fileReadingFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource,
            GtfsBizkaibus pGtfsBizkaibus,
            @Qualifier("fileProcessor") MessageHandler pMessageHandler) {
        return IntegrationFlows
                .from(pS3InboundFileSynchronizingMessageSource, e -> e.poller(p -> p.fixedDelay(5, TimeUnit.SECONDS)))
                .handle(pMessageHandler)
                .get();
    }

    @Bean("fileProcessor")
    public MessageHandler fileProcessor() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(localDir));
        handler.setExpectReply(false); // end of pipeline, reply not needed
        handler.setFileExistsMode(FileExistsMode.APPEND);
        handler.setNewFileCallback((file, msg) -> {
            log.debug("New file created... " + file.getAbsolutePath());
        });
        return handler;
    }
}


Solution

  • Actually, the S3InboundFileSynchronizingMessageSource already does all the necessary work for you: when a new file is added to the remote bucket, it is downloaded to the local directory and sent as the payload of a message to the configured channel.

    When a remote file is modified, it is also downloaded to the local directory.
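
To react each time a file lands in the local directory, one option is to subscribe a handler to the channel the adapter writes to. The following is a minimal sketch, not a definitive implementation. It assumes that it replaces the fileReadingFlow and fileProcessor beans from the question (so the message source is polled only once) and that the channel is still the QueueChannel named s3filesChannel. The class name S3FileListenerConfig and the bean name downloadedFileHandler are made up for illustration.

```java
import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

// Hypothetical configuration class, not part of the original question.
@Configuration
public class S3FileListenerConfig {

    // s3filesChannel is a QueueChannel (pollable), so the consumer needs its own poller.
    // fixedDelay is expressed in milliseconds: here the queue is checked every 5 seconds.
    @Bean
    @ServiceActivator(inputChannel = "s3filesChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageHandler downloadedFileHandler() {
        return message -> {
            // The inbound adapter emits the local copy of the file as the message payload.
            File file = (File) message.getPayload();
            System.out.println("Downloaded locally: " + file.getAbsolutePath());
        };
    }
}
```

The body of the lambda is where the "new file" logic (unzipping, parsing, and so on) would go.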

    Starting with version 5.0, the AbstractInboundFileSynchronizingMessageSource provides this option:

    /**
     * Switch the local {@link FileReadingMessageSource} to use its internal
     * {@code FileReadingMessageSource.WatchServiceDirectoryScanner}.
     * @param useWatchService the {@code boolean} flag to switch to
     * {@code FileReadingMessageSource.WatchServiceDirectoryScanner} on {@code true}.
     * @since 5.0
     */
    public void setUseWatchService(boolean useWatchService) {
        this.fileSource.setUseWatchService(useWatchService);
        if (useWatchService) {
            this.fileSource.setWatchEvents(
                    FileReadingMessageSource.WatchEventType.CREATE,
                    FileReadingMessageSource.WatchEventType.MODIFY,
                    FileReadingMessageSource.WatchEventType.DELETE);
        }
    }
    

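    With this flag set to true, the local directory is scanned through a WatchService that reacts to create, modify, and delete events, which may be what you are looking for.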
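
Enabling the option is a one-line change where the message source is configured. The sketch below repeats the message source bean from the question with the setUseWatchService(true) call added; it is meant to stand in for the existing bean, not to be registered alongside it. The class name WatchingS3SourceConfig is hypothetical, and the 30-second delay is an assumption about the intended interval (the attribute is in milliseconds).

```java
import java.io.File;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizer;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizingMessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;

// Hypothetical class; in the question this bean lives in S3Config.
@Configuration
public class WatchingS3SourceConfig {

    @Value("${s3.local-dir}")
    private String localDir;

    @Bean
    @InboundChannelAdapter(value = "s3filesChannel", poller = @Poller(fixedDelay = "30000"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer synchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource =
                new S3InboundFileSynchronizingMessageSource(synchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File(localDir));
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
        // Available since 5.0: scan the local directory with a WatchService
        // that listens for CREATE, MODIFY and DELETE events.
        messageSource.setUseWatchService(true);
        return messageSource;
    }
}
```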

    Alternatively, S3-to-SQS event notifications would also be a good solution: the Spring Integration AWS project provides an SqsMessageDrivenChannelAdapter for receiving the SQS messages.
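
A rough sketch of the SQS route, under several assumptions: the bucket is already configured to publish its event notifications to an SQS queue, the project uses the Spring Integration AWS line built on the AWS SDK v1 (the same SDK as the AmazonS3 client in the question), and credentials and region come from the default provider chain. The queue name s3-zip-events, the channel name s3EventsChannel, and the class name are placeholders.

```java
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

// Hypothetical configuration class illustrating the SQS alternative.
@Configuration
public class S3EventsFromSqsConfig {

    @Bean
    public AmazonSQSAsync amazonSqs() {
        // Assumption: default credentials and region provider chain.
        return AmazonSQSAsyncClientBuilder.defaultClient();
    }

    @Bean
    public MessageChannel s3EventsChannel() {
        return new DirectChannel();
    }

    @Bean
    public SqsMessageDrivenChannelAdapter sqsAdapter(AmazonSQSAsync amazonSqs) {
        // "s3-zip-events" is a placeholder queue name.
        SqsMessageDrivenChannelAdapter adapter =
                new SqsMessageDrivenChannelAdapter(amazonSqs, "s3-zip-events");
        adapter.setOutputChannelName("s3EventsChannel");
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "s3EventsChannel")
    public MessageHandler s3EventHandler() {
        // The payload is the notification body that S3 published to the queue.
        return message -> System.out.println("S3 event: " + message.getPayload());
    }
}
```

With this approach the application is notified by S3 instead of polling the bucket; downloading the object referenced in the notification would still have to be done separately.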