python sshfs fsspec

Using connection pooling with sshfs (fsspec) in Python


I'm using sshfs to fetch video files from a remote SSH storage:

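from fastapi import Depends, Header, HTTPException, Response
from sshfs import SSHFileSystem

# app, settings, File and get_db are defined elsewhere in the application

@app.get("/video/{filename}")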
async def video_endpoint(
    filename, range: str = Header(None), db=Depends(get_db)
):  # pylint: disable=redefined-builtin
    """
    Endpoint for video streaming

    Accepts the UUID and an arbitrary extension
    """
    # Requesting video with uuid
    (uuid, extension) = filename.split(".")  # pylint: disable=unused-variable

    # Try to get the file info from database
    file = File.get_by_public_uuid(db, uuid)

    # Return 404 if file not found
    if not file:
        raise HTTPException(404, "File not found")

    # Connect with a password
    ssh_fs = SSHFileSystem(
        settings.ssh_host,
        username=settings.ssh_username,
        password=settings.ssh_password,
    )

    # Parse "bytes=start-end"; in an HTTP Range header the end position is inclusive
    start, end = range.replace("bytes=", "").split("-")
    start = int(start)
    end = int(end) if end else start + settings.chunk_size - 1

    with ssh_fs.open(file.path) as video:
        video.seek(start)
        # Read end - start + 1 bytes so the body matches the inclusive Content-Range
        data = video.read(end - start + 1)
        filesize = file.size
        headers = {
            "Content-Range": f"bytes {start}-{end}/{filesize}",
            "Accept-Ranges": "bytes",
        }
        return Response(data, status_code=206, headers=headers, media_type="video/mp4")

It works for a few hours after a restart, but then subsequent calls fail with asyncssh.sftp.SFTPNoConnection: Connection not open. As far as I can tell, even though SSHFileSystem is instantiated on every API call, the instance is actually cached behind the scenes.

fsspec creates an asyncio event loop and returns the cached instance. My guess is that at some point the other side drops the connection, and for some reason it is not re-established automatically. I also can't find a way to actually use connection pooling.

I can avoid the error by calling ssh_fs.clear_instance_cache() at the end of the request, but then a new connection is established for every chunk that is fetched, which doesn't make sense either.
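To make the caching behaviour described above concrete, here is a toy model of it. This is not fsspec's actual code: `CachedMeta` and `FakeSSHFileSystem` are hypothetical names, and the "connection" is just a flag. It only illustrates why constructing the filesystem again with the same arguments can hand back an instance whose connection is already dead, and why clearing the cache forces a fresh one.

```python
import itertools


class CachedMeta(type):
    """Toy stand-in for instance caching keyed on constructor arguments."""

    def __call__(cls, *args, **kwargs):
        key = (args, tuple(sorted(kwargs.items())))
        if key not in cls._cache:
            # Only the first call with these arguments runs __init__
            cls._cache[key] = super().__call__(*args, **kwargs)
        return cls._cache[key]


class FakeSSHFileSystem(metaclass=CachedMeta):
    """Hypothetical filesystem whose 'connection' is opened once, in __init__."""

    _cache = {}
    _ids = itertools.count(1)

    def __init__(self, host, **kwargs):
        self.connection_id = next(self._ids)  # stands in for the SSH connection
        self.connected = True

    @classmethod
    def clear_instance_cache(cls):
        cls._cache.clear()


first = FakeSSHFileSystem("example.org", username="demo")
second = FakeSSHFileSystem("example.org", username="demo")
same_instance = first is second      # same arguments, same cached object

first.connected = False              # simulate the server dropping the link
third = FakeSSHFileSystem("example.org", username="demo")
still_stale = not third.connected    # the cache returns the dead instance

FakeSSHFileSystem.clear_instance_cache()
fresh = FakeSSHFileSystem("example.org", username="demo")  # __init__ runs again
```

In this model nothing ever checks whether the cached object is still usable, which matches the symptom: the constructor call in the endpoint looks like a new connection but is not one.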

My question is: how can I use connection pooling with SSHFileSystem in a way that keeps the connection alive and re-establishes it when needed?


Solution

  • I worked around the issue by wrapping the SSH connection setup in a helper that handles the exception, clears the instance cache and reconnects:

    import logging

    import asyncssh
    from sshfs import SSHFileSystem

    # settings and MAX_CONNECTION_ATTEMPTS are defined elsewhere in the application


    def get_ssh_fs():
        """
        Get the SSH FileSystem, reconnecting if the cached connection is dead
        """
        attempts_left = MAX_CONNECTION_ATTEMPTS

        while attempts_left > 0:
            # This will use the cached instance by default, so the return should be fast
            ssh_fs = SSHFileSystem(
                settings.ssh_host,
                username=settings.ssh_username,
                password=settings.ssh_password,
            )

            # Check if the connection is still alive.
            # Note: du("/") walks the whole tree; a cheaper call such as
            # info("/") may serve as the probe.
            try:
                ssh_fs.du("/")
                return ssh_fs
            except asyncssh.sftp.SFTPNoConnection as e:
                # Connection is lost, clear the instance cache and try again
                logging.warning("SSH connection lost, reconnecting. Exception: %s", e)
                ssh_fs.clear_instance_cache()
                attempts_left -= 1

        logging.error("Could not reconnect to SSH server")
        return None
    

    It works, but I'm not sure it's the best way to handle the connection problem, so I'm open to other suggestions.
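One possible variation on the helper above is to separate the retry loop from the sshfs specifics, so the probe can be swapped out and the loop can be tested without a server. The sketch below is only that, a sketch: `get_live_fs` and its parameters (`factory`, `probe`, `reset`, `retry_on`, `max_attempts`) are hypothetical names, and unlike the original it raises `ConnectionError` instead of returning None, so callers cannot accidentally use a missing filesystem.

```python
import logging
from typing import Callable, Tuple, Type, TypeVar

FS = TypeVar("FS")


def get_live_fs(
    factory: Callable[[], FS],
    probe: Callable[[FS], object],
    reset: Callable[[], None],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 3,
) -> FS:
    """Return an object from `factory` that passed `probe`, rebuilding on failure."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        fs = factory()  # may return a cached (possibly stale) instance
        try:
            probe(fs)   # any cheap call that fails when the connection is dead
            return fs
        except retry_on as exc:
            last_error = exc
            logging.warning(
                "Probe failed (attempt %d/%d): %s", attempt, max_attempts, exc
            )
            reset()     # drop the cached instance so the next factory() reconnects
    raise ConnectionError("could not obtain a live filesystem") from last_error


# Untested sketch of how this might be wired to sshfs, reusing the names
# from the code above:
#
# ssh_fs = get_live_fs(
#     factory=lambda: SSHFileSystem(
#         settings.ssh_host,
#         username=settings.ssh_username,
#         password=settings.ssh_password,
#     ),
#     probe=lambda fs: fs.info("/"),
#     reset=SSHFileSystem.clear_instance_cache,
#     retry_on=(asyncssh.sftp.SFTPNoConnection,),
# )
```

Separately, asyncssh's connection options include keepalive_interval. If the installed sshfs version forwards extra keyword arguments to asyncssh (an assumption worth checking), passing it to SSHFileSystem might stop the server from dropping idle connections in the first place.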