I would like to process many remote zst-compressed JSONL files that are available via HTTP/HTTPS. I do not have enough disk space to store the files (I'm running many downloads in parallel and each file is many gigabytes), so I want to decompress them as they are being downloaded. I am testing the following code:
#!/usr/bin/env python3

import argparse
import json
import zstandard as zstd
import io
import requests
import time
from typing import Union
from urllib.parse import urlparse


def process_file(input_file: str) -> None:
    """
    Process a zstandard compressed JSONL file or a stream.

    This function takes a file path or a URL as input, decompresses it, and then
    processes each JSON object one by one.

    :param input_file: The input file path or URL to process.
    """
    # Create a decompression object
    # Increase the max_window_size if necessary
    # Be careful as this can potentially use a lot of memory
    # dctx = zstd.ZstdDecompressor(max_window_size=2**30)  # for example, allow up to 1 GB
    dctx = zstd.ZstdDecompressor()

    # Check if the input is a URL
    if input_file.startswith(('http://', 'https://')):
        # Stream the file from the URL
        response = requests.get(input_file, stream=True)
        reader = dctx.stream_reader(response.raw)
    else:
        # If the input is a file path
        f = open(input_file, 'rb')
        reader = dctx.stream_reader(f)

    text_stream = io.TextIOWrapper(reader, encoding='utf-8')

    line_counter = 0
    start_time = time.time()
    for line in text_stream:
        # Each line is a JSON string, so we parse it into a Python object
        data = json.loads(line)
        line_counter += 1
        if line_counter % 100_000 == 0:
            elapsed_time = time.time() - start_time
            print(f'Processed {line_counter:,d} lines in {elapsed_time:,.2f} seconds ({line_counter / elapsed_time:,.2f} lines/sec)')

    # Don't forget to close the file if it's not a URL
    if not input_file.startswith(('http://', 'https://')):
        f.close()


def main():
    """
    Main function to parse command line arguments and process the input file.
    """
    parser = argparse.ArgumentParser(description='Process a zstandard compressed JSONL file.')
    parser.add_argument('--input-file', '-i', type=str, required=True, help='The input file to process')
    args = parser.parse_args()

    process_file(args.input_file)


if __name__ == "__main__":
    main()
The local file on-the-fly decompression works, but the remote code raises a zstd.ZstdError: zstd decompress error: Frame requires too much memory for decoding exception; HOWEVER, I am certain it is not a memory issue. I set max_window_size=2**30 (1 GB) and tested it on a 395 KB file (2.1 MB uncompressed) and still got the error. If I pass response.content, response.raw, or response.raw.data to dctx.stream_reader(...) in the reader = dctx.stream_reader(response.raw) line, I get the same error.
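A quick way to sanity-check what the frame itself is asking for (a minimal sketch, not part of the script above; it assumes the URL is reachable and only reads the first few bytes of the body) is to inspect the frame header with zstandard.get_frame_parameters and look at the declared window size:

# Minimal sketch: inspect the zstd frame header of a remote file to see the
# window size it declares. Assumes `url` points at a .zst file (placeholder
# below) and that the response body can be read incrementally.
import requests
import zstandard as zstd

url = 'https://example.com/path/to/file.zst'  # placeholder URL

response = requests.get(url, stream=True)
prefix = response.raw.read(18)  # a zstd frame header is at most 18 bytes
response.close()

params = zstd.get_frame_parameters(prefix)
print(f'window_size:  {params.window_size:,d} bytes')
print(f'content_size: {params.content_size}')  # may be 0 if the frame does not record it
print(f'has_checksum: {params.has_checksum}')

If window_size is larger than the decompressor's limit, that is exactly the "Frame requires too much memory for decoding" situation.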
If I add:

response = requests.get(input_file, stream=True)
with open('test.zst', 'wb') as fh:
    for chunk in response.raw:
        print(".", end="")
        fh.write(chunk)
print()
exit()
It writes the file just fine and I can use zstd on it from the command line. Adding read_across_frames=True to the stream_reader call does nothing:

reader = dctx.stream_reader(response.raw, read_across_frames=True)
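One more way to isolate the problem (a minimal sketch of my own, assuming the test.zst written above is still on disk): run the same ZstdDecompressor over the saved file. If the identical error shows up on a file that the zstd command-line tool handles fine, the HTTP transport is not at fault and the decompressor settings are.

# Minimal sketch: decompress the locally saved test.zst with the same settings
# used for the remote stream. Assumes test.zst was written by the snippet above.
import io
import zstandard as zstd

dctx = zstd.ZstdDecompressor()  # same settings as the failing remote case
with open('test.zst', 'rb') as fh:
    reader = dctx.stream_reader(fh)
    text_stream = io.TextIOWrapper(reader, encoding='utf-8')
    for line in text_stream:
        pass  # iterating is enough to trigger decompression
print('decompressed without error')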
And the documentation at https://python-zstandard.readthedocs.io/en/latest/decompressor.html is confusing at best and seems to be wrong in some places, since some of the example code does not work.
How do I decompress a remote zst file on the fly?
More info: Using code written completely from scratch that goes over SFTP instead of HTTP/HTTPS, I still get the same error.
#!/usr/bin/env python3

import argparse
import io
import bz2
import gzip
import paramiko
import zstandard as zstd
from urllib.parse import urlparse


class FileReader:
    def __init__(self, filename):
        """
        Initialize FileReader with a filename.

        :param filename: The name of the file to read.
        """
        self.filename = filename
        self.file = None
        self.sftp = None

    def __enter__(self):
        """
        Open the file when entering the context.
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Close the file when exiting the context.
        """
        self.close()

    def __iter__(self):
        """
        Make the FileReader object iterable.
        """
        return self

    def __next__(self):
        """
        Provide the next line in the file.
        """
        line = self.file.readline()
        if not line:
            # End of file
            raise StopIteration
        else:
            return line

    def open(self):
        """
        Open the file for reading, decompressing it on the fly if necessary.
        """
        parsed = urlparse(self.filename)
        if parsed.scheme == 'sftp':
            # Handle SFTP URLs
            client = paramiko.SSHClient()
            client.load_system_host_keys()
            client.connect(parsed.hostname, username=parsed.username, password=parsed.password)
            sftp = client.open_sftp()
            f = sftp.file(parsed.path, 'rb')
            if self.filename.endswith('.zst'):
                dctx = zstd.ZstdDecompressor()
                self.file = io.TextIOWrapper(dctx.stream_reader(f), encoding='utf-8')
            elif self.filename.endswith('.bz2'):
                self.file = bz2.BZ2File(f)
            elif self.filename.endswith('.gz'):
                self.file = gzip.GzipFile(fileobj=f)
            else:
                self.file = f
            self.sftp = sftp
        else:
            if self.filename.endswith('.zst'):
                dctx = zstd.ZstdDecompressor()
                self.file = io.TextIOWrapper(dctx.stream_reader(open(self.filename, 'rb')), encoding='utf-8')
            elif self.filename.endswith('.bz2'):
                self.file = bz2.open(self.filename, 'rt')
            elif self.filename.endswith('.gz'):
                self.file = gzip.open(self.filename, 'rt')
            else:
                self.file = open(self.filename, 'r')

    def close(self):
        """
        Close the file.
        """
        if self.file is not None:
            self.file.close()
        if self.sftp is not None:
            self.sftp.close()


def main():
    """
    Main function, mostly for testing.
    """
    parser = argparse.ArgumentParser(description='Process a zstandard compressed JSONL file.')
    parser.add_argument('--input-file', '-i', type=str, required=True, help='The input file to process')
    args = parser.parse_args()

    with FileReader(args.input_file) as reader:
        for line in reader:
            print(line)

    exit()


if __name__ == "__main__":
    main()
Result:
> filereader.py -i sftp://sftp.example.com/path/to/file.zst
Traceback (most recent call last):
  File "filereader.py", line 114, in <module>
    main()
  File "filereader.py", line 109, in main
    for line in reader:
  File "filereader.py", line 45, in __next__
    line = self.file.readline()
           ^^^^^^^^^^^^^^^^^^^^
zstd.ZstdError: zstd decompress error: Frame requires too much memory for decoding
Works fine with .gz, .bz2, and uncompressed files over SFTP. It's just zstandard that's causing issues.
The code above works if you set

dctx = zstd.ZstdDecompressor(max_window_size=2**31)
                                             ^^^^^

instead of 2**30, if you are going to try to decompress zstandard files that were compressed with --long=31.
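Applied to the original HTTP case, the fix is just that one constructor argument (a minimal sketch, assuming the same requests-based streaming as above; process_remote is a name used only for illustration):

# Minimal sketch: the original HTTP streaming path, with the only change being
# max_window_size, so frames produced with --long=31 (2 GB window) can be decoded.
import io
import json
import requests
import zstandard as zstd

def process_remote(url: str) -> None:
    dctx = zstd.ZstdDecompressor(max_window_size=2**31)  # allow up to a 2 GB window
    response = requests.get(url, stream=True)
    reader = dctx.stream_reader(response.raw)
    text_stream = io.TextIOWrapper(reader, encoding='utf-8')
    for line in text_stream:
        data = json.loads(line)  # process each JSON object as before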
For those who are interested: it appears that some (older?) versions of the zstd command-line tool, when given --long=31, set the long-distance-matching window to the full 2 GB regardless of the file size, while others (current?) appear to cap the window so it never meets or exceeds the file size (which makes sense). So when I was testing with a small file that had been compressed with an older zstd, decompression failed, but when I later tried to recreate the problem I could not, since I was using a current version of zstd.
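If you would rather not hard-code 2**31, one option (a sketch of my own, assuming the server honours HTTP Range requests; fetch_window_size is a made-up helper name, not part of zstandard or requests) is to read the frame header first and size the decompressor from what the frame actually declares:

# Minimal sketch: read just the zstd frame header of a remote file and set
# max_window_size from the window size the frame declares. The Range header
# keeps the transfer small on servers that honour it; get_frame_parameters
# only parses the header, so a larger response still works, just wastefully.
import requests
import zstandard as zstd

def fetch_window_size(url: str) -> int:
    resp = requests.get(url, headers={'Range': 'bytes=0-17'})  # frame header is at most 18 bytes
    resp.raise_for_status()
    return zstd.get_frame_parameters(resp.content).window_size

url = 'https://example.com/path/to/file.zst'  # placeholder URL
dctx = zstd.ZstdDecompressor(max_window_size=fetch_window_size(url))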
I may post the more complete code that handles files over SFTP/SSH compressed in zst, bz2, and gz format.