recently I got myself into data analysis with some friends and to improve our data exchange we got a linux server which we use as a SFTP server. Following this we no longer want to write outputs to our local filesystem and then move it to the SFTP server (be it manually or automatically) but we would like to read and write the data directly to the server.
Given the dimensions of our data we decided to write it as parquet files, since especially the filter function is to our advantage.
import paramiko
import pyarrow.dataset as ds
from fsspec.implementations.sftp import SFTPFileSystem
pkey = paramiko.RSAKey.from_private_key_file(str(Path(os.path.expanduser('~')) / '.ssh/id_rsa'))
fs = SFTPFileSystem(host=host, port=port, username=username, pkey=pkey, look_for_keys=False)
# Don't know if this is necessary, it also works without calling open_sftp()
fs.client.open_sftp()
dataset = ds.dataset('/data/project_xxx/', filesystem=fs, partitioning='hive', format='parquet')
table = dataset.to_table(columns=[...], filter=...)
data = table.to_pandas()
data.to_excel('output/sftptest.xlsx')
This works perfectly fine for a single file. However, when we only specify the hive folder like in the example above we receive a multitude of errors (and always different errors). Among them the most common are:
Note that all this errors occur when calling dataset.to_table().
Does someone have a clue what we are doing wrong? Or how we could improve our code?
We expected to see the same behaviour as when we read partitioned parquets from a local filesystem. Or the same when we read a single file directly from the SFTP server.
I don't know for sure, but it may be the case that paramiko is having trouble handling connections from many worker threads at once (arrow may well be launching many threads silently). Turning on the logger "fsspec.sftp" might help you make sense of the calls being made.
Really, we should be using an async SSH library for this. You might want to try with sshfs (with the "ssh:" protocol prefix), which anecdotally shows much better performance and multiplexing.
When debugging this type of problem, it may also be worth trying with fastparquet with or without dask.