I keep getting
SFTPException: Maximum concurrent transfers exceeded for the current context
while processing files
I'm using the latest SSHJ release (v0.38.0) with Spring Boot 3.0.3:
<dependency>
<groupId>com.hierynomus</groupId>
<artifactId>sshj</artifactId>
<version>0.38.0</version>
</dependency>
Below is the full error log and stack trace:
2024-06-05T14:32:34.858+08:00 | INFO | Now checking /043/Output/FILE//FILE2024-05-30-08-20-22.txt - com.vendor.robbijournal.service.SftpService.processFiles:154 [scheduling-1:16216]
2024-06-05T14:32:34.858+08:00 | DEBUG | Now read the content of file FILE2024-05-30-08-20-22.txt | current 6 / remaining 6 / total 12 - com.vendor.robbijournal.service.SftpService.processFiles:157 [scheduling-1:16216]
2024-06-05T14:32:34.859+08:00 | DEBUG | Opening `/043/Output/FILE/FILE2024-05-30-08-20-22.txt` - net.schmizz.sshj.sftp.SFTPClient.open:75 [scheduling-1:16216]
2024-06-05T14:32:35.097+08:00 | DEBUG | Sending close - net.schmizz.sshj.connection.channel.direct.SessionChannel.sendClose:287 [scheduling-1:16216]
2024-06-05T14:32:35.336+08:00 | DEBUG | Got chan request for `exit-status` - net.schmizz.sshj.connection.channel.direct.SessionChannel.gotChannelRequest:334 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.337+08:00 | DEBUG | Got close - net.schmizz.sshj.connection.channel.direct.SessionChannel.gotClose:220 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.339+08:00 | DEBUG | Forgetting `session` channel (#2) - net.schmizz.sshj.connection.ConnectionImpl.forget:89 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.340+08:00 | ERROR | We caught error : Maximum concurrent transfers exceeded for the current context - com.vendor.robbijournal.service.SftpService.fetchAndStoreFiles:137 [scheduling-1:16216]
net.schmizz.sshj.sftp.SFTPException: Maximum concurrent transfers exceeded for the current context
at net.schmizz.sshj.sftp.Response.error(Response.java:140)
at net.schmizz.sshj.sftp.Response.ensurePacketTypeIs(Response.java:117)
at net.schmizz.sshj.sftp.SFTPEngine.open(SFTPEngine.java:169)
at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:76)
at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:81)
at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:86)
at com.vendor.robbijournal.service.SftpService.processFiles(SftpService.java:158)
at com.vendor.robbijournal.service.SftpService.fetchAndStoreFiles(SftpService.java:118)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:354)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:392)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:720)
at com.vendor.robbijournal.service.SftpService$$SpringCGLIB$$0.fetchAndStoreFiles(<generated>)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.springframework.scheduling.support.ScheduledMethodRunnable.runInternal(ScheduledMethodRunnable.java:130)
at org.springframework.scheduling.support.ScheduledMethodRunnable.lambda$run$2(ScheduledMethodRunnable.java:124)
at io.micrometer.observation.Observation.observe(Observation.java:499)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:124)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Below is my service class:
package com.vendor.robbijournal.service;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.vendor.robbijournal.constant.WhichXferFileStatus;
import com.vendor.robbijournal.constant.WhichXferFileType;
import com.vendor.robbijournal.model.Flag;
import com.vendor.robbijournal.model.Sol;
import com.vendor.robbijournal.model.XferFtpFiles;
import com.vendor.robbijournal.repository.FlagRepository;
import com.vendor.robbijournal.repository.XferFtpFilesRepository;
import com.vendor.robbijournal.utils.FilePermissionUtil;
import com.vendor.robbijournal.utils.LoggerFactoryUtil;
import ch.qos.logback.classic.Logger;
import jakarta.transaction.Transactional;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.sftp.FileAttributes;
import net.schmizz.sshj.sftp.RemoteResourceInfo;
import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
@Service
@Transactional
public class SftpService {
private static final Logger LOG = LoggerFactoryUtil.getLogger();
@Autowired
private SSHClient sshClient;
@Autowired
private XferFtpFilesRepository repository;
@Autowired
private FlagRepository flagRepository;
@Value("${spring.profiles.active}")
private String activeProfile;
@Value("${spring.jpa.properties.hibernate.jdbc.time_zone}")
private String hibernateTimeZone;
@Value("${sftp.host}")
private String host;
@Value("${sftp.port}")
private int port;
@Value("${sftp.username}")
private String username;
@Value("${sftp.password}")
private String password;
private String[] DIRECTORIES = {
"/043/Output/FILE/"
};
@Scheduled(fixedRate = 60000) // Run every 60 seconds
//@Scheduled(fixedRate = 600000) // Run every 10 minutes (600000 ms)
public void fetchAndStoreFiles() throws IOException {
LOG.trace("Starting to fetch files from SFTP server..");
try {
// find Flag record SFTP_FETCHING
List<Flag> flag = flagRepository.findByKey("SFTP_FETCHING");
if (flag.size() == 0) {
Flag f = new Flag();
f.setKey("SFTP_FETCHING");
f.setValue1("ON");
f.setTimeZone(hibernateTimeZone);
flagRepository.save(f);
}
if (flag.size() > 0) {
Flag f = flag.get(0);
if(f.getValue1().equalsIgnoreCase("ON")) {
LOG.trace("SFTP_FETCHING in progress..abort this process");
return; // Abort the current execution
}
f.setValue1("ON");
flagRepository.save(f);
}
// Check if FTP session is active
LOG.debug("sshClient.isConnected() = " + sshClient.isConnected());
if (!sshClient.isConnected()) {
LOG.debug("Look like not connected to SFTP server.. let reconnecting..");
sshClient = new SSHClient();
sshClient.addHostKeyVerifier(new PromiscuousVerifier());
// Configure your SSH connection details here
sshClient.connect(host, port);
// KeyProvider keys = sshClient.loadKeys("path/to/private/key", "keyPassword");
// sshClient.authPublickey("username", keys);
LOG.debug("Connected to SFTP server: " + host + ":" + port + " using username: " + username + " and password: " + password);
sshClient.authPassword(username, password);
}
LOG.debug("sshClient.isAuthenticated() = " + sshClient.isAuthenticated());
try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
for (String directory : DIRECTORIES) {
processFiles(sftpClient, directory);
}
LOG.debug("Finish checked {} directories!",DIRECTORIES.length);
sftpClient.close();
LOG.debug("sftpClient.close()");
}
sshClient.disconnect();
LOG.debug("Done fetching file and sshClient.isConnected() = " + sshClient.isConnected() + "! ");
// find Flag record
flag = flagRepository.findByKey("SFTP_FETCHING");
if (flag.size() > 0) {
Flag f = flag.get(0);
f.setValue1("OFF");
flagRepository.save(f);
}
} catch (Exception e) {
LOG.error("We caught error : {}", e.getMessage(), e);
List<Flag> flag = flagRepository.findByKey("SFTP_FETCHING");
if (flag.size() > 0) {
Flag f = flag.get(0);
f.setValue1("OFF");
flagRepository.save(f);
}
e.printStackTrace();
}
}
private void processFiles(SFTPClient sftpClient, String directory) throws IOException {
LOG.info("Let check {}", directory);
List<RemoteResourceInfo> files = sftpClient.ls(directory);
int fileNumber = 0;
for (RemoteResourceInfo entry : files) {
LOG.info("Now checking {}/{}", directory, entry.getName());
if (!entry.isDirectory()) {
fileNumber = fileNumber+1;
LOG.debug("Now read the content of file {} | current {} / remaining {} / total {}", entry.getName(), fileNumber, files.size() - fileNumber, files.size());
try (InputStream inputStream = sftpClient.open(entry.getPath()).new RemoteFileInputStream()) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
/*
* 1024 byte = 1K
* 8192 byte = 8K
* 16384 byte = 16K
*/
byte[] buffer = new byte[16384];
int readCount;
while ((readCount = inputStream.read(buffer)) > 0) {
byteArrayOutputStream.write(buffer, 0, readCount);
}
String fileContent = byteArrayOutputStream.toString(StandardCharsets.UTF_8);
// Count the number of lines in the file content
long lineCount = fileContent.lines().count();
String md5sum = DigestUtils.md5Hex(byteArrayOutputStream.toByteArray());
// Check if a file with the same MD5 already exists
Optional<XferFtpFiles> existingFile = repository.findByMd5sum(md5sum);
if (existingFile.isPresent()) {
if (entry.getName().equalsIgnoreCase(existingFile.get().getFilename())) {
LOG.warn("File {} with the same MD5 {} already exists, now skip this file", entry.getName(),
md5sum);
} else {
LOG.warn("File {} with MD5 {} have same content as {}, now skip this file", entry.getName(),
md5sum, existingFile.get().getFilename());
}
continue;
}
// Logic to determine version : Check if a file with the same filename already
// exists
List<XferFtpFiles> existingFiles = repository.findByFilename(entry.getName());
Integer newVersion = 1;
if (!existingFiles.isEmpty()) {
LOG.debug("Seem we have {} record with same file name {}", existingFiles.size(),
entry.getName());
List<XferFtpFiles> maxVersionOptList = repository
.findByFilenameOrderByVersionAsc(entry.getName());
for (XferFtpFiles maxVersionOpt : maxVersionOptList) {
newVersion = maxVersionOpt.getVersion();
maxVersionOpt.setStatus(WhichXferFileStatus.IGNORED_USE_NEW_VERSION);
repository.save(maxVersionOpt);
// find Sol and update the status also
List<Sol> solList = maxVersionOpt.getSol();
for (Sol sol : solList) {
// sol.setStatus(WhichXferFileStatus.IGNORED_USE_NEW_VERSION);
// repository.save(sol);
}
}
newVersion = newVersion + 1;
LOG.warn("Seems {} have an update, set new update as version {}", entry.getName(), newVersion);
}
FileAttributes attrs = entry.getAttributes();
String permissions = FilePermissionUtil.getPermissionString(attrs);
XferFtpFiles xferFtpFile = new XferFtpFiles();
xferFtpFile.setHost(sshClient.getRemoteHostname());
xferFtpFile.setPort(Integer.toString(sshClient.getRemotePort()));
xferFtpFile.setDirectory(directory);
xferFtpFile.setPermission(permissions);
xferFtpFile.setFilename(entry.getName());
xferFtpFile.setPayload(fileContent);
xferFtpFile.setPayloadLine((int) lineCount);
xferFtpFile.setMd5sum(md5sum);
xferFtpFile.setVersion(newVersion);
xferFtpFile.setTs_ftp(new Timestamp((long) attrs.getMtime() * 1000));
xferFtpFile.setTimeZone(hibernateTimeZone);
xferFtpFile.setStatus(WhichXferFileStatus.NEW);
xferFtpFile.setFileType(WhichXferFileType.TO_CHECK);
xferFtpFile.setFilesize(attrs.getSize());
// Save the file to DB
repository.save(xferFtpFile);
}
}
}
}
}
And this is the bean for SSHClient:
package com.vendor.robbijournal.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import net.schmizz.sshj.SSHClient;
@Configuration
public class SshClientConfig {
@Bean
public SSHClient sshClient() {
SSHClient sshClient = new SSHClient();
// Additional configuration if needed
return sshClient;
}
}
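I do not know sshj (nor Java, actually), but from checking the sshj source code, it does not look like RemoteFileInputStream.close (the stream class used in the question) actually closes the remote file.
Only RemoteFile.close does that, so each file opened in the loop keeps its handle on the server until the server refuses to open more.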
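To illustrate the effect without a real server, here is a minimal sketch that uses hypothetical stand-in classes (FakeServer and FakeRemoteFile are not sshj types, and the limit of 5 open handles is an assumed value chosen only for the demo). It assumes, as described above, that closing the stream leaves the file handle open:

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

public class CloseBothDemo {

    /** Hypothetical stand-in for a server that caps concurrently open file handles. */
    static class FakeServer {
        final int maxOpen;
        int open = 0;

        FakeServer(int maxOpen) {
            this.maxOpen = maxOpen;
        }

        FakeRemoteFile open() throws IOException {
            if (open >= maxOpen) {
                throw new IOException("Maximum concurrent transfers exceeded");
            }
            open++;
            return new FakeRemoteFile(this);
        }
    }

    /** Hypothetical stand-in for a remote file handle; only close() releases it. */
    static class FakeRemoteFile implements Closeable {
        private final FakeServer server;
        private boolean closed = false;

        FakeRemoteFile(FakeServer server) {
            this.server = server;
        }

        /** Closing the returned stream does NOT release the handle (the assumption under test). */
        InputStream newInputStream() {
            return new ByteArrayInputStream(new byte[] {1, 2, 3});
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                server.open--;
            }
        }
    }

    /** Pattern from the question: only the stream is closed. Returns the number of files read. */
    static int readClosingStreamOnly(FakeServer server, int files) {
        int done = 0;
        try {
            for (int i = 0; i < files; i++) {
                try (InputStream in = server.open().newInputStream()) {
                    in.readAllBytes();
                    done++;
                }
            }
        } catch (IOException e) {
            // The leaked handles eventually hit the server's limit.
        }
        return done;
    }

    /** Both the file and the stream are declared as resources, so both get closed. */
    static int readClosingBoth(FakeServer server, int files) throws IOException {
        int done = 0;
        for (int i = 0; i < files; i++) {
            try (FakeRemoteFile file = server.open();
                 InputStream in = file.newInputStream()) {
                in.readAllBytes();
                done++;
            }
        }
        return done;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readClosingStreamOnly(new FakeServer(5), 12)); // prints 5
        System.out.println(readClosingBoth(new FakeServer(5), 12));       // prints 12
    }
}
```

With try-with-resources, resources are closed in the reverse order of their declaration, so the stream is closed first and the file handle second.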
So you need to make sure both the stream and the file are closed:
try (
RemoteFile remoteFile = sftpClient.open(entry.getPath());
InputStream inputStream = remoteFile.new RemoteFileInputStream()
) {