One of our applications is a Camel Spring Boot application.
The route is fairly simple: it gets a file from a FTP server using the camel-ftp component and pushes it to a JMS queue. We use Apache Artemis as broker.
Camel version: 2.20.2; Spring Boot version: 2.7.8.
@Component
@Slf4j
public class FTPListenerRoute extends RouteBuilder {
@Value("${ems.activemq.queue.name}")
private String activeMqTargetQueueName;
@Value("${ems.error-uri:error}")
private String errorUri;
@Value("${ems.max-redeliveries}")
private int maximumRetries;
private final FTPConfiguration ftpConfiguration;
@Autowired
public FTPListenerRoute(FTPConfiguration ftpConfiguration) {
this.ftpConfiguration = ftpConfiguration;
}
@Override
public void configure() throws Exception {
errorHandler(
springTransactionErrorHandler()
.loggingLevel(LoggingLevel.DEBUG)
.maximumRedeliveries(0)
.log(log).logHandled(true));
String uri = ftpConfiguration.buildUri();
from(uri)
.routeId("listener-main-route")
//.transacted()
.doTry()
.to("direct:msgInTransaction")
.doCatch(RuntimeCamelException.class)
.log(LoggingLevel.ERROR, log, "caugt exception, rerouting to : {}", errorUri)
.to(errorUri)
.endDoTry();
from("direct:msgInTransaction")
.routeId("ftplistener-transacted-route")
.log(LoggingLevel.INFO, "Processing ${id} with headers: ${headers}")
.transacted()
.to("jms:queue:" + activeMqTargetQueueName);
}
@Bean
public PlatformTransactionManager jmsTransactionManager(@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
}
package be.fgov.minfin.esbsoa.ems.ftp.listener.configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.net.URISyntaxException;
import java.util.Optional;
/**
 * Collects the {@code ems.ftp.*} properties and turns them into the endpoint URI string
 * returned by {@link #buildUri()}.
 */
@Component
public class FTPConfiguration {

    // FTP Properties

    /** User name placed in the URI user-info section. */
    @Value("${ems.ftp.username}")
    private String ftpUsername;

    /** Password placed in the URI user-info section. */
    @Value("${ems.ftp.password}")
    private String ftpPassword;

    /** Host part of the URI. */
    @Value("${ems.ftp.host}")
    private String ftpHost;

    /** Optional port; only written to the URI when a value is present. */
    @Value("${ems.ftp.port:}")
    private Optional<Integer> ftpPort;

    /** Path part of the URI. */
    @Value("${ems.ftp.path}")
    private String ftpPath;

    /** Value of the {@code moveFailed} option. */
    @Value("${ems.ftp.path.error:error}")
    private String ftpErrorPath;

    /** Value of the {@code move} option, used when deletion is disabled. */
    @Value("${ems.ftp.path.completed:completed}")
    private String ftpCompletedPath;

    /** Value of the {@code delay} option. */
    @Value("${ems.ftp.delay:30000}")
    private String ftpDelay;

    /** Value of the {@code include} option; the option is omitted when blank. */
    @Value("${ems.ftp.filter.file.name:}")
    private String fileNameFilter;

    /** Chooses between the {@code delete} option (true) and the {@code move} option (false). */
    @Value("${ems.ftp.deleteFile:false}")
    private boolean isFilesDeletionAfterCompletion;

    /** Upper bound used in the {@code filterFile} expression. */
    @Value("${ems.ftp.filter.file.size:50000000}")
    private int maxFileSize;

    /** URI scheme. */
    @Value("${ems.ftp.protocol:ftp}")
    private String protocol;

    /** Value of the {@code passiveMode} option. */
    @Value("${ems.ftp.passiveMode:true}")
    private String passiveMode;

    /**
     * Assembles the complete endpoint URI: the base URI for {@code ftpPath} followed by the
     * consumer options, in a fixed order.
     *
     * @return the endpoint URI as a string
     * @throws URISyntaxException if the assembled parts do not form a valid URI
     */
    public String buildUri() throws URISyntaxException {
        final URIBuilder endpoint = getUri(ftpPath);

        // Options that are always present.
        endpoint.addParameter("moveFailed", ftpErrorPath);
        endpoint.addParameter("delay", ftpDelay);
        endpoint.addParameter("binary", "true");
        endpoint.addParameter("initialDelay", "5");
        endpoint.addParameter("filterFile", "${file:size} <= " + maxFileSize);
        endpoint.addParameter("readLock", "changed");

        // Exactly one completion strategy: delete the file, or move it to the completed path.
        final String completionOption = this.isFilesDeletionAfterCompletion ? "delete" : "move";
        final String completionValue = this.isFilesDeletionAfterCompletion ? "true" : ftpCompletedPath;
        endpoint.addParameter(completionOption, completionValue);

        // The name filter is optional.
        if (!StringUtils.isBlank(fileNameFilter)) {
            endpoint.addParameter("include", fileNameFilter);
        }

        return endpoint.build().toString();
    }

    /**
     * Creates the base URI builder: scheme, host, credentials, path, the {@code passiveMode}
     * option and, when configured, the port.
     *
     * @param path path component of the URI
     * @return a builder that callers may extend with further options
     */
    private URIBuilder getUri(String path) {
        final URIBuilder base = new URIBuilder();
        base.setScheme(protocol);
        base.setHost(ftpHost);
        base.setUserInfo(ftpUsername, ftpPassword);
        base.setPath(path);
        base.addParameter("passiveMode", passiveMode);

        if (ftpPort.isPresent()) {
            base.setPort(ftpPort.get());
        }
        return base;
    }
}
During maintenance week-ends, servers are often rebooted and this route fails with the following error:
2023-03-14 07:50:53.596 INFO 1 --- [tra/Coda_Edepo/] ftplistener-main-route : Processing 3DAABB587130ED0-000000000000026E with headers: {CamelFileAbsolute=false, CamelFileAbsolutePath=Coda_Edepo/FIL.EMPCFF.679200407454.20230313.DEP, CamelFileHost=ftphost, CamelFileLastModified=1678754100000, CamelFileLength=516516, CamelFileName=FIL.EMPCFF.679200407454.20230313.DEP, CamelFileNameConsumed=FIL.EMPCFF.679200407454.20230313.DEP, CamelFileNameOnly=FIL.EMPCFF.679200407454.20230313.DEP, CamelFileParent=Coda_Edepo, CamelFilePath=Coda_Edepo//FIL.EMPCFF.679200407454.20230313.DEP, CamelFileRelativePath=FIL.EMPCFF.679200407454.20230313.DEP, CamelFtpReplyCode=226, CamelFtpReplyString=226 Transfer complete.
, CamelMessageTimestamp=1678754100000}
2023-03-14 07:50:53.617 WARN 1 --- [tra/Coda_Edepo/] o.a.c.c.file.GenericFileOnCompletion : Error during commit. Exchange[3DAABB587130ED0-000000000000026E]. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot rename file: RemoteFile[FIL.EMPCFF.679200407454.20230313.DEP] to: RemoteFile[/Coda_Edepo//completed/FIL.EMPCFF.679200407454.20230313.DEP]]
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot rename file: RemoteFile[FIL.EMPCFF.679200407454.20230313.DEP] to: RemoteFile[/Coda_Edepo//completed/FIL.EMPCFF.679200407454.20230313.DEP]
at org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.renameFile(GenericFileProcessStrategySupport.java:147) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.commit(GenericFileRenameProcessStrategy.java:121) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileOnCompletion.processStrategyCommit(GenericFileOnCompletion.java:134) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileOnCompletion.onCompletion(GenericFileOnCompletion.java:86) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileOnCompletion.onComplete(GenericFileOnCompletion.java:60) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.support.UnitOfWorkHelper.doneSynchronization(UnitOfWorkHelper.java:99) ~[camel-support-3.12.0.jar:3.12.0]
at org.apache.camel.support.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:88) ~[camel-support-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultUnitOfWork.done(DefaultUnitOfWork.java:238) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.support.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:59) ~[camel-support-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:775) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:710) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:263) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44) ~[camel-api-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:275) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:138) ~[camel-spring-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:109) ~[camel-core-processor-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:184) ~[camel-core-processor-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:156) ~[camel-ftp-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:191) ~[camel-support-3.12.0.jar:3.12.0]
at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:108) ~[camel-support-3.12.0.jar:3.12.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[na:na]
at java.base/java.lang.Thread.run(Unknown Source) ~[na:na]
After that, the file is reprocessed again and again by camel until the rename operation succeeds. This causes a big problem with duplicates in our database.
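My understanding is the following:
- the file is consumed from the FTP server and sent to the JMS queue;
- moving the file to the `completed` folder then fails;
- a `GenericFileOperationFailedException` is thrown.

I created an integration test using the Spock Framework and the MockFtpServer library.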
I have the following successful happy scenario:
// Happy path: a file stored on the FTP server is expected to arrive, with the same content,
// on the JMS destination queue.
// NOTE(review): fileSystem, ftpServer, camelContext, remoteFile, jmsMessagingTemplate and
// jmsDestinationQueue are declared outside this snippet — confirm their setup in the spec.
def 'happy scenario'() {
given: 'one original file destination, one completed file destination'
// Register an entry for the upload path and one for the "completed" path in the file system.
fileSystem.add(new FileEntry("/upload/mockFile.txt", "test"))
fileSystem.add(new FileEntry("/upload/completed/mockFile.txt", "test"))
and: 'start the server'
// The FTP server is started before the Camel context.
ftpServer.start()
camelContext.start()
when: 'the file is actually sent to the ftp server'
remoteFile.storeFile("src/test-integration/resources/files/mockFile.txt", "/upload/mockFile.txt")
then: 'assert that it was'
// assert that the file is put on the ftp
"test" == remoteFile.readFile("/upload/mockFile.txt")
and: 'receive the file on the other end of the route'
// Reads one message from the destination queue.
def receivedMessage = jmsMessagingTemplate.receive(jmsDestinationQueue)
then: 'assert that that it is the correct file'
// The payload, decoded as a String, must equal the file content.
"test" == new String(receivedMessage.getPayload())
}
In the error scenario, I am stubbing the RNTO FTP command to throw a GenericFileOperationFailedException:
// Failure path: the RNTO command handler is replaced so that the rename step fails; the test
// then expects one message on the destination queue and one on the queue named by errorUri.
// NOTE(review): fileSystem, ftpServer, camelContext, remoteFile, jmsMessagingTemplate,
// jmsQueueName, errorUri and ExceptionCommandHandler are declared outside this snippet — confirm.
def 'failed scenario - GenericFileOperationFailedException'() {
given: 'an original file location on the FTP'
// Register the upload entry (with content) and the "completed" entry (no content given).
fileSystem.add(new FileEntry("/upload/mockFile.txt", "test"))
fileSystem.add(new FileEntry("/upload/completed/mockFile.txt"))
and: 'a custom ERROR code for the RNTO command'
// Replace the server's handler for the RNTO command.
ftpServer.setCommandHandler(CommandNames.RNTO, new ExceptionCommandHandler())
and: 'start the server'
ftpServer.start()
camelContext.start()
when: 'the file is actually sent to the ftp server'
remoteFile.storeFile("src/test-integration/resources/files/mockFile.txt", "/upload/mockFile.txt")
then:'assert that it was'
"test" == remoteFile.readFile("/upload/mockFile.txt")
and: 'assert that message was received in the destination queue'
// Reads one message from the destination queue and requires a non-null payload.
def receivedFromDestinationQueue = jmsMessagingTemplate.receive(jmsQueueName)
assertNotNull(receivedFromDestinationQueue.payload)
and: 'try to poll DLQ, original message should be sent there'
// The DLQ name is whatever follows the last ':' in errorUri.
def dlq = StringUtils.substringAfterLast(errorUri, ":")
def receivedFromDLQ = jmsMessagingTemplate.receive(dlq)
assertNotNull(receivedFromDLQ)
}
This test fails: the message is never received from the DLQ where the message should have been sent because of the GenericFileOperationFailedException.
I am polling both the destinationQueue, then the errorQueue because I noticed that the file WAS sent to the destination queue and processed.
What I'm trying to achieve is to be certain that the file is only delivered once to the JMS broker, in all circumstances. If there are retries or errors anywhere in the process, the messages should be sent to the DLQ.
I found a suitable solution inspired by this unit-test: https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
The idea is to re-inject the exception into the route so that the error handler picks it up.
Following @TacheDeChoco's suggestion, I set up a custom `onCompletionExceptionHandler` on my FTP endpoint. The documentation describes this option as follows:
To use a custom org.apache.camel.spi.ExceptionHandler to handle any thrown exceptions that happens during the file on completion process where the consumer does either a commit or rollback. The default implementation will log any exception at WARN level and ignore.
I also used the `idempotent` and `idempotentKey` options, as I know there will be only one file sent per day.
https://camel.apache.org/components/3.20.x/ftp-component.html#_endpoint_query_option_idempotent
This is the final result:
public String buildUri() throws URISyntaxException {
URIBuilder ftpUriBuilder = getUri(ftpPath);
ftpUriBuilder
// other connection params
...
//DO NOT USE at the same time as onCompletionExceptionHandler
//.addParameter("bridgeErrorHandler", "true")
.addParameter("throwExceptionOnConnectFailed", "true")
.addParameter("onCompletionExceptionHandler", "#redirectExceptionHandler")
.addParameter("idempotent", "true")
.addParameter("idempotentKey", "${file:name}-${date:now:yyyyMMdd}")
;
/**
 * {@link ExceptionHandler} that forwards a failed exchange to the {@code direct:errorHandling}
 * endpoint so that the route-level error handling can process the failure.
 *
 * <p>Redirection needs an {@link Exchange}. The two overloads that receive none cannot redirect
 * anything, so they only log the failure.
 */
@Slf4j
@Component
public class RedirectExceptionHandler implements ExceptionHandler {

    private final ProducerTemplate template;

    /**
     * @param template producer template used to send the exchange to {@code direct:errorHandling}
     */
    public RedirectExceptionHandler(ProducerTemplate template) {
        this.template = template;
    }

    /**
     * Handles an exception reported without message or exchange.
     *
     * @param exception the failure
     */
    @Override
    public void handleException(Throwable exception) {
        handleException(exception.getMessage(), null, exception);
    }

    /**
     * Handles an exception reported without an exchange.
     *
     * @param message   description of the failure
     * @param exception the failure
     */
    @Override
    public void handleException(String message, Throwable exception) {
        handleException(message, null, exception);
    }

    /**
     * Attaches the exception to the exchange and sends it to {@code direct:errorHandling}.
     * When no exchange is available the failure is logged and nothing is sent.
     *
     * @param message   description of the failure
     * @param exchange  the exchange being completed, or {@code null} when none is available
     * @param exception the failure
     */
    @Override
    public void handleException(String message, Exchange exchange, Throwable exception) {
        // The overloads above delegate with a null exchange; dereferencing it would throw a
        // NullPointerException from inside the exception handler itself.
        if (exchange == null) {
            log.warn("No exchange available, cannot redirect to direct:errorHandling: {}", message, exception);
            return;
        }

        exchange.setException(exception);
        exchange.setMessage(exchange.getIn());
        // unsure if these are necessary or reset by the errorHandler
        exchange.setRollbackOnly(false);
        exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
        // use a producerTemplate to send the exchange to the errorHandling endpoint
        template.send("direct:errorHandling", exchange);
    }
}
/**
 * Declares the error handling policy and the three routes: the FTP consumer route, the
 * {@code direct:errorHandling} route that rethrows into the global error handling, and the
 * transacted route that publishes to the JMS queue.
 *
 * @throws Exception if the FTP endpoint URI cannot be built or a route cannot be defined
 */
@Override
public void configure() throws Exception {
    // Transactional error handler; redelivery is switched off.
    errorHandler(springTransactionErrorHandler().maximumRedeliveries(0));

    // Policy shared by all routes: mark the failure as handled, never redeliver,
    // and forward the exchange to the error endpoint.
    onException(Exception.class)
        .handled(true)
        .maximumRedeliveries(0)
        .to(errorUri);

    // FTP consumer: hands every file over to the transacted sub-route.
    final String ftpSourceUri = ftpConfiguration.buildUri();
    from(ftpSourceUri).routeId("listener-main-route").to("direct:msgInTransaction");

    // Entry point whose only job is to throw, so the policy above takes over.
    from("direct:errorHandling").throwException(new ErrorHandlingException());

    // Transacted publication to the JMS target queue.
    final String jmsTargetUri = "jms:queue:" + activeMqTargetQueueName;
    from("direct:msgInTransaction")
        .routeId("ftplistener-transacted-route")
        .transacted()
        .log(LoggingLevel.INFO, "Processing ${id} with headers: ${headers}")
        .to(jmsTargetUri);
}