java spring spring-integration spring-integration-sftp

Removing a file from ConcurrentMetadataStore if file processing fails in spring-integration-sftp


I am facing a problem while using spring-integration-sftp to pull files from SFTP servers and do some processing. I'm using RedisMetadataStore to power my SftpPersistentAcceptOnceFileListFilter. I'm using a Streaming Inbound Channel Adapter for my use case.

The problem is that as soon as a new file is delivered to my handler, it is marked as processed in the Redis metadata store, so if my downstream processing fails, the file is lost forever. I have retries in place, but what I would like to do is mark the file as unprocessed in Redis and have it picked up again in the next poll cycle.

I found that the RedisMetadataStore class has a remove method that could remove the key when processing throws an exception, but is it advisable to call remove directly? As far as I understand, these methods are meant to be called by the filter rather than by application code, but I couldn't find anything else that deals with this situation.

Are there any constructs for removing a file from SftpPersistentAcceptOnceFileListFilter if it fails during processing?


Solution

  • The SftpPersistentAcceptOnceFileListFilter you mention is a ResettableFileListFilter:

    /**
     * A {@link FileListFilter} that can be reset by removing a specific file from its
     * state.
     * @param <F> The type that will be filtered.
     *
     * @author Gary Russell
     *
     * @since 4.1.7
     */
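    public interface ResettableFileListFilter<F> extends FileListFilter<F> {

        // Removes the given file from the filter's state so it passes the filter again.
        boolean remove(F f);

    }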

    So you can indeed declare the filter as a top-level bean and call its remove() whenever file processing fails.

    The framework itself does this in several places, but only before the message is delivered to the target channel:

    private void resetFilterIfNecessary(AbstractFileInfo<F> file) {
        if (this.filter instanceof ResettableFileListFilter) {
            this.logger.info(
                    LogMessage.format("Removing the remote file '%s' from the filter for a subsequent transfer attempt",
                            file.getFilename()));
            ((ResettableFileListFilter<F>) this.filter).remove(file.getFileInfo());
        }
    }
    

    That FileInfo is available to you as the FileHeaders.REMOTE_FILE_INFO header of the message you process downstream.
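
The reset-on-failure idea can be sketched without any Spring dependencies. The example below is only an illustration under stated assumptions: `ResettableFilter`, `AcceptOnceFilter`, `Processor` and `poll` are hypothetical stand-ins written for this sketch, not framework types. The in-memory set plays the role of the Redis-backed metadata store, and plain file names play the role of the remote file info taken from the message header. In a real flow, the equivalent of the `catch` block would call `remove()` on the actual filter bean from your error-handling path.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ResetOnFailureSketch {

    /** Hypothetical stand-in that mirrors the shape of a resettable filter; not the framework type. */
    interface ResettableFilter<F> {
        List<F> filterFiles(List<F> files);
        boolean remove(F file);
    }

    /** In-memory accept-once filter; the set stands in for the persistent metadata store. */
    static class AcceptOnceFilter implements ResettableFilter<String> {
        private final Set<String> seen = new HashSet<>();

        @Override
        public List<String> filterFiles(List<String> files) {
            List<String> accepted = new ArrayList<>();
            for (String file : files) {
                // Set.add returns true only for files not seen before.
                if (seen.add(file)) {
                    accepted.add(file);
                }
            }
            return accepted;
        }

        @Override
        public boolean remove(String file) {
            // Forget the file so the next poll accepts it again.
            return seen.remove(file);
        }
    }

    /** Hypothetical downstream processing step that may fail. */
    interface Processor {
        void process(String file) throws Exception;
    }

    /** Runs one poll cycle and returns the files that were processed successfully. */
    static List<String> poll(List<String> remoteListing,
                             ResettableFilter<String> filter,
                             Processor processor) {
        List<String> succeeded = new ArrayList<>();
        for (String file : filter.filterFiles(remoteListing)) {
            try {
                processor.process(file);
                succeeded.add(file);
            } catch (Exception e) {
                // Processing failed: reset the filter entry for a later retry.
                filter.remove(file);
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        AcceptOnceFilter filter = new AcceptOnceFilter();
        List<String> listing = List.of("a.csv", "b.csv");
        int[] attempts = {0};
        // Fails the first time it sees b.csv, succeeds afterwards.
        Processor flaky = file -> {
            if (file.equals("b.csv") && attempts[0]++ == 0) {
                throw new IllegalStateException("simulated failure");
            }
        };
        System.out.println(poll(listing, filter, flaky)); // prints [a.csv]
        System.out.println(poll(listing, filter, flaky)); // prints [b.csv]
    }
}
```

In the second poll, a.csv is still filtered out because it was processed successfully, while b.csv is accepted again because its entry was removed after the failure.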