We're using the S3InboundFileSynchronizingMessageSource feature of Spring Integration to locally sync and then send messages for any files retrieved from an S3 bucket.
Before syncing, we apply a couple of S3PersistentAcceptOnceFileListFilter filters (to check the file's TimeModified and Hash/ETag) to make sure we only sync "new" files.
Note: We use the JdbcMetadataStore table to persist the record of the files that have previously made it through the filters (using a different REGION for each filter).
Finally, for the S3InboundFileSynchronizingMessageSource local filter, we have a FileSystemPersistentAcceptOnceFileListFilter (corrected from S3PersistentAcceptOnceFileListFilter), again on TimeModified and again persisted, but in a different region.
The issue is: if the service is restarted after the file has made it through the 1st filter but before the message source successfully sent the message along, we essentially drop the file and never actually process it.
What are we doing wrong? How can we avoid this "dropped file" issue?
I assume you use a FileSystemPersistentAcceptOnceFileListFilter for the localFilter, since S3PersistentAcceptOnceFileListFilter is not going to work there.
Let's see how you use those filters in the configuration! I wonder if switching to the ChainFileListFilter for your remote files helps you somehow.
See docs: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-reading
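For illustration, here is a rough sketch of how the two remote filters might be combined with a ChainFileListFilter, so that a file rejected by the first filter is never offered to the second one. This is not the asker's configuration, which has not been shown. It assumes a spring-integration-aws version built on AWS SDK v1 (hence S3ObjectSummary), and the region name, key prefix and the etagFilter bean are hypothetical placeholders.

```java
import javax.sql.DataSource;

import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;

@Configuration
public class RemoteFilterConfig {

    // Metadata store for the "last modified" check; the region name is a placeholder.
    @Bean
    public JdbcMetadataStore modifiedStore(DataSource dataSource) {
        JdbcMetadataStore store = new JdbcMetadataStore(dataSource);
        store.setRegion("S3_MODIFIED");
        return store;
    }

    // etagFilter stands in for whatever filter performs the Hash/ETag check
    // (hypothetical bean, assumed to be defined elsewhere).
    @Bean
    public ChainFileListFilter<S3ObjectSummary> remoteFilter(
            JdbcMetadataStore modifiedStore,
            FileListFilter<S3ObjectSummary> etagFilter) {
        ChainFileListFilter<S3ObjectSummary> chain = new ChainFileListFilter<>();
        // Each filter only sees the files accepted by the filters before it.
        chain.addFilter(new S3PersistentAcceptOnceFileListFilter(modifiedStore, "s3-modified-"));
        chain.addFilter(etagFilter);
        return chain;
    }
}
```

The resulting bean would then be passed to the synchronizer's setFilter(...) in place of the individual filters.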
EDIT
"if the service is restarted after the file has made it through the 1st filter but before the message source successfully sent the message along"
I think Gary is right: you need a transaction around that polling operation, and it has to include the filter logic as well.
See docs: https://docs.spring.io/spring-integration/docs/current/reference/html/jdbc.html#jdbc-metadata-store
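A minimal sketch of a transactional poller, assuming Spring Integration 5.x with the Java DSL, could look like the following. The channel name and the 5 second delay are made up for the example, and the transaction manager has to be built on the same DataSource that backs the JdbcMetadataStore for the store updates to take part in the transaction.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizingMessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class TransactionalPollerConfig {

    // Must wrap the same DataSource the JdbcMetadataStore writes to.
    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean
    public IntegrationFlow s3InboundFlow(S3InboundFileSynchronizingMessageSource source,
                                         PlatformTransactionManager transactionManager) {
        return IntegrationFlows
                // Each poll (filters included) runs inside one transaction.
                .from(source, e -> e.poller(
                        Pollers.fixedDelay(5000).transactional(transactionManager)))
                .channel("s3FilesChannel") // hypothetical channel name
                .get();
    }
}
```

Note that the transaction only spans work done on the poller thread, so anything handed off to an executor or queue channel downstream falls outside of it.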
This way the transaction is not committed until the message for a file leaves the polling channel adapter. After a restart you will therefore be able to synchronize the rolled-back files again, because their entries are not present in the store for filtering.
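To make the effect concrete, here is a toy model in plain Java. It is not Spring code: Store, acceptOnce and poll are invented names that only mimic an accept-once filter writing to a store, with and without a surrounding transaction. The "restart" is simulated by a sender that throws after the filter has already recorded the file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of an accept-once filter over a transactional store (illustration only). */
class AcceptOnceTxDemo {

    /** Stand-in for the metadata table: only committed entries survive a restart. */
    static final class Store {
        private final Map<String, Long> committed = new HashMap<>();
        private Map<String, Long> pending; // non-null while a transaction is open

        void begin() { pending = new HashMap<>(committed); }
        void commit() { committed.clear(); committed.putAll(pending); pending = null; }
        void rollback() { pending = null; }

        /** Accepts a file only if this (name, lastModified) pair is not recorded yet. */
        boolean acceptOnce(String name, long lastModified) {
            Map<String, Long> view = (pending != null) ? pending : committed;
            Long seen = view.get(name);
            if (seen != null && seen == lastModified) {
                return false;
            }
            view.put(name, lastModified);
            return true;
        }
    }

    /** One poll: run the filter, then send; optionally wrapped in a transaction. */
    static boolean poll(Store store, String name, long lastModified,
                        Consumer<String> sender, boolean transactional) {
        if (transactional) store.begin();
        try {
            boolean accepted = store.acceptOnce(name, lastModified);
            if (accepted) sender.accept(name);
            if (transactional) store.commit();
            return accepted;
        } catch (RuntimeException crash) {
            // Without a transaction the filter entry written above simply stays.
            if (transactional) store.rollback();
            return false;
        }
    }

    /** Returns true if the file is delivered on the poll after a simulated crash. */
    static boolean redeliveredAfterCrash(boolean transactional) {
        Store store = new Store();
        List<String> delivered = new ArrayList<>();
        // First poll: the service dies after filtering but before the send completes.
        poll(store, "report.csv", 100L,
                f -> { throw new IllegalStateException("restart"); }, transactional);
        // Second poll after the restart, with a healthy sender.
        poll(store, "report.csv", 100L, delivered::add, transactional);
        return delivered.contains("report.csv");
    }

    public static void main(String[] args) {
        System.out.println("non-transactional redelivered: " + redeliveredAfterCrash(false));
        System.out.println("transactional redelivered: " + redeliveredAfterCrash(true));
    }
}
```

In this model the non-transactional run reports false (the file is dropped, as in the question) and the transactional run reports true, because the rolled-back filter entry no longer blocks the file on the next poll.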