spring-boot spring-integration spring-integration-sftp

How can I ensure that multiple instances of my Spring Integration application do not process the same SFTP file simultaneously?


I am using Spring Integration for file transfer, Spring Batch for processing, and Spring Integration again to mark files as done. My setup includes:

Despite this setup, I am encountering issues where multiple instances end up grabbing the same file, causing the entire flow to break. I need a solution to distribute the files across instances without synchronization issues, possibly by implementing some sort of locking mechanism.

I have tried using both fixed delay and cron scheduling, but I do not want this application to fail in production because of this issue.

Any advice or best practices to handle this scenario would be greatly appreciated.

Spring Integration Flow:

IntegrationFlow.from(
        Sftp.inboundAdapter(sftpSessionFactory)
                .remoteDirectory(".")
                .deleteRemoteFiles(false)
                .filter(remoteFileFilter)
                .localDirectory(new File(localDirectory))
                .autoCreateLocalDirectory(true)
                .maxFetchSize(1),
        configurer -> configurer.poller(Pollers.cron("some-cron-here")
                .advice(rotatingServerAdvice)))
        .transform(transformerService, "toJobLaunchRequest")
        .channel(jobLaunchingChannel)
        .get();

Error Stacktrace (using 'transactional()'):

2024-09-13T10:50:02.499-04:00 ERROR 16323 --- [some-service] [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Problem occurred while synchronizing '/remote-file-path' to local directory
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:345)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:266)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:69)
    at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:47)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:355)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.integration.aop.ReceiveMessageAdvice.invoke(ReceiveMessageAdvice.java:56)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223)
    at jdk.proxy2/jdk.proxy2.$Proxy118.receive(Unknown Source)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:220)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:450)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:355)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:379)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223)
    at jdk.proxy2/jdk.proxy2.$Proxy117.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:54)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:459)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:338)
    ... 43 more
Caused by: org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [SELECT METADATA_VALUE FROM INT_METADATA_STORE
WHERE METADATA_KEY=? AND REGION=?
]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block
    at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1549)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:677)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:723)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:754)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:767)
    at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:889)
    at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:916)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.putIfAbsent(JdbcMetadataStore.java:230)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:355)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:379)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:720)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore$$SpringCGLIB$$0.putIfAbsent(<generated>)
    at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.accept(AbstractPersistentAcceptOnceFileListFilter.java:83)
    at org.springframework.integration.file.filters.ChainFileListFilter.accept(ChainFileListFilter.java:68)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.transferFilesFromRemoteToLocal(AbstractInboundFileSynchronizer.java:374)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.lambda$synchronizeToLocalDirectory$0(AbstractInboundFileSynchronizer.java:339)
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:450)
    ... 44 more
Caused by: org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2733)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2420)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:372)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:517)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:434)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
    at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:137)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeQuery(ProxyPreparedStatement.java:52)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeQuery(HikariProxyPreparedStatement.java)
    at org.springframework.jdbc.core.JdbcTemplate$1.doInPreparedStatement(JdbcTemplate.java:732)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:658)
    ... 69 more
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "int_metadata_store_pk"
  Detail: Key (metadata_key, region)=(remote:SimpleTestFile (94).txt, DEFAULT) already exists.
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2733)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2420)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:372)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:517)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:434)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
    at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:155)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeUpdate(ProxyPreparedStatement.java:61)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeUpdate(HikariProxyPreparedStatement.java)
    at org.springframework.jdbc.core.JdbcTemplate.lambda$update$2(JdbcTemplate.java:975)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:658)
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:970)
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1014)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.tryToPutIfAbsent(JdbcMetadataStore.java:241)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.putIfAbsent(JdbcMetadataStore.java:222)
    ... 63 more

Solution

  • We have confirmed that this is a bug in Spring Integration, and a fix is in place: https://github.com/spring-projects/spring-integration/issues/9481.

    The 6.2.9, 6.3.4 and 6.4.0-M3 releases are going out today.

    The problem is that PostgreSQL aborts the whole transaction when a statement in it fails. After the duplicate-key INSERT, every further command in that transaction is rejected, including the follow-up SELECT (SQL state 25P02 in the stack trace above). To avoid the failure, we have to add ON CONFLICT DO NOTHING to the INSERT operation.

    With MySQL-based vendors, we receive a CannotAcquireLockException instead of a DataIntegrityViolationException, so that exception is included in the logic as well.
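
    To make the intended behavior concrete, here is a minimal sketch of the "accept once" claim that a shared metadata store is meant to provide across instances. It is not the JdbcMetadataStore implementation: InMemoryMetadataStore, accept and countWinners are hypothetical names, and a ConcurrentHashMap stands in for the INT_METADATA_STORE table. The point is only that the insert must be atomic and that a conflict must come back as "already claimed" rather than as an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AcceptOnceSketch {

    /** Hypothetical in-memory stand-in for the shared metadata table. */
    static final class InMemoryMetadataStore {
        private final ConcurrentMap<String, String> rows = new ConcurrentHashMap<>();

        /**
         * Atomic insert-if-missing. Returns null when this caller inserted the key,
         * otherwise the value that was already stored. A conflict is a normal result,
         * not an exception. On PostgreSQL the analogous statement could look like this
         * (illustrative only, not necessarily the exact SQL used by the fix):
         *   INSERT INTO INT_METADATA_STORE (METADATA_KEY, METADATA_VALUE, REGION)
         *   VALUES (?, ?, ?) ON CONFLICT DO NOTHING
         */
        String putIfAbsent(String key, String value) {
            return rows.putIfAbsent(key, value);
        }
    }

    /** A file is accepted only by the instance whose insert actually created the row. */
    static boolean accept(InMemoryMetadataStore store, String fileName, String instanceId) {
        return store.putIfAbsent("remote:" + fileName, instanceId) == null;
    }

    /** Lets several simulated instances race for one file and counts how many accept it. */
    static int countWinners(int instances, String fileName) throws Exception {
        InMemoryMetadataStore store = new InMemoryMetadataStore();
        ExecutorService pool = Executors.newFixedThreadPool(instances);
        CountDownLatch start = new CountDownLatch(1);
        List<Future<Boolean>> results = new ArrayList<>();
        try {
            for (int i = 0; i < instances; i++) {
                String instanceId = "instance-" + i;
                Callable<Boolean> attempt = () -> {
                    start.await(); // hold every thread until all are ready to poll
                    return accept(store, fileName, instanceId);
                };
                results.add(pool.submit(attempt));
            }
            start.countDown(); // release all simulated instances at once
            int winners = 0;
            for (Future<Boolean> result : results) {
                if (result.get()) {
                    winners++;
                }
            }
            return winners;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Four instances poll the same remote file; exactly one may process it.
        System.out.println(countWinners(4, "SimpleTestFile (94).txt")); // prints 1
    }
}
```

    In the real flow this role is played by the persistent accept-once filter backed by the JDBC metadata store, as seen in the stack trace. The sketch only shows why the losing instances need to see a clean "already present" result instead of an aborted transaction.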