I'm using sshfs to fetch video files from a remote SSH storage:
@app.get("/video/{filename}")
async def video_endpoint(
    filename, range: str = Header(None), db=Depends(get_db)
):  # pylint: disable=redefined-builtin
    """
    Endpoint for video streaming
    Accepts the UUID and an arbitrary extension
    """
    # Requesting video with uuid
    (uuid, extension) = filename.split(".")  # pylint: disable=unused-variable
    # Try to get the file info from database
    file = File.get_by_public_uuid(db, uuid)
    # Return 404 if file not found
    if not file:
        raise HTTPException(404, "File not found")
    # Connect with a password
    ssh_fs = SSHFileSystem(
        settings.ssh_host,
        username=settings.ssh_username,
        password=settings.ssh_password,
    )
    start, end = range.replace("bytes=", "").split("-")
    start = int(start)
    end = int(end) if end else start + settings.chunk_size
    with ssh_fs.open(file.path) as video:
        video.seek(start)
        data = video.read(end - start)
        filesize = file.size
        headers = {
            "Content-Range": f"bytes {str(start)}-{str(end)}/{filesize}",
            "Accept-Ranges": "bytes",
        }
        return Response(data, status_code=206, headers=headers, media_type="video/mp4")
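A side note on the range handling, unrelated to the connection error: in HTTP the end of a byte range is inclusive, so bytes=0-99 covers 100 bytes, whereas the endpoint reads end - start bytes, and it raises if no Range header is sent at all. Below is a minimal sketch of a more defensive parser. parse_range and content_range are hypothetical helper names, not part of FastAPI or sshfs, and only the single-range forms bytes=start-end and bytes=start- are handled.

```python
from typing import Optional, Tuple


def parse_range(header: Optional[str], filesize: int, chunk_size: int) -> Tuple[int, int]:
    """Return an inclusive (start, end) byte range clamped to the file size.

    Hypothetical helper: suffix ranges ("bytes=-500") and multi-range
    requests are not handled here.
    """
    if not header:
        # No Range header: serve the first chunk
        start_text, end_text = "0", ""
    else:
        start_text, _, end_text = header.replace("bytes=", "").partition("-")
    start = int(start_text)
    # Open-ended range: serve chunk_size bytes beginning at start
    end = int(end_text) if end_text else start + chunk_size - 1
    # The last valid byte index is filesize - 1
    end = min(end, filesize - 1)
    return start, end


def content_range(start: int, end: int, filesize: int) -> str:
    """Build the Content-Range value for an inclusive byte range."""
    return f"bytes {start}-{end}/{filesize}"


start, end = parse_range("bytes=100-", filesize=1000, chunk_size=256)
length = end - start + 1  # number of bytes to read from the file
print(content_range(start, end, 1000), length)
```

With this, the read would become video.read(end - start + 1) so that the body length matches the Content-Range header.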
The endpoint works for a few hours after a restart, but then subsequent calls fail with the error message asyncssh.sftp.SFTPNoConnection: Connection not open. As far as I can tell, even though the SSHFileSystem is instantiated during the API call, it is actually cached in the backend.
Fsspec creates some asyncio event loop and returns the cached instance. I guess at some point the connection gets dropped by the other side, and for some reason it's not re-established automatically, nor can I find a way to actually use connection pooling.
I can avoid the error by calling ssh_fs.clear_instance_cache() at the end, but then a new connection is established for every chunk that is fetched, which doesn't make sense either.
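To make the caching behaviour concrete: constructing the filesystem twice with the same arguments hands back the same object, and clearing the cache forces a fresh one (and therefore a fresh connection). The sketch below is a toy model written for illustration only, not fsspec's actual implementation; CachedMeta and FakeFileSystem are hypothetical names.

```python
class CachedMeta(type):
    """Toy stand-in for an instance-caching metaclass (not fsspec's real code)."""

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        cls._cache = {}

    def __call__(cls, *args, **kwargs):
        # Identical constructor arguments map to the same cached instance
        key = (args, tuple(sorted(kwargs.items())))
        if key not in cls._cache:
            cls._cache[key] = super().__call__(*args, **kwargs)
        return cls._cache[key]


class FakeFileSystem(metaclass=CachedMeta):
    """Hypothetical filesystem whose constructor stands for opening a connection."""

    connections_opened = 0

    def __init__(self, host, username=None):
        self.host = host
        self.username = username
        FakeFileSystem.connections_opened += 1

    @classmethod
    def clear_instance_cache(cls):
        cls._cache.clear()


first = FakeFileSystem("example.invalid", username="demo")
second = FakeFileSystem("example.invalid", username="demo")
print(first is second, FakeFileSystem.connections_opened)

FakeFileSystem.clear_instance_cache()
third = FakeFileSystem("example.invalid", username="demo")
print(third is first, FakeFileSystem.connections_opened)
```

In this model a cached instance whose connection has died keeps being returned until the cache is cleared, which matches what I observe.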
My question is: how can I use connection pooling with sshfs in a way that keeps the connection alive and re-establishes it when needed?
I somehow solved the issue by wrapping the SSH connection establishment and manually handling exceptions, clearing the instance cache, and reconnecting:
import logging

import asyncssh
from sshfs import SSHFileSystem

# settings and MAX_CONNECTION_ATTEMPTS are defined elsewhere in the application


def get_ssh_fs():
    """
    Get the SSH FileSystem
    """
    error_count = MAX_CONNECTION_ATTEMPTS
    while error_count > 0:
        # This will use the cached instance by default, so return should be fast
        ssh_fs = SSHFileSystem(
            settings.ssh_host,
            username=settings.ssh_username,
            password=settings.ssh_password,
        )
        # Check if the connection is still alive
        try:
            ssh_fs.du("/")
            return ssh_fs
        except asyncssh.sftp.SFTPNoConnection as e:
            # Connection is lost, clear the instance cache and try again
            logging.warning("SSH Connection lost, reconnecting. Exception: %s", e)
            ssh_fs.clear_instance_cache()
            error_count -= 1
    # All attempts failed
    logging.error("Could not reconnect to SSH server")
    return None
It works, but I'm not sure it's the best way to handle the connection problem, so I'm open to other suggestions.
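One variation I have considered is to skip the health check and instead run the actual read, reconnecting only when it fails, which saves a round trip per request. Here is a minimal, library-agnostic sketch of that pattern. call_with_reconnect and the fake connection used in the demo are hypothetical; in the real endpoint the operation would create the SSHFileSystem and do the open/seek/read, reset would call clear_instance_cache(), and retry_on would be (asyncssh.sftp.SFTPNoConnection,). I have not tested those substitutions.

```python
import logging
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_reconnect(
    operation: Callable[[], T],
    reset: Callable[[], None],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
) -> T:
    """Run operation(); on a connection error call reset() and try again."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on as exc:
            logging.warning("Attempt %d failed, reconnecting: %s", attempt, exc)
            reset()
            if attempt == attempts:
                # Out of attempts: surface the original error to the caller
                raise
    raise ValueError("attempts must be at least 1")


class FakeConnectionLost(Exception):
    """Stand-in for asyncssh.sftp.SFTPNoConnection in this demo."""


state = {"connected": False, "resets": 0}


def read_chunk() -> bytes:
    # Simulates a read over a connection that may have been dropped
    if not state["connected"]:
        raise FakeConnectionLost("Connection not open")
    return b"data"


def reconnect() -> None:
    # Simulates clearing the cached instance so the next call reconnects
    state["resets"] += 1
    state["connected"] = True


result = call_with_reconnect(read_chunk, reconnect, (FakeConnectionLost,))
print(result, state["resets"])
```

Unlike get_ssh_fs(), this re-raises the last error instead of returning None, so the endpoint can turn it into an explicit HTTP error.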