I'm using Apache Camel 3.x to push some files to an SFTP server, and the retries I've configured are not really respected. I have three types of deliveries to this server (selection, status and data) and I've defined the following base class to handle the common setup for all of them:
// javax.* imports assumed here; on a Jakarta-based stack these would be jakarta.annotation / jakarta.inject
import static java.lang.Math.max;

import java.io.FileNotFoundException;

import javax.annotation.PostConstruct;
import javax.inject.Inject;

import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SftpUploader extends RouteBuilder {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    @Inject
    protected SftpConfiguration config;

    protected FilePathAdjusterProcessor processor;
    protected final String routeId;

    SftpUploader(String routeId) {
        this.routeId = routeId;
    }

    @PostConstruct
    private void setup() {
        processor = new FilePathAdjusterProcessor(getBaseDirectory());
    }

    @Override
    public void configure() {
        onException(FileNotFoundException.class).continued(true);
        errorHandler(defaultErrorHandler()
                .maximumRedeliveries(max(0, config.getMaxRetries()))
                .redeliveryDelay(max(0, config.getRetriesDelayMs())));
        configureRoute();
    }

    protected abstract void configureRoute();

    protected abstract String getBaseDirectory();

    protected StringBuilder buildTargetSftpEndpoint() {
        final StringBuilder builder = new StringBuilder();
        builder.append("sftp:");
        builder.append(config.getHost());
        builder.append(":");
        builder.append(config.getPort());
        if (!config.getRootDirectoryName().startsWith("/")) builder.append("/");
        builder.append(config.getRootDirectoryName());
        builder.append("?preferredAuthentications=publickey");
        builder.append("&fastExistsCheck=true");
        builder.append("&greedy=true");
        builder.append("&throwExceptionOnConnectFailed=true");
        builder.append("&initialDelay=0");
        builder.append("&stepwise=false");
        builder.append("&reconnectDelay=0");
        builder.append("&serverAliveCountMax=0");
        builder.append("&serverAliveInterval=0");
        builder.append("&username=");
        builder.append(config.getUsername());
        builder.append("&privateKeyFile=");
        builder.append(config.getSshPrivateKey());
        builder.append("&knownHostsFile=");
        builder.append(config.getSshKnownHostsFile());
        builder.append("&maximumReconnectAttempts=");
        builder.append(max(0, config.getMaxRetries()));
        return builder;
    }
}
where configureRoute is basically
@Override
protected void configureRoute() {
    from(buildSourceEndpointUri())
        .routeId(routeId)
        .process(processor)
        .to(buildTargetSftpEndpoint().toString())
        .log(logLevel, logMessageTemplate);
}
and buildSourceEndpointUri is either
// the selection uploader also disconnects from the SFTP server after each upload
@Override
protected StringBuilder buildTargetSftpEndpoint() {
    return super.buildTargetSftpEndpoint().append("&disconnect=true");
}

private String buildSourceEndpointUri() {
    final StringBuilder builder = new StringBuilder();
    builder.append("file:");
    if (!getBaseDirectory().startsWith("/")) builder.append("/");
    builder.append(getBaseDirectory());
    builder.append("?recursive=true");
    builder.append("&flatten=false");
    builder.append("&startingDirectoryMustExist=true");
    builder.append("&noop=true");
    builder.append("&idempotent=true");
    builder.append("&idempotentKey=${file:name}-${file:modified}");
    builder.append("&greedy=true");
    builder.append("&initialDelay=0");
    builder.append("&antInclude=");
    builder.append("*/");
    builder.append(selectionSpace.getDir());
    builder.append("/*.xml");
    return builder.toString();
}
(for the delivery of type selection) or
private String buildSourceEndpointUri() {
    final StringBuilder builder = new StringBuilder();
    builder.append(endpointUri);
    builder.append("?size=");
    builder.append(MAX_VALUE);
    builder.append("&concurrentConsumers=");
    builder.append(max(1, config.getMaxConnections()));
    return builder.toString();
}
for the other two, where endpointUri points to a SEDA endpoint.
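The actual value of endpointUri is not shown here; it is essentially a bare SEDA URI to which the consumer above and the producer template below each append their own query parameters, e.g. something along the lines of

private final String endpointUri = "seda:" + routeId; // hypothetical example, the real queue name isn't shown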
The idea behind this is that the selection deliveries are processed synchronously (since there are not that many and they're done on startup), while the status and data deliveries are processed asynchronously with
sender = getContext().createFluentProducerTemplate();
sender.setDefaultEndpointUri(buildProducerEndpointUri());
sender.start();
private String buildProducerEndpointUri() {
    final StringBuilder builder = new StringBuilder();
    builder.append(endpointUri);
    builder.append("?blockWhenFull=true");
    builder.append("&timeout=0");
    builder.append("&size=");
    builder.append(MAX_VALUE);
    return builder.toString();
}
public void publishFilesToFts(Set<File> files, DELIVERY_MODE deliveryMode) {
    if (!config.isEnabled() || files == null || files.isEmpty()) return;
    shutdown = false;
    final List<Future<Exchange>> fileJobs = new ArrayList<>(files.size());
    final String publishType = deliveryMode == INITIAL ? "initial load" : "daily update";
    logger.info("Start to upload #{} {} files for the {} to the FTS server", files.size(), type, publishType);
    for (File file : files) {
        if (isShuttingDown()) break;
        fileJobs.add(sender
            .withHeader(FILE_PATH, file.getAbsolutePath())
            .withHeader(FILE_LENGTH, file.length())
            .withBodyAs(file, File.class)
            .asyncSend()
        );
    }
    for (Future<Exchange> fileJob : fileJobs) {
        if (isShuttingDown()) break;
        try {
            fileJob.get();
        } catch (InterruptedException e) {
            if (isShuttingDown()) {
                logger.trace("Upload of the {} files for the {} interrupted because the application is shutting down", type, publishType);
            } else {
                throw new RuntimeException(e);
            }
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}
where publishFilesToFts gets called from elsewhere while the files are being generated.
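A call site then looks roughly like this (the caller and the concrete uploader bean are not shown here, so this is only an illustration):

// hypothetical caller: "uploader" is the injected status/data uploader, "generatedFiles" the freshly generated files
uploader.publishFilesToFts(generatedFiles, DELIVERY_MODE.INITIAL);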
What I observe is the following: the RedeliveryPolicy I see in the code should set it to DEBUG (but that's not a big deal). But then, after some time X (1 hour or so), the upload of the same files is retried over and over again.
Does somebody have an idea why this is happening?
It seems that the errorHandler doesn't record in the IdempotentRepository that the retries have been exhausted and, therefore, the upload of the files is retried over and over again.
Kudos to https://stackoverflow.com/a/45235892/5911228 for implicitly pointing this out.
Changing the exception handling to
onException(Exception.class)
    .maximumRedeliveries(maxRetries)
    .redeliveryDelay(max(0, config.getRetriesDelayMs()))
    .handled(true)
    .logHandled(true)
    .logExhausted(true);
solved the issue.
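For completeness, here is a minimal sketch of what the base class's configure() could look like after that change, assuming maxRetries is simply max(0, config.getMaxRetries()) (the same value used before) and that the defaultErrorHandler call is replaced by the onException clause:

@Override
public void configure() {
    final int maxRetries = max(0, config.getMaxRetries());
    // missing source files are simply skipped, as before
    onException(FileNotFoundException.class).continued(true);
    // retry everything else; once the retries are exhausted, mark the exchange as
    // handled so the exhaustion is recorded and the same file isn't picked up again
    onException(Exception.class)
        .maximumRedeliveries(maxRetries)
        .redeliveryDelay(max(0, config.getRetriesDelayMs()))
        .handled(true)
        .logHandled(true)
        .logExhausted(true);
    configureRoute();
}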