javaspringspring-bootftpsshj

SSHJ SFTPException: Maximum concurrent transfers exceeded for the current context


I keep getting

SFTPException: Maximum concurrent transfers exceeded for the current context

while processing files

I'm using latest dependencies of SSHJ v0.38.0 with Spring Boot 3.0.3

<dependency>
    <groupId>com.hierynomus</groupId>
    <artifactId>sshj</artifactId>
    <version>0.38.0</version>
</dependency>

Below is my full error and exception log

2024-06-05T14:32:34.858+08:00 | INFO  | Now checking /043/Output/FILE//FILE2024-05-30-08-20-22.txt - com.vendor.robbijournal.service.SftpService.processFiles:154 [scheduling-1:16216]
2024-06-05T14:32:34.858+08:00 | DEBUG | Now read the content of file FILE2024-05-30-08-20-22.txt | current 6 / remaining 6 / total 12 - com.vendor.robbijournal.service.SftpService.processFiles:157 [scheduling-1:16216]
2024-06-05T14:32:34.859+08:00 | DEBUG | Opening `/043/Output/FILE/FILE2024-05-30-08-20-22.txt` - net.schmizz.sshj.sftp.SFTPClient.open:75 [scheduling-1:16216]
2024-06-05T14:32:35.097+08:00 | DEBUG | Sending close - net.schmizz.sshj.connection.channel.direct.SessionChannel.sendClose:287 [scheduling-1:16216]
2024-06-05T14:32:35.336+08:00 | DEBUG | Got chan request for `exit-status` - net.schmizz.sshj.connection.channel.direct.SessionChannel.gotChannelRequest:334 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.337+08:00 | DEBUG | Got close - net.schmizz.sshj.connection.channel.direct.SessionChannel.gotClose:220 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.339+08:00 | DEBUG | Forgetting `session` channel (#2) - net.schmizz.sshj.connection.ConnectionImpl.forget:89 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.340+08:00 | ERROR | We caught error : Maximum concurrent transfers exceeded for the current context - com.vendor.robbijournal.service.SftpService.fetchAndStoreFiles:137 [scheduling-1:16216]
net.schmizz.sshj.sftp.SFTPException: Maximum concurrent transfers exceeded for the current context
    at net.schmizz.sshj.sftp.Response.error(Response.java:140)
    at net.schmizz.sshj.sftp.Response.ensurePacketTypeIs(Response.java:117)
    at net.schmizz.sshj.sftp.SFTPEngine.open(SFTPEngine.java:169)
    at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:76)
    at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:81)
    at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:86)
    at com.vendor.robbijournal.service.SftpService.processFiles(SftpService.java:158)
    at com.vendor.robbijournal.service.SftpService.fetchAndStoreFiles(SftpService.java:118)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:354)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:392)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:720)
    at com.vendor.robbijournal.service.SftpService$$SpringCGLIB$$0.fetchAndStoreFiles(<generated>)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.runInternal(ScheduledMethodRunnable.java:130)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.lambda$run$2(ScheduledMethodRunnable.java:124)
    at io.micrometer.observation.Observation.observe(Observation.java:499)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:124)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)

Below is my services class

package com.vendor.robbijournal.service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;

import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import com.vendor.robbijournal.constant.WhichXferFileStatus;
import com.vendor.robbijournal.constant.WhichXferFileType;
import com.vendor.robbijournal.model.Flag;
import com.vendor.robbijournal.model.Sol;
import com.vendor.robbijournal.model.XferFtpFiles;
import com.vendor.robbijournal.repository.FlagRepository;
import com.vendor.robbijournal.repository.XferFtpFilesRepository;
import com.vendor.robbijournal.utils.FilePermissionUtil;
import com.vendor.robbijournal.utils.LoggerFactoryUtil;

import ch.qos.logback.classic.Logger;
import jakarta.transaction.Transactional;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.sftp.FileAttributes;
import net.schmizz.sshj.sftp.RemoteResourceInfo;
import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;

@Service
@Transactional
public class SftpService {

    private static final Logger LOG = LoggerFactoryUtil.getLogger();

    @Autowired
    private SSHClient sshClient;

    @Autowired
    private XferFtpFilesRepository repository;

    @Autowired
    private FlagRepository flagRepository;

    @Value("${spring.profiles.active}")
    private String activeProfile;

    @Value("${spring.jpa.properties.hibernate.jdbc.time_zone}")
    private String hibernateTimeZone;

    @Value("${sftp.host}")
    private String host;

    @Value("${sftp.port}")
    private int port;

    @Value("${sftp.username}")
    private String username;

    @Value("${sftp.password}")
    private String password;

    private String[] DIRECTORIES = {  
            "/043/Output/FILE/" 
    };

    @Scheduled(fixedRate = 60000) // Run every 60 seconds
    //@Scheduled(fixedRate = 600000) // run every hour
    public void fetchAndStoreFiles() throws IOException {
        LOG.trace("Starting to fetch files from SFTP server..");
        try {
            // find Flag record SFTP_FETCHING
            List<Flag> flag = flagRepository.findByKey("SFTP_FETCHING");

            if (flag.size() == 0) {
                Flag f = new Flag();
                f.setKey("SFTP_FETCHING");
                f.setValue1("ON");
                f.setTimeZone(hibernateTimeZone);
                flagRepository.save(f);
            }

            if (flag.size() > 0) {
                Flag f = flag.get(0);
                if(f.getValue1().equalsIgnoreCase("ON")) {
                    LOG.trace("SFTP_FETCHING in progress..abort this process");
                    return; // Abort the current execution
                }
                f.setValue1("ON");
                flagRepository.save(f);
            }

            // Check if FTP session is active
            LOG.debug("sshClient.isConnected() = " + sshClient.isConnected());
            if (!sshClient.isConnected()) {
                LOG.debug("Look like not connected to SFTP server.. let reconnecting..");
                sshClient = new SSHClient();
                sshClient.addHostKeyVerifier(new PromiscuousVerifier());
                // Configure your SSH connection details here
                sshClient.connect(host, port);
                // KeyProvider keys = sshClient.loadKeys("path/to/private/key", "keyPassword");
                // sshClient.authPublickey("username", keys);
                LOG.debug("Connected to SFTP server: " + host + ":" + port + "using username: " + username+ " and password: " + password);
                sshClient.authPassword(username, password);
            }

            LOG.debug("sshClient.isAuthenticated() = " + sshClient.isAuthenticated());

            try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
                for (String directory : DIRECTORIES) {
                    processFiles(sftpClient, directory);
                }
                LOG.debug("Finish checked {} directories!",DIRECTORIES.length);
                sftpClient.close();
                LOG.debug("sftpClient.close()");
            }

            sshClient.disconnect();
            LOG.debug("Done fetching file and sshClient.isConnected() = " + sshClient.isConnected() + "! ");

            // find Flag record
            flag = flagRepository.findByKey("SFTP_FETCHING");
            if (flag.size() > 0) {
                Flag f = flag.get(0);
                f.setValue1("OFF");
                flagRepository.save(f);
            }

        } catch (Exception e) {
            LOG.error("We caught error : {}", e.getMessage(), e);
            List<Flag> flag = flagRepository.findByKey("SFTP_FETCHING");
            flag = flagRepository.findByKey("SFTP_FETCHING");
            if (flag.size() > 0) {
                Flag f = flag.get(0);
                f.setValue1("OFF");
                flagRepository.save(f);
            }
            e.printStackTrace();
        }
    }

    private void processFiles(SFTPClient sftpClient, String directory) throws IOException {
        LOG.info("Let check {}", directory);
        List<RemoteResourceInfo> files = sftpClient.ls(directory);
        int fileNumber = 0;
        for (RemoteResourceInfo entry : files) {
            LOG.info("Now checking {}/{}", directory, entry.getName());
            if (!entry.isDirectory()) {
                fileNumber = fileNumber+1;
                LOG.debug("Now read the content of file {} | current {} / remaining {} / total {}", entry.getName(), fileNumber, files.size() - fileNumber, files.size());
                try (InputStream inputStream = sftpClient.open(entry.getPath()).new RemoteFileInputStream()) {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    /*
                     * 1024  byte = 1K
                     * 8192  byte = 8K
                     * 16384 byte = 16K
                     */
                    byte[] buffer = new byte[16384];
                    int readCount;

                    while ((readCount = inputStream.read(buffer)) > 0) {
                        byteArrayOutputStream.write(buffer, 0, readCount);
                    }
                    String fileContent = byteArrayOutputStream.toString(StandardCharsets.UTF_8);
                    // Count the number of lines in the file content
                    long lineCount = fileContent.lines().count();
                    String md5sum = DigestUtils.md5Hex(byteArrayOutputStream.toByteArray());

                    // Check if a file with the same MD5 already exists
                    Optional<XferFtpFiles> existingFile = repository.findByMd5sum(md5sum);
                    if (existingFile.isPresent()) {
                        if (entry.getName().equalsIgnoreCase(existingFile.get().getFilename())) {
                            LOG.warn("File {} with the same MD5 {} already exists, now skip this file", entry.getName(),
                                    md5sum);
                        } else {
                            LOG.warn("File {} with MD5 {} have same content as {}, now skip this file", entry.getName(),
                                    md5sum, existingFile.get().getFilename());
                        }
                        continue;
                    }

                    // Logic to determine version : Check if a file with the same filename already
                    // exists
                    List<XferFtpFiles> existingFiles = repository.findByFilename(entry.getName());
                    Integer newVersion = 1;

                    if (!existingFiles.isEmpty()) {
                        LOG.debug("Seem we have {} record with same file name {}", existingFiles.size(),
                                entry.getName());
                        List<XferFtpFiles> maxVersionOptList = repository
                                .findByFilenameOrderByVersionAsc(entry.getName());

                        for (XferFtpFiles maxVersionOpt : maxVersionOptList) {
                            newVersion = maxVersionOpt.getVersion();
                            maxVersionOpt.setStatus(WhichXferFileStatus.IGNORED_USE_NEW_VERSION);
                            repository.save(maxVersionOpt);

                            // find Sol and update the status also
                            List<Sol> solList = maxVersionOpt.getSol();
                            for (Sol sol : solList) {
                                // sol.setStatus(WhichXferFileStatus.IGNORED_USE_NEW_VERSION);
                                // repository.save(sol);
                            }
                        }
                        newVersion = newVersion + 1;
                        LOG.warn("Seems {} have an update, set new update as version {}", entry.getName(), newVersion);
                    }

                    FileAttributes attrs = entry.getAttributes();
                    String permissions = FilePermissionUtil.getPermissionString(attrs);
                    XferFtpFiles xferFtpFile = new XferFtpFiles();
                    xferFtpFile.setHost(sshClient.getRemoteHostname());
                    xferFtpFile.setPort(Integer.toString(sshClient.getRemotePort()));
                    xferFtpFile.setDirectory(directory);
                    xferFtpFile.setPermission(permissions);
                    xferFtpFile.setFilename(entry.getName());
                    xferFtpFile.setPayload(fileContent);
                    xferFtpFile.setPayloadLine((int) lineCount);
                    xferFtpFile.setMd5sum(md5sum);
                    xferFtpFile.setVersion(newVersion);
                    xferFtpFile.setTs_ftp(new Timestamp((long) attrs.getMtime() * 1000));
                    xferFtpFile.setTimeZone(hibernateTimeZone);
                    xferFtpFile.setStatus(WhichXferFileStatus.NEW);
                    xferFtpFile.setFileType(WhichXferFileType.TO_CHECK);
                    xferFtpFile.setFilesize(attrs.getSize());
                    // Save the file to DB
                    repository.save(xferFtpFile);
                }
            }
        }
    }
}

and this is bean for SSHClient

package com.vendor.robbijournal.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import net.schmizz.sshj.SSHClient;

@Configuration
public class SshClientConfig {
    @Bean
    public SSHClient sshClient() {
        SSHClient sshClient = new SSHClient();
        // Additional configuration if needed
        return sshClient;
    }
}
package com.vendor.robbijournal.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import net.schmizz.sshj.SSHClient;

@Configuration
public class SshClientConfig {
    @Bean
    public SSHClient sshClient() {
        SSHClient sshClient = new SSHClient();
        // Additional configuration if needed
        return sshClient;
    }
}

Solution

  • I do not know sshj (nor Java actually), but checking sshj source code, it does not look like the RemoteFileOutputStream.close actually closes the remote file.

    It's RemoteFile.close that does close it only.

    So you need to make sure both the stream and the file are closed:

    try (
        RemoteFile remoteFile = sftpClient.open(entry.getPath());
        InputStream inputStream = remoteFile.new RemoteFileInputStream()
    ) {